1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
// extern declaration only; the storage for this value is defined elsewhere.
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
34 // default value for correlation between the interface card and
36 #define DEFAULT_TIME_CORR 16
// extern declaration of an array of NRANDS strings; the storage is defined elsewhere.
41 extern string hash_nums[NRANDS];
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
98 // ----------------------------------------------
99 // Data extracted from the query plan node
100 // for use by code generation.
// These file-level variables are shared by the generator functions below.
102 static cplx_lit_table *complex_literals; //Table of literals with constructors.
// Pass-by-handle parameters; entry i is emitted as handle_param_<i>.
103 static vector<handle_param_tbl_entry *> param_handle_table;
104 static param_table *param_tbl; // Table of all referenced parameters.
// Select-list expressions; entry s is emitted as tuple_var<s> / selvar_<s>.
106 static vector<scalarexp_t *> sl_list;
// presumably the WHERE clause in conjunctive normal form - TODO confirm
107 static vector<cnf_elem *> where;
109 static gb_table *gb_tbl; // Table of all group-by attributes.
110 static aggregate_table *aggr_tbl; // Table of all referenced aggregates.
112 static bool packed_return; // unpack using structure, not fcns
113 static nic_property *nicprop; // nic properties for this interface.
114 static int global_id;
117 // The partial_fcns vector can now refer to
118 // partial functions, or expensive functions
119 // which can be cached (if there are multiple refs). A couple
120 // of int vectors distinguish the cases.
121 static vector<scalarexp_t *> partial_fcns;
122 static vector<int> fcn_ref_cnt;
123 static vector<bool> is_partial_fcn;
// Half-open index ranges [start,end) into partial_fcns / is_partial_fcn.
// sl_* covers the functions referenced by the select clause; wh_*, gb_* and
// ag_* presumably cover the where, group-by and aggregate clauses - TODO confirm.
// NOTE: unlike the surrounding variables these are not static.
124 int sl_fcns_start = 0, sl_fcns_end = 0;
125 int wh_fcns_start = 0, wh_fcns_end = 0;
126 int gb_fcns_start = 0, gb_fcns_end = 0;
127 int ag_fcns_start = 0, ag_fcns_end = 0;
130 // These vectors are for combinable predicates.
131 static vector<int> pred_class; // identifies the group
132 static vector<int> pred_pos; // position in the group.
// Shared scratch buffer used as the sprintf target throughout this file.
// sprintf does no bounds checking, so formatted text must fit in 1000 bytes.
136 static char tmpstr[1000];
138 //////////////////////////////////////////////////////////////////////
139 /// Various utilities
// Build the name of the generated FTA struct for a query node.
// The name is derived from the normalized form of node_name.
141 string generate_fta_name(string node_name){
142 string ret = normalize_name(node_name);
// Build the name of the generated per-group aggregate struct for a query node:
// starts from the normalized node name and appends the suffix "_aggr_struct".
152 string generate_aggr_struct_name(string node_name){
153 string ret = normalize_name(node_name);
157 ret += "_aggr_struct";
// Build the name of the generated filter-join table entry struct for a query
// node. The name is derived from the normalized form of node_name.
162 string generate_fj_struct_name(string node_name){
163 string ret = normalize_name(node_name);
// Emit the C statements that load one input field into the local variable
// unpack_var_<field>_<tblref>.
//   tblref    - table-variable index; suffix of the target variable name.
//   schref    - schema index used to look up the accessor, type and modifiers.
//   field     - name of the field to unpack.
//   schema    - schema table queried for the accessor function and field type.
//   node_name - query node name; prefix of the <node_name>_input_struct names.
//   end_goto  - label the emitted code jumps to on failure (default "end").
// Returns the generated C text.
172 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
// Accessor-function form: call the schema-provided unpack function on the packet.
175 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
176 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
// Fields carrying the "required" modifier do not get the failure jump.
178 if(!schema->get_modifier_list(schref,field)->contains_key("required"))
179 ret += "\tif(retval) goto "+end_goto+";\n";
182 // TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
// Struct form: read the field from <node_name>_input_struct_var.
183 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
184 if(dt.is_buffer_type()){
// NOTE(review): this branch handles types other than v_str_t while the error
// text below complains about non-string buffer types - confirm the polarity.
185 if(dt.get_type() != v_str_t){
// Emitted bounds check against sz, then the data pointer is advanced by length.
186 ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
187 ret += "\t\tgoto "+end_goto+";\n";
188 ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
// Fixed: the emitted statement used to end in ";+", leaving a stray '+' in
// the generated C code.
189 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
190 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
192 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
// Non-buffer types are copied by plain assignment.
196 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
197 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
// Emit the C declaration of the per-group aggregate struct for a query node.
//   node_name - query node name; determines the struct name.
//   gb_tbl    - group-by attributes; attribute g becomes member gb_var<g>.
//   aggr_tbl  - aggregates; aggregate a becomes member aggr_var<a>.
// The struct also gets a self-typed "next" pointer member.
203 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
204 string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
// One member per group-by attribute, typed by the attribute's data type.
207 for(g=0;g<gb_tbl->size();g++){
208 sprintf(tmpstr,"gb_var%d",g);
209 ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
// One member per aggregate: built-ins use the aggregate's data type,
// the other aggregates use their storage type.
213 for(a=0;a<aggr_tbl->size();a++){
215 sprintf(tmpstr,"aggr_var%d",a);
216 if(aggr_tbl->is_builtin(a))
217 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
219 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
223 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
// Emit the C declaration of the filter-join table entry struct.
//   fs        - filter-join query plan node; supplies use_bloom and hash_eq.
//   node_name - query node name; determines the struct name.
// The struct is emitted only when the node does not use a bloom filter.
232 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
235 if(fs->use_bloom == false){ // uses hash table instead
236 ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
// One key_var<k> member per hash equality predicate, typed after the
// predicate's left-hand expression.
238 for(k=0;k<fs->hash_eq.size();++k){
239 sprintf(tmpstr,"key_var%d",k);
240 ret += "\t"+fs->hash_eq[k]->pr->get_left_se()->get_data_type()->make_cvar(tmpstr)+";\n";
242 ret += "\tlong long int ts;\n";
// Emit the C declaration of the per-query FTA state struct. The struct starts
// with an embedded "struct FTA f" and then receives, as applicable:
// aggregate hash-table bookkeeping, filter-join state, parameter and
// complex-literal slots, pass-by-handle parameter slots, last-seen values of
// temporal attributes, a trace id and runtime statistics counters.
//   node_name          - query node name; determines the struct names.
//   gb_tbl             - group-by attributes (scanned for temporal ones).
//   aggr_tbl           - aggregate table.
//   param_tbl          - query parameters; each becomes param_<name>.
//   complex_literals   - literals with constructors; complex_literal_<i>.
//   param_handle_table - pass-by-handle parameters; handle_param_<i>.
//   is_aggr_query, is_fj, uses_bloom - select which field groups are emitted.
// Returns the generated C text.
// Fixed: the reference parameter had been mangled by a character-entity
// conversion into "¶m_handle_table"; restored to "&param_handle_table".
252 string generate_fta_struct(string node_name, gb_table *gb_tbl,
253 aggregate_table *aggr_tbl, param_table *param_tbl,
254 cplx_lit_table *complex_literals,
255 vector<handle_param_tbl_entry *> &param_handle_table,
256 bool is_aggr_query, bool is_fj, bool uses_bloom,
259 string ret = "struct " + generate_fta_name(node_name) + "{\n";
260 ret += "\tstruct FTA f;\n";
262 //-------------------------------------------------------------
263 // Aggregate-specific fields
267 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
269 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
270 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
271 // ret+="\tint bitmap_size;\n";
272 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
273 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
274 ret += "\tint max_windows; // max number of open windows.\n";
275 ret += "\tunsigned int generation; // initially zero, increment on\n";
276 ret += "\t // every hash table flush - whether regular or induced.\n";
277 ret += "\t // Old groups are identified by a generation mismatch.\n";
278 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
279 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
// Each temporal, non-buffer group-by attribute gets three tracking members:
// last_gb_<g>, flush_start_gb_<g> and last_flushed_gb_<g>.
284 bool uses_temporal_flush = false;
285 for(g=0;g<gb_tbl->size();g++){
286 data_type *dt = gb_tbl->get_data_type(g);
287 if(dt->is_temporal()){
289 fprintf(stderr,"group by attribute %s is temporal, ",
290 gb_tbl->get_name(g).c_str());
291 if(dt->is_increasing()){
292 fprintf(stderr,"increasing.\n");
294 fprintf(stderr,"decreasing.\n");
297 data_type *gdt = gb_tbl->get_data_type(g);
298 if(gdt->is_buffer_type()){
299 fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
301 sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
303 sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
305 sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
307 uses_temporal_flush = true;
313 if(! uses_temporal_flush){
314 fprintf(stderr,"Warning: no temporal flush.\n");
318 // ---------------------------------------------------------
319 // Filter-join specific fields
324 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
325 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
326 "\tint first_exec;\n"
327 "\tlong long int last_bin;\n"
328 "\tint last_bloom_pos;\n"
331 }else{ // limited hash table
333 " struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
340 //--------------------------------------------------------
343 // Create places to hold the parameters.
345 vector<string> param_vec = param_tbl->get_param_names();
346 for(p=0;p<param_vec.size();p++){
347 data_type *dt = param_tbl->get_data_type(param_vec[p]);
348 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
349 param_vec[p].c_str());
// Parameters accessed by handle additionally get a search_handle pointer.
351 if(param_tbl->handle_access(param_vec[p])){
352 ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
356 // Create places to hold complex literals.
358 for(cl=0;cl<complex_literals->size();cl++){
359 literal_t *l = complex_literals->get_literal(cl);
// Fixed: the data_type is only needed to look up the C type name, so it is
// now a local object; the heap-allocated one had no visible delete.
360 data_type dtl( l->get_type() );
361 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl.get_cvar_type().c_str(),cl);
365 // Create places to hold the pass-by-handle parameters.
366 for(p=0;p<param_handle_table.size();++p){
367 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
371 // Create places to hold the last values of temporal
372 // attributes referenced in select clause
373 // we also need to store values of the temporal attributes
374 // of last flushed tuple in aggr queries
375 // to make sure we generate the correct temporal tuple
376 // in the presence of slow flushes
379 col_id_set temp_cids; // col ids of temp attributes in select clause
382 col_id_set::iterator csi;
// Uses the file-level sl_list (not a parameter) to find temporal select items.
384 for(s=0;s<sl_list.size();s++){
385 data_type *sdt = sl_list[s]->get_data_type();
386 if (sdt->is_temporal()) {
387 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// One last_<field>_<tblref> member per collected column.
391 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
392 int tblref = (*csi).tblvar_ref;
393 int schref = (*csi).schema_ref;
394 string field = (*csi).field;
395 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
396 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
400 ret += "\tgs_uint64_t trace_id;\n\n";
402 // Fields to store the runtime stats
404 ret += "\tgs_uint32_t in_tuple_cnt;\n";
405 ret += "\tgs_uint32_t out_tuple_cnt;\n";
406 ret += "\tgs_uint32_t out_tuple_sz;\n";
407 ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
408 ret += "\tgs_uint64_t cycle_cnt;\n";
409 ret += "\tgs_uint32_t collision_cnt;\n";
410 ret += "\tgs_uint32_t eviction_cnt;\n";
411 ret += "\tgs_float_t sampling_rate;\n";
420 //------------------------------------------------------------
421 // Set colref tblvars to 0..
422 // (special processing for join-like operators in an lfta).
// Recursively walks the scalar expression se and sets the table-variable
// reference of every column reference it finds to 0.
//   se   - expression to rewrite in place.
//   gtbl - group-by table used to resolve group-by references to their
//          defining expressions; a NULL gtbl is reported as an internal error
//          when such a reference is met.
424 void reset_se_col_ids_tblvars(scalarexp_t *se, gb_table *gtbl){
425 vector<scalarexp_t *> operands;
431 switch(se->get_operator_type()){
437 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
440 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
441 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
445 se->get_colref()->set_tablevar_ref(0);
// Fixed: the message used to name gather_se_col_ids, a different function.
448 fprintf(stderr,"INTERNAL ERROR: gbvar ref in reset_se_col_ids_tblvars, but gtbl is NULL.\n");
// A group-by reference is followed into its defining expression.
451 reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
457 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
460 operands = se->get_operands();
461 for(o=0;o<operands.size();o++){
462 reset_se_col_ids_tblvars(operands[o], gtbl);
466 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
467 se->get_lineno(), se->get_charno(),se->get_operator_type());
473 // reset column tblvars accessed in this pr.
// Recursively walks the predicate pr and applies reset_se_col_ids_tblvars to
// every scalar expression it contains (left/right operands, sub-predicates
// and operand lists).
//   pr   - predicate to rewrite in place.
//   gtbl - group-by table passed through to reset_se_col_ids_tblvars.
475 void reset_pr_col_ids_tblvars(predicate_t *pr, gb_table *gtbl){
476 vector<scalarexp_t *> op_list;
479 switch(pr->get_operator_type()){
481 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
484 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
485 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
488 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
491 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
492 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
495 op_list = pr->get_op_list();
496 for(o=0;o<op_list.size();++o){
497 reset_se_col_ids_tblvars(op_list[o],gtbl) ;
501 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
502 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
509 // Generate code that makes reference
510 // to the tuple, and not to any aggregates.
// Recursively translates the scalar expression se into C expression text.
//   se     - expression to translate.
//   schema - schema table, passed through to the recursive calls.
// Returns the C text, or a string starting with "ERROR" after printing an
// internal-error message to stderr.
// Uses the file-level gb_tbl and the shared tmpstr scratch buffer.
511 static string generate_se_code(scalarexp_t *se,table_list *schema){
513 data_type *ldt, *rdt;
515 vector<scalarexp_t *> operands;
518 switch(se->get_operator_type()){
// Handle references map to the FTA struct member t->handle_param_<n>.
520 if(se->is_handle_ref()){
521 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
// Complex literals live in the FTA struct; other literals are emitted inline.
525 if(se->get_literal()->is_cpx_lit()){
526 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
530 return(se->get_literal()->to_C_code("")); // not complex, no constructor
532 if(se->is_handle_ref()){
533 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
538 ret += se->get_param_name();
// One-operand case: a type-specific operator function is used when the
// operand's data type reports one for this operator.
541 ldt = se->get_left_se()->get_data_type();
542 if(ldt->complex_operator(se->get_op()) ){
543 ret += ldt->get_complex_operator(se->get_op());
545 ret += generate_se_code(se->get_left_se(),schema);
550 ret += generate_se_code(se->get_left_se(),schema);
// Two-operand case: same scheme, keyed on both operand data types.
555 ldt = se->get_left_se()->get_data_type();
556 rdt = se->get_right_se()->get_data_type();
558 if(ldt->complex_operator(rdt, se->get_op()) ){
559 ret += ldt->get_complex_operator(rdt, se->get_op());
561 ret += generate_se_code(se->get_left_se(),schema);
563 ret += generate_se_code(se->get_right_se(),schema);
567 ret += generate_se_code(se->get_left_se(),schema);
569 ret += generate_se_code(se->get_right_se(),schema);
574 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet unpacked ...
575 // so return the defining code.
576 ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
// Plain column references name the unpacked local unpack_var_<field>_<tblref>.
579 sprintf(tmpstr,"unpack_var_%s_%d",
580 se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
585 // Should not be ref'ing any aggr here.
586 if(se->get_aggr_ref() >= 0){
587 fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
588 return("ERROR in generate_se_code");
// Partial functions are referenced through their result variable.
591 if(se->is_partial()){
592 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
// Function operands are translated recursively; buffer-typed operands that
// are not handle references get special treatment.
596 operands = se->get_operands();
597 for(o=0;o<operands.size();o++){
599 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
601 ret += generate_se_code(operands[o], schema);
607 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
608 se->get_lineno(), se->get_charno(),se->get_operator_type());
609 return("ERROR in generate_se_code");
613 // generate code that refers only to aggregate data and constants.
// Recursively translates the scalar expression se into C expression text in
// which group-by and aggregate references are read from a stored group.
//   se     - expression to translate.
//   var    - text prefixed to gb_var<n> / aggr_var<n>; callers in this file
//            pass a prefix of the form "t->aggr_table[<idx>].".
//   schema - schema table, passed through to the recursive calls.
// Returns the C text, or a string starting with "ERROR" on an unknown
// operator type.
614 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
617 data_type *ldt, *rdt;
619 vector<scalarexp_t *> operands;
622 switch(se->get_operator_type()){
// Handle references map to the FTA struct member t->handle_param_<n>.
624 if(se->is_handle_ref()){
625 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
// Complex literals live in the FTA struct; other literals are emitted inline.
629 if(se->get_literal()->is_cpx_lit()){
630 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
634 return(se->get_literal()->to_C_code("")); // not complex no constructor
636 if(se->is_handle_ref()){
637 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
642 ret += se->get_param_name();
// One-operand case: a type-specific operator function is used when the
// operand's data type reports one for this operator.
645 ldt = se->get_left_se()->get_data_type();
646 if(ldt->complex_operator(se->get_op()) ){
647 ret += ldt->get_complex_operator(se->get_op());
649 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
654 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
// Two-operand case: same scheme, keyed on both operand data types.
659 ldt = se->get_left_se()->get_data_type();
660 rdt = se->get_right_se()->get_data_type();
662 if(ldt->complex_operator(rdt, se->get_op()) ){
663 ret += ldt->get_complex_operator(rdt, se->get_op());
665 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
667 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
671 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
673 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
// NOTE(review): the comment on the next two lines matches generate_se_code,
// but this function emits the stored group-by variable <var>gb_var<n> and
// not the defining expression.
678 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet
679 // unpacked ... so return the defining code.
680 sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
684 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
685 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
686 se->get_lineno(), se->get_charno());
// Aggregate references read the stored aggregate <var>aggr_var<n>.
692 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
// A function carrying an aggregate reference reads the local udaf_ret<n>.
697 if(se->get_aggr_ref() >= 0){
698 sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
// Partial functions are referenced through their result variable.
703 if(se->is_partial()){
704 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
708 operands = se->get_operands();
709 for(o=0;o<operands.size();o++){
711 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
713 ret += generate_se_code_fm_aggr(operands[o], var, schema);
719 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
720 se->get_lineno(), se->get_charno(),se->get_operator_type());
721 return("ERROR in generate_se_code");
// Emit the C call that evaluates a partial function whose operands are read
// from a stored group. The emitted text starts with "retval = <fn>( " and
// uses &partial_fcn_result_<pfn_id> as the result slot.
//   se     - function expression; must be SE_FUNC without an aggregate ref,
//            otherwise an internal error is reported and an "ERROR" string
//            is returned.
//   pfn_id - index used in the result variable name.
//   var    - group prefix forwarded to generate_se_code_fm_aggr.
//   schema - schema table forwarded to generate_se_code_fm_aggr.
727 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
730 vector<scalarexp_t *> operands;
733 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
734 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
735 se->get_lineno(), se->get_charno());
736 return("ERROR in generate_se_code");
739 ret = "\tretval = " + se->get_op() + "( ";
740 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
743 operands = se->get_operands();
744 for(o=0;o<operands.size();o++){
746 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
748 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// Emit the C call expression "<fn>( <operands>" for a function whose operands
// are translated with generate_se_code.
//   se     - function expression; must be SE_FUNC without an aggregate ref,
//            otherwise an internal error is reported and an "ERROR" string
//            is returned.
//   schema - schema table forwarded to generate_se_code.
755 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
758 vector<scalarexp_t *> operands;
760 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
761 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
762 se->get_lineno(), se->get_charno());
763 return("ERROR in generate_se_code");
766 ret = se->get_op() + "( ";
768 operands = se->get_operands();
769 for(o=0;o<operands.size();o++){
771 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
773 ret += generate_se_code(operands[o], schema);
// Emit the C call that evaluates a partial function on tuple data. The
// emitted text starts with "retval = <fn>( " and uses
// &partial_fcn_result_<pfn_id> as the result slot.
//   se     - function expression; must be SE_FUNC without an aggregate ref,
//            otherwise an internal error is reported and an "ERROR" string
//            is returned.
//   pfn_id - index used in the result variable name.
//   schema - schema table forwarded to generate_se_code.
782 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
785 vector<scalarexp_t *> operands;
788 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
789 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
790 se->get_lineno(), se->get_charno());
791 return("ERROR in generate_se_code");
// Fixed: this statement used to end in ',' and was chained to the sprintf
// below by the comma operator; it now ends in ';' like its sibling
// unpack_partial_fcn_fm_aggr. Behaviour is unchanged.
794 ret = "\tretval = " + se->get_op() + "( ";
795 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
798 operands = se->get_operands();
799 for(o=0;o<operands.size();o++){
801 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
803 ret += generate_se_code(operands[o], schema);
// Translate a query-language comparison operator into its C spelling:
// "=" becomes "==" and "<>" becomes "!=".
814 static string generate_C_comparison_op(string op){
815 if(op == "=") return("==");
816 if(op == "<>") return("!=");
// Translate a query-language boolean operator into its C spelling.
// AND, OR and NOT are each accepted in upper, capitalized and lower case.
// Any other operator is reported on stderr and an "ERROR ..." string naming
// the operator is returned.
820 static string generate_C_boolean_op(string op){
821 if( (op == "AND") || (op == "And") || (op == "and") ){
824 if( (op == "OR") || (op == "Or") || (op == "or") ){
827 if( (op == "NOT") || (op == "Not") || (op == "not") ){
831 fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
832 return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
// Recursively translates the predicate pr into C boolean expression text.
//   pr     - predicate to translate.
//   schema - schema table forwarded to generate_se_code.
// Returns the C text, or "ERROR in generate_predicate_code" after reporting
// an unknown predicate operator type on stderr.
// Uses the file-level pred_class / pred_pos tables and the tmpstr buffer.
836 static string generate_predicate_code(predicate_t *pr,table_list *schema){
838 vector<literal_t *> litv;
840 data_type *ldt, *rdt;
841 vector<scalarexp_t *> op_list;
843 unsigned int bitmask;
845 switch(pr->get_operator_type()){
// Membership in a literal list: one equality test per literal, joined by "||".
847 ldt = pr->get_left_se()->get_data_type();
850 litv = pr->get_lit_vec();
851 for(i=0;i<litv.size();i++){
852 if(i>0) ret += " || ";
// Types with a comparison function are compared through it; buffer-typed
// arguments are passed by address.
855 if(ldt->complex_comparison(ldt) ){
856 ret += ldt->get_comparison_fcn(ldt) ;
858 if(ldt->is_buffer_type() ) ret += "&";
859 ret += generate_se_code(pr->get_left_se(), schema);
861 if(ldt->is_buffer_type() ) ret += "&";
862 if(litv[i]->is_cpx_lit()){
863 sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
866 ret += litv[i]->to_C_code("");
870 ret += generate_se_code(pr->get_left_se(), schema);
872 ret += litv[i]->to_C_code("");
// Comparison of two scalar expressions.
881 ldt = pr->get_left_se()->get_data_type();
882 rdt = pr->get_right_se()->get_data_type();
885 if(ldt->complex_comparison(rdt) ){
886 ret += ldt->get_comparison_fcn(rdt);
888 if(ldt->is_buffer_type() ) ret += "&";
889 ret += generate_se_code(pr->get_left_se(),schema);
891 if(rdt->is_buffer_type() ) ret += "&";
892 ret += generate_se_code(pr->get_right_se(),schema);
894 ret += generate_C_comparison_op(pr->get_op());
897 ret += generate_se_code(pr->get_left_se(),schema);
898 ret += generate_C_comparison_op(pr->get_op());
899 ret += generate_se_code(pr->get_right_se(),schema);
// Boolean operator applied to one sub-predicate.
905 ret += generate_C_boolean_op(pr->get_op());
906 ret += generate_predicate_code(pr->get_left_pr(),schema);
// Boolean operator applied to two sub-predicates.
911 ret += generate_predicate_code(pr->get_left_pr(),schema);
912 ret += generate_C_boolean_op(pr->get_op());
913 ret += generate_predicate_code(pr->get_right_pr(),schema);
// Predicate function call.
917 op_list = pr->get_op_list();
918 cref = pr->get_combinable_ref();
919 if(cref >= 0){ // predicate is a combinable pred reference
// Fixed: the old test "pred_class.size() >= cref" let cref == size() through
// to an out-of-range pred_class[cref]. cref is known to be non-negative here.
921 if(static_cast<size_t>(cref) < pred_class.size() && pred_class[cref] >= 0){
922 ppos = pred_pos[cref];
// Bit ppos%32 of 32-bit word ppos/32 of the precomputed predicate value.
// The shift is on an unsigned 1 so that bit 31 is well defined; the
// parentheses spell out the grouping the original relied on by precedence.
923 bitmask = 1u << (ppos % 32);
924 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
930 ret = pr->get_op() + "(";
// Sampling functions receive t->sampling_rate as their first argument.
931 if (pr->is_sampling_fcn) {
932 ret += "t->sampling_rate";
933 if (!op_list.empty())
936 for(o=0;o<op_list.size();++o){
938 if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
940 ret += generate_se_code(op_list[o],schema);
945 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
946 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
947 return("ERROR in generate_predicate_code");
// Emit C text that tests lhs_op and rhs_op, both of data type dt, for equality.
// Types that report a complex comparison are compared through their
// comparison function; buffer-typed operands are prefixed with "&".
952 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
955 if(dt->complex_comparison(dt) ){
956 ret += dt->get_comparison_fcn(dt);
958 if(dt->is_buffer_type() ) ret += "&";
961 if(dt->is_buffer_type() ) ret += "&";
// Emit C text that compares lhs_op with rhs_op, both of data type dt.
// Types that report a complex comparison are compared through their
// comparison function; buffer-typed operands are prefixed with "&".
973 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
976 if(dt->complex_comparison(dt) ){
977 ret += dt->get_comparison_fcn(dt);
979 if(dt->is_buffer_type() ) ret += "&";
982 if(dt->is_buffer_type() ) ret += "&";
994 // Here I assume that only MIN and MAX aggregates can be computed
995 // over BUFFER data types.
// Emit the C statements that fold the current tuple into one aggregate.
//   var    - C lvalue text of the stored aggregate to update.
//   atbl   - aggregate table.
//   aidx   - index of the aggregate in atbl.
//   schema - schema table forwarded to generate_se_code.
// Returns the generated C text, or an "ERROR: ..." string when the
// aggregate operator is not recognized.
997 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
998 string retval = "\t\t";
999 string op = atbl->get_op(aidx);
// Aggregates that are not built in are updated through the function
// <op>_LFTA_AGGR_UPDATE_; the storage is passed by address unless its
// type is fstring_t.
1002 if(! atbl->is_builtin(aidx)) {
1004 retval += op+"_LFTA_AGGR_UPDATE_(";
1005 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1006 retval+="("+var+")";
1007 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1008 for(o=0;o<opl.size();++o){
1010 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1012 retval += generate_se_code(opl[o], schema);
1019 // Built-in aggregate processing.
1021 data_type *dt = atbl->get_data_type(aidx);
1025 retval.append("++;\n");
1030 retval.append(" += ");
1031 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1032 retval.append(";\n");
// Replace-if-smaller: the new value is first stored in aggr_tmp_<aidx>, then
// compared with the stored value (through the type's comparison function
// when it has one) and copied over it when the test "< 0" / "<" succeeds.
1036 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1037 retval.append(tmpstr);
1038 if(dt->complex_comparison(dt)){
1039 if(dt->is_buffer_type())
1040 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1042 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1044 sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1046 retval.append(tmpstr);
1047 if(dt->is_buffer_type()){
1048 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1050 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1052 retval.append(tmpstr);
// Replace-if-larger: same scheme with the test "> 0" / ">".
1057 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1058 retval.append(tmpstr);
1059 if(dt->complex_comparison(dt)){
1060 if(dt->is_buffer_type())
1061 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1063 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1065 sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1067 retval.append(tmpstr);
1068 if(dt->is_buffer_type()){
1069 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1071 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1073 retval.append(tmpstr);
// Bitwise aggregates use the matching compound assignment operator.
1078 if(op == "AND_AGGR"){
1080 retval.append(" &= ");
1081 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1082 retval.append(";\n");
1085 if(op == "OR_AGGR"){
1087 retval.append(" |= ");
1088 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1089 retval.append(";\n");
1092 if(op == "XOR_AGGR"){
1094 retval.append(" ^= ");
1095 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1096 retval.append(";\n");
1099 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1100 return("ERROR: aggregate not recognized: "+op);
// Emit the C statements that initialize one aggregate from the first tuple
// of a group.
//   var    - C lvalue text of the stored aggregate to initialize.
//   atbl   - aggregate table.
//   aidx   - index of the aggregate in atbl.
//   schema - schema table forwarded to generate_se_code.
// Returns the generated C text, or an "ERROR: ..." string when the
// aggregate operator is not recognized.
1106 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1108 string op = atbl->get_op(aidx);
// Aggregates that are not built in: emit a call to <op>_LFTA_AGGR_INIT_
// followed by a call to <op>_LFTA_AGGR_UPDATE_. The storage is passed by
// address unless its type is fstring_t.
1111 if(! atbl->is_builtin(aidx)) {
1113 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1114 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1115 retval+="("+var+"));\n";
1117 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1118 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1119 retval+="("+var+")";
1120 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1121 for(o=0;o<opl.size();++o){
1123 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1125 retval += generate_se_code(opl[o],schema);
1131 // Built-in aggregate processing.
1134 data_type *dt = atbl->get_data_type(aidx);
1137 retval = "\t\t"+var;
1138 retval.append(" = 1;\n");
// These aggregates start from the first tuple's value: buffer types are
// copied via aggr_tmp_<aidx> and the type's assign-copy function, the other
// types by assignment.
1142 if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1143 op == "OR_AGGR" || op == "XOR_AGGR"){
1144 if(dt->is_buffer_type()){
1145 sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1146 retval.append(tmpstr);
1147 sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1148 retval.append(tmpstr);
1150 retval = "\t\t"+var;
1152 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1153 retval.append(";\n");
1158 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1159 return("ERROR: aggregate not recognized: "+op);
1163 ////////////////////////////////////////////////////////////
// Emit the text that opens the generated C source for one LFTA.
//   schema           - schema table.
//   node_name        - query node name; names the embedded schema string.
//   schema_embed_str - C initializer text assigned to that schema string.
// The output starts with "#ifndef LFTA_IN_NIC", declares the schema string as
// a char pointer and adds the standard #include lines listed below.
1166 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1167 std::string &node_name, std::string &schema_embed_str){
1168 // Include these only once, not once per lfta
1169 // string ret = "#include \"rts.h\"\n";
1170 // ret += "#include \"fta.h\"\n\n");
1172 string ret = "#ifndef LFTA_IN_NIC\n";
1173 ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1174 ret += "#include<stdio.h>\n";
1175 ret += "#include <limits.h>\n";
1176 ret += "#include <float.h>\n";
1177 ret += "#include \"rdtsc.h\"\n";
// generate_tuple_from_aggr: emits the C code that turns one aggregate-table
// slot into an output tuple and then releases the slot.
//   node_name - query node name; used for the tuple struct name.
//   schema    - schema table list, forwarded to the expression generators.
//   idx       - text of the C expression that indexes t->aggr_table.
// Emitted steps, in order: evaluate the UDAF bailout hooks; unpack UDAF
// outputs and select-clause partial functions; size, allocate, fill and
// post the tuple; destroy buffer-typed group-by and aggregate storage;
// decrement t->n_aggrs.
// aggr_tbl, gb_tbl, sl_list, partial_fcns, is_partial_fcn, sl_fcns_start,
// sl_fcns_end, tmpstr and the loop counters are declared outside the
// lines visible here.
1186 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1188 // need to create and output the tuple.
1189 string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1190 // Check for any UDAFs with LFTA_BAILOUT
// lfta_bailout accumulates the results of every bailout hook; the tuple is
// only built when the sum stays zero.
1191 ret += "\tlfta_bailout = 0;\n";
1192 for(a=0;a<aggr_tbl->size();a++){
1193 if(aggr_tbl->has_bailout(a)){
1194 ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
// Storage of type fstring_t is passed as is; every other storage type is
// passed by address.
1195 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1196 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1199 ret += "\tif(! lfta_bailout){\n";
1201 // First, compute the size of the tuple.
1203 // Unpack UDAF return values
// Each non-builtin aggregate writes its result into udaf_ret<a> through its
// _LFTA_AGGR_OUTPUT_ hook.
1204 for(a=0;a<aggr_tbl->size();a++){
1205 if(! aggr_tbl->is_builtin(a)){
1206 ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1207 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1208 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1214 // Unpack partial fcns ref'd by the select clause.
// The unpack_failed guard is only emitted when the select clause references
// at least one function (sl_fcns_start differs from sl_fcns_end).
1215 if(sl_fcns_start != sl_fcns_end){
1216 ret += "\t\tunpack_failed = 0;\n";
1217 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1218 if(is_partial_fcn[p]){
1219 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1220 "t->aggr_table["+idx+"].",schema);
1221 ret += "\t\tif(retval) unpack_failed = 1;\n";
1224 // BEGIN don't allocate tuple if
1225 ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1228 // Unpack any BUFFER type selections into temporaries
1229 // so that I can compute their size and not have
1230 // to recompute their value during tuple packing.
1231 // I can use regular assignment here because
1232 // these temporaries are non-persistent.
1234 for(s=0;s<sl_list.size();s++){
1235 data_type *sdt = sl_list[s]->get_data_type();
1236 if(sdt->is_buffer_type()){
1237 sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1239 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1245 // The size of the tuple is the size of the tuple struct plus the
1246 // size of the buffers to be copied in.
1248 ret += "\t\t\ttuple_size = sizeof( struct ";
1249 ret += generate_tuple_name(node_name);
1251 for(s=0;s<sl_list.size();s++){
1252 data_type *sdt = sl_list[s]->get_data_type();
1253 if(sdt->is_buffer_type()){
1254 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// The emitted code silently skips tuple creation when allocate_tuple
// returns NULL.
1261 ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1262 ret += "\t\t\tif( tuple != NULL){\n";
1265 // Test passed, make assignments to the tuple.
// tuple_pos starts just past the fixed-size struct and is advanced by the
// size of each buffer copied behind it.
1267 ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1268 ret += generate_tuple_name(node_name) ;
1271 // Mark tuple as REGULAR_TUPLE
1272 ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1274 for(s=0;s<sl_list.size();s++){
1275 data_type *sdt = sl_list[s]->get_data_type();
1276 if(sdt->is_buffer_type()){
1277 sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1279 sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
// Presumably the non-buffer branch: the select expression is assigned
// straight into the tuple field - confirm against the full file.
1282 sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1284 // if(sdt->needs_hn_translation())
1285 // ret += sdt->hton_translation() +"( ";
1286 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1287 // if(sdt->needs_hn_translation())
// Post the tuple; the output counters are only maintained when the
// generated code is compiled with LFTA_STATS.
1294 ret += "\t\t\t\tpost_tuple(tuple);\n";
1295 ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1296 ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1297 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1298 ret += "\t\t\t\t#endif\n\n";
// Close the unpack_failed guard only if it was opened above.
1301 if(sl_fcns_start != sl_fcns_end) // END don't allocate tuple if
1302 ret += "\t\t}\n"; // unpack failed.
1305 // Need to release memory held by BUFFER types.
1308 for(g=0;g<gb_tbl->size();g++){
1309 data_type *gdt = gb_tbl->get_data_type(g);
1310 if(gdt->is_buffer_type()){
1311 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1315 for(a=0;a<aggr_tbl->size();a++){
1316 if(aggr_tbl->is_builtin(a)){
1317 data_type *adt = aggr_tbl->get_data_type(a);
1318 if(adt->is_buffer_type()){
1319 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
// Presumably the non-builtin (UDAF) branch: the aggregate is released
// through its _LFTA_AGGR_DESTROY_ hook - confirm against the full file.
1323 ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1324 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1325 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
// The slot no longer holds a group.
1329 ret += "\t\tt->n_aggrs--;\n";
// generate_gb_match_test: emits the condition that tests whether aggregate
// slot idx holds the group of the current tuple.
//   idx - text of the C expression that indexes the aggregate table.
// The emitted condition starts with IS_FILLED and IS_NEW checks on
// t->aggr_table_bitmap; when there are group-by attributes, one equality
// test per attribute (gb_attr_<g> against the slot field gb_var<g>) is
// appended via generate_equality_test.
// gb_tbl, tmpstr and g are declared outside the lines visible here.
1335 string generate_gb_match_test(string idx){
1337 string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") && IS_NEW(t->aggr_table_bitmap,"+idx+")";
1338 if(gb_tbl->size()>0){
1339 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1342 // Next, scan list for a match on the group-by attributes.
1343 string rhs_op, lhs_op;
1344 for(g=0;g<gb_tbl->size();g++){
// lhs is the freshly computed attribute, rhs is the value stored in the slot.
1347 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1348 sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1349 ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
// generate_gb_update: emits the code run when the current tuple matches the
// group already stored in aggregate slot idx.
//   node_name - query node name; used in the flush function name.
//   schema    - schema table list, forwarded to the code generators.
//   idx       - text of the C expression that indexes t->aggr_table.
//   has_udaf  - passed by value; set to true below when any aggregate is not
//               builtin, so the caller never observes that assignment.
// Emitted steps: update every aggregate in place, destroy the copied
// buffer-typed gb_attr_ temporaries, then an or-joined test of the UDAF
// _LFTA_AGGR_FLUSHME_ hooks followed by a flush call, tuple generation for
// the slot and clearing of its SLOT_FILLED bit.
// NOTE(review): the statements that open the FLUSHME test and any guard on
// has_udaf are not visible in this chunk - confirm against the full file.
1359 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1363 ret += "/*\t\tMatch found : update in place.\t*/\n";
1366 for(a=0;a<aggr_tbl->size();a++){
1367 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1368 ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1369 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1372 // garbage collect copied buffer type gb attrs.
1373 for(g=0;g<gb_tbl->size();g++){
1374 data_type *gdt = gb_tbl->get_data_type(g);
1375 if(gdt->is_buffer_type()){
1376 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
// first_udaf suppresses the or-operator in front of the first hook call.
1383 bool first_udaf = true;
1386 for(a=0;a<aggr_tbl->size();a++){
1387 if(! aggr_tbl->is_builtin(a)){
1388 if(! first_udaf)ret += " || ";
1389 else first_udaf = false;
1390 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1391 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1392 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
// Flush old groups, emit the tuple for this slot, then mark the slot empty.
1396 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1397 ret += generate_tuple_from_aggr(node_name,schema,idx);
1398 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
// generate_init_group: emits the code that installs a new group in
// aggregate slot idx.
//   schema - schema table list, forwarded to generate_aggr_init.
//   idx    - text of the C expression that indexes the aggregate table.
// Emitted steps: store hash2, SLOT_FILLED and gen_val in the slot hashmap
// entry, copy each gb_attr_<g> into gb_var<g>, initialize every aggregate
// through generate_aggr_init, and increment t->n_aggrs.
// gb_tbl, aggr_tbl, tmpstr and the loop counters are declared outside the
// lines visible here.
1405 string generate_init_group( table_list *schema, string idx){
1407 string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1408 // Fill up the aggregate block.
1409 for(g=0;g<gb_tbl->size();g++){
1410 sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1413 for(a=0;a<aggr_tbl->size();a++){
1414 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1415 ret += generate_aggr_init(tmpstr, aggr_tbl,a, schema);
1417 ret+="\t\tt->n_aggrs++;\n";
// generate_fta_flush: emits the static C function
// fta_aggr_flush_old_<node_name>(struct FTA *f, unsigned int nflush), which
// walks the aggregate table from t->flush_pos and evicts stale groups.
//   node_name - query node name; used in the function, tuple and fta struct names.
//   schema    - schema table list, forwarded to generate_tuple_from_aggr.
//   Ext_fcns  - external function list; supplies the return type of each UDAF.
// A slot is evicted by the emitted loop when it is filled and either its
// generation bits differ from the current generation or one of its temporal
// group-by values is older than the matching t->last_gb_<g>. The loop stops
// after nflush evictions, when the table is empty, or at t->max_aggrs.
// sl_list, gb_tbl, aggr_tbl, partial_fcns, tmpstr, first_g and the loop
// counters are declared outside the lines visible here.
1422 string generate_fta_flush(string node_name, table_list *schema,
1423 ext_fcn_list *Ext_fcns){
1426 string select_var_defs ;
1429 // Flush from previous epoch
1431 ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
// Local declarations of the emitted function.
1433 ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1434 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1435 ret += "\tint i, lfta_bailout;\n";
1436 ret += "\tunsigned int gen_val;\n";
1438 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1439 ret += generate_fta_name(node_name)+" *) f;\n";
1444 // Variables needed to store selected attributes of BUFFER type
1445 // temporarily, in order to compute their size for storage
1446 // in an output tuple.
1448 select_var_defs = "";
1449 for(s=0;s<sl_list.size();s++){
1450 data_type *sdt = sl_list[s]->get_data_type();
1451 if(sdt->is_buffer_type()){
1452 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1453 select_var_defs.append(tmpstr);
// The heading comment is only emitted when at least one temporary exists.
1456 if(select_var_defs != ""){
1457 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1458 ret += select_var_defs;
1462 // Variables to store results of partial functions.
1463 if(sl_fcns_start != sl_fcns_end){
1464 ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1465 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1466 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1467 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
// NOTE(review): the literal below ends with a lone semicolon after the last
// newline, so the emitted text gets an empty statement with no line break
// after it - looks unintended, confirm.
1470 ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1473 // Variables for udaf output temporaries
1474 bool no_udaf = true;
1476 for(a=0;a<aggr_tbl->size();a++){
1477 if(! aggr_tbl->is_builtin(a)){
1479 ret+="/*\t\tUDAF output vars.\t*/\n";
// Declare udaf_ret<a> with the return type registered for the UDAF.
1482 int afcn_id = aggr_tbl->get_fcn_id(a);
1483 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1484 sprintf(tmpstr,"udaf_ret%d", a);
1485 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1490 // ret+="\tt->flush_finished=1; /* flush will be completed */\n";
// Body of the emitted function: scan from the saved flush position.
1492 ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1493 ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1494 ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
// Or-join one staleness comparison per temporal group-by attribute.
1497 for(g=0;g<gb_tbl->size();g++){
1498 data_type *gdt = gb_tbl->get_data_type(g);
1499 if(gdt->is_temporal()){
1500 if(first_g) first_g=false; else ret+=" || ";
1501 ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
// Evict: clear the hashmap entry, count the eviction under LFTA_STATS and
// emit the tuple for slot i.
1505 ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1507 "#ifdef LFTA_STATS\n"
1508 "\t\t\tt->eviction_cnt++;\n"
1513 ret+=generate_tuple_from_aggr(node_name,schema,"i");
1515 // ret+="\t\t\tt->n_aggrs--;\n"; // done in generate_tuple_from_aggr
1516 ret+="\t\t\tnflush--;\n";
// Remember where the scan stopped; an empty table counts as fully flushed.
1519 ret+="\tt->flush_pos=i;\n";
1520 ret+="\tif(t->n_aggrs == 0) {\n";
1521 ret+="\t\tt->flush_pos = t->max_aggrs;\n";
// Once the scan reached the end, record the flushed epoch for each temporal
// group-by attribute that is not a buffer type.
1524 ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1526 for(int g=0;g<gb_tbl->size();g++){
// dt and gdt below both hold gb_tbl->get_data_type(g).
1527 data_type *dt = gb_tbl->get_data_type(g);
1528 if(dt->is_temporal()){
1529 data_type *gdt = gb_tbl->get_data_type(g);
1530 if(!gdt->is_buffer_type()){
1531 sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1536 ret += "\t}\n}\n\n";
1541 // TODO Remove sprintf to perform string catenation
// generate_fta_load_params: emits the static C function
// load_params_<node_name>(t, sz, value, initial_call), which unpacks the
// query parameters from the block at value (sz bytes) into t->param_<name>
// and then registers the pass-by-handle parameters.
//   node_name - query node name; used in the function and fta struct names.
// The emitted function returns 1 when the block is too small for the
// parameters and 0 at its end.
// param_tbl, param_handle_table, tmpstr and the loop counters are declared
// outside the lines visible here.
1542 string generate_fta_load_params(string node_name){
1544 vector<string> param_names = param_tbl->get_param_names();
1546 string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1547 ret += " *t, int sz, void *value, int initial_call){\n";
1548 ret += "\tint pos=0;\n";
1549 ret += "\tint data_pos;\n";
// Buffer-typed parameters get two temporaries: tmp_var_ in the in-memory
// type and access_var_ in the tuple (packed) type.
1551 for(p=0;p<param_names.size();p++){
1552 data_type *dt = param_tbl->get_data_type(param_names[p]);
1553 if(dt->is_buffer_type()){
1554 sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1556 sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
// data_pos is emitted as a sum with one term per parameter built from its
// tuple type; presumably each term is a sizeof - the wrapping text is not
// visible in this chunk, confirm.
1563 ret += "\n\tdata_pos = ";
1564 for(p=0;p<param_names.size();p++){
1565 if(p>0) ret += " + ";
1566 data_type *dt = param_tbl->get_data_type(param_names[p]);
1568 ret += dt->get_tuple_cvar_type();
// Reject a parameter block that is shorter than the fixed-size part.
1572 ret += "\tif(data_pos > sz) return 1;\n\n";
1575 for(p=0;p<param_names.size();p++){
1576 data_type *dt = param_tbl->get_data_type(param_names[p]);
1577 if(dt->is_buffer_type()){
// Read the packed descriptor at the current position.
1578 sprintf(tmpstr,"\taccess_var_%s = *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1580 switch( dt->get_type() ){
1582 // ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n"; // ntoh conversion
1583 // ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n"; // ntoh conversion
// Bounds check: the referenced bytes must lie inside the block; then point
// tmp_var_ at them.
1584 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1586 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1588 sprintf(tmpstr,"\ttmp_var_%s.length = access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
// Diagnostic printed at code-generation time for a buffer type with no
// unpacking rule.
1592 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1596 // First, destroy the old
1597 ret += "\tif(! initial_call)\n";
1598 sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1600 // Next, create the new.
1601 sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1604 // if(dt->needs_hn_translation()){
1605 // sprintf(tmpstr,"\tt->param_%s = %s( *( (%s *)( (char *)value+pos) ) );\n",
1606 // param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
// Presumably the non-buffer branch: the value is copied directly from the
// block - confirm against the full file.
1608 sprintf(tmpstr,"\tt->param_%s = *( (%s *)( (char *)value+pos) );\n",
1609 param_names[p].c_str(), dt->get_cvar_type().c_str() );
// NOTE(review): pos advances by the size of get_cvar_type(), while data_pos
// and the buffer descriptor read above use get_tuple_cvar_type(). If this
// line also runs for buffer types and the two types differ in size, the
// offsets disagree - confirm.
1613 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1617 // Register the pass-by-handle parameters
1619 ret += "/* register and de-register the pass-by-handle parameters */\n";
1622 for(ph=0;ph<param_handle_table.size();++ph){
1623 data_type pdt(param_handle_table[ph]->type_name);
1624 switch(param_handle_table[ph]->val_type){
// On a reload the previous handle is deregistered before the new one is
// registered.
1630 ret += "\tif(! initial_call)\n";
1631 sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1632 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1634 sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
// Buffer-typed parameters are handed to the registration function by address.
1637 if(pdt.is_buffer_type()) ret += "&(";
1638 ret += "t->param_"+param_handle_table[ph]->param_name;
1639 if(pdt.is_buffer_type()) ret += ")";
// Diagnostic printed at code-generation time for an unhandled val_type.
1643 sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1644 fprintf(stderr,"%s\n",tmpstr);
1649 ret+="\treturn 0;\n";
// generate_fta_free: emits the static C function
// free_fta_<node_name>(struct FTA *f, gs_uint32_t recursive).
//   node_name     - query node name; used in the function and struct names.
//   is_aggr_query - presumably guards the flush sequence below; the test
//                   itself is not visible in this chunk - confirm.
// The emitted function flushes the aggregate table, advances the generation
// so that every remaining group counts as old, rewinds flush_pos and flushes
// again, then deregisters every pass-by-handle parameter and returns 0.
// param_handle_table, tmpstr and ph are declared outside the lines visible here.
1658 string generate_fta_free(string node_name, bool is_aggr_query){
1660 string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1661 ret+= "\tstruct "+generate_fta_name(node_name)+
1662 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1663 ret += "\tint i;\n";
// Two-pass flush: finish the pending flush, then flush the current generation.
1666 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1667 ret+="\t/* \t\tmark all groups as old */\n";
1668 ret+="\tt->generation++;\n";
1669 ret+="\tt->flush_pos = 0;\n";
1670 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1673 // Deregister the pass-by-handle parameters
1674 ret += "/* de-register the pass-by-handle parameters */\n";
1676 for(ph=0;ph<param_handle_table.size();++ph){
1677 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1678 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1683 ret += "\treturn 0;\n}\n\n";
// generate_fta_control: emits the static C function
// control_fta_<node_name>(f, command, sz, value), the command dispatcher of
// the generated FTA.
//   node_name     - query node name; used in the function, struct and
//                   helper names.
//   schema        - schema table list; not referenced in the lines visible here.
//   is_aggr_query - presumably guards the aggregate flush sequences; the
//                   tests are not visible in this chunk - confirm.
// Commands handled by the emitted code: FTA_COMMAND_FLUSH,
// FTA_COMMAND_LOAD_PARAMS (only when the query has parameters),
// FTA_COMMAND_SET_SAMPLING_RATE and FTA_COMMAND_FILE_DONE. The emitted
// function returns 1 when the end-of-file tuple cannot be allocated and 0
// at its end.
1688 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1689 string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value){\n";
1690 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1691 ret += generate_fta_name(node_name)+" *) f;\n\n";
1695 ret += "\t/* temp status tuple */\n";
1696 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1697 ret += "\tgs_int32_t tuple_size;\n";
// FLUSH: with no groups stored, post a zero-size tuple; otherwise run the
// two-pass flush (pending flush, new generation, flush again).
1701 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1703 ret+="\t\tif (!t->n_aggrs) {\n";
1704 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1705 ret+="\t\t\tif( tuple != NULL)\n";
1706 ret+="\t\t\t\tpost_tuple(tuple);\n";
1708 ret+="\t\t}else{\n";
1710 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1711 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1712 ret +="\t\tt->generation++;\n";
1713 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1714 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1715 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1716 ret+="\t\t\tt->flush_pos = 0;\n";
1717 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
// LOAD_PARAMS: reload parameters with initial_call set to 0; a failed load
// only produces a warning, printed unless LFTA_IN_NIC is defined.
1722 if(param_tbl->size() > 0){
1724 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1725 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1726 "#ifndef LFTA_IN_NIC\n"
1727 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1734 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1735 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
// FILE_DONE: flush any stored groups with the same two-pass sequence, then
// post an end-of-file tuple.
1739 ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1742 ret+="\t\tif (t->n_aggrs) {\n";
1743 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1744 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1745 ret +="\t\tt->generation++;\n";
1746 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1747 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1748 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1749 ret+="\t\t\tt->flush_pos = 0;\n";
1750 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1754 ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1755 ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1756 ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1758 /* mark tuple as EOF_TUPLE */
1759 ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1760 ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1761 ret += "\t\tpost_tuple(tuple);\n";
1764 ret += "\treturn 0;\n}\n\n";
// generate_fta_clock: emits the static C function
// clock_fta_<node_name>(struct FTA *f), the periodic heartbeat of the
// generated FTA.
//   node_name     - query node name; used in the function, struct and
//                   helper names.
//   schema        - schema table list; used for field types and forwarded
//                   to the expression generators.
//   time_corr     - number subtracted from the current wall-clock time when
//                   the last seen time field is advanced artificially.
//   is_aggr_query - when true, temporal group-by variables are declared and
//                   the epoch-change flush is emitted.
// The emitted function refreshes the last seen temporal values, flushes
// aggregates if the temporal group-by values changed, posts a
// TEMPORAL_TUPLE carrying the trace id and a struct fta_stat, calls
// fta_heartbeat, resets the runtime counters and returns 0 (or 1 when the
// tuple cannot be allocated).
// sl_list, gb_tbl, tmpstr, change_test and the loop counters are declared
// outside the lines visible here.
1769 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){
1770 string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1771 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1772 ret += generate_fta_name(node_name)+" *) f;\n\n";
1774 ret += "\t/* Create a temp status tuple */\n";
1775 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1776 ret += "\tgs_int32_t tuple_size;\n";
1777 ret += "\tunsigned int i;\n";
1778 ret += "\ttime_t cur_time;\n";
1779 ret += "\tint time_advanced;\n";
1780 ret += "\tstruct fta_stat stats;\n";
1784 /* copy the last seen values of temporal attributes */
1785 col_id_set temp_cids; // col ids of temp attributes in select clause
1788 /* HACK: in order to reuse the SE generation code, we need to copy
1789 * the last values of the temp attributes into new variables
1790 * which have names unpack_var_XXX_XXX
1794 col_id_set::iterator csi;
// Collect the column ids referenced by temporal select-list entries.
1796 for(s=0;s<sl_list.size();s++){
1797 data_type *sdt = sl_list[s]->get_data_type();
1798 if (sdt->is_temporal()) {
1799 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Declare one unpack_var_<field>_<tblref> local per collected column.
1803 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1804 int tblref = (*csi).tblvar_ref;
1805 int schref = (*csi).schema_ref;
1806 string field = (*csi).field;
1807 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1808 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
// Aggregate queries also need locals for the temporal group-by attributes,
// plus a gb_attr_tmp for buffer-typed ones.
1812 if (is_aggr_query) {
1813 for(g=0;g<gb_tbl->size();g++){
1814 data_type *gdt = gb_tbl->get_data_type(g);
1815 if(gdt->is_temporal()){
1816 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1818 data_type *gdt = gb_tbl->get_data_type(g);
1819 if(gdt->is_buffer_type()){
1820 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1828 ret += "\ttime_advanced = 0;\n";
1830 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1831 int tblref = (*csi).tblvar_ref;
1832 int schref = (*csi).schema_ref;
1833 string field = (*csi).field;
1834 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1836 // update last seen value with the value seen
// When PREFILTER_DEFINED is set, a newer value held by the prefilter
// replaces the stored one and counts as a time advance.
1837 ret += "\t#ifdef PREFILTER_DEFINED\n";
1838 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
1839 field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
1841 ret += "\t\ttime_advanced = 1;\n\t}\n";
1842 ret += "\t#endif\n";
1844 // we need to pay special attention to time fields
// For the fields time, timestamp and timestamp_ms the emitted code compares
// the stored value with the wall clock minus time_corr and, outside
// blocking mode, moves the stored value forward when it lags behind.
1845 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
1846 ret += "\tcur_time = time(&cur_time);\n";
1848 if (field == "time") {
1849 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
1852 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
1853 field.c_str(), tblref, field.c_str(), tblref, time_corr);
// timestamp_ms is compared and stored scaled by 1000.
1854 } else if (field == "timestamp_ms") {
1855 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
1858 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
1859 field.c_str(), tblref, field.c_str(), tblref, time_corr);
// Remaining case (timestamp): the upper 32 bits are compared with the
// wall clock and the new value is stored shifted left by 32.
1861 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
1862 field.c_str(), tblref, time_corr);
1864 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
1865 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1869 ret += "\t\ttime_advanced = 1;\n";
// Otherwise the unpack variable simply takes the stored last value.
1872 sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
1873 field.c_str(), tblref, field.c_str(), tblref);
1876 sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
1877 field.c_str(), tblref, field.c_str(), tblref);
1882 // for aggregation lftas we need to check if the time was advanced beyond the current epoch
1883 if (is_aggr_query) {
1886 bool first_one = true;
1887 for(g=0;g<gb_tbl->size();g++){
1888 data_type *gdt = gb_tbl->get_data_type(g);
1889 if(gdt->is_temporal()){
1890 // To perform the test, first need to compute the value
1891 // of the temporal gb attrs.
1892 if(gdt->is_buffer_type()){
1893 // NOTE : if the SE defining the gb is anything
1894 // other than a ref to a variable, this will generate
1895 // illegal code. To be resolved with Spatch.
1896 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
1897 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
1899 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
1900 gdt->get_buffer_assign_copy().c_str(), g, g);
1902 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
// Build the and-joined equality test between the stored and the newly
// computed temporal group-by values.
1906 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
1907 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
1908 if(first_one){first_one = false;} else {change_test.append(") && (");}
1909 change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
// The flush below is emitted under the condition that time advanced and the
// equality test fails.
1913 ret += "\n\tif( time_advanced && !( (";
1917 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
1918 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
1919 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1921 ret += "\t\t/* \t\tmark all groups as old */\n";
1922 ret +="\t\tt->generation++;\n";
1923 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1924 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1925 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1926 ret += "\t\tt->flush_pos = 0;\n";
// Record the new epoch values.
1928 for(g=0;g<gb_tbl->size();g++){
1929 data_type *gdt = gb_tbl->get_data_type(g);
1930 if(gdt->is_temporal()){
1931 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
1932 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
// The heartbeat tuple is laid out as: tuple struct, 64-bit trace id,
// struct fta_stat.
1939 ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
1940 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
1941 ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
// Only temporal select-list entries are filled in.
1944 for(s=0;s<sl_list.size();s++){
1945 data_type *sdt = sl_list[s]->get_data_type();
1946 if(sdt->is_temporal()){
// For a group-by reference: keep last_flushed_gb while groups are stored,
// otherwise refresh it from the current expression value.
1948 if (sl_list[s]->is_gb()) {
1949 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
1953 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
1955 // if(sdt->needs_hn_translation())
1956 // ret += sdt->hton_translation() +"( ";
1957 if (sl_list[s]->is_gb()) {
1958 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
1961 ret += generate_se_code(sl_list[s],schema);
1963 // if(sdt->needs_hn_translation())
1969 /* mark tuple as temporal */
1970 ret += "\n\t/* Mark tuple as temporal */\n";
1971 ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
1973 ret += "\n\t/* Copy trace id */\n";
1974 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
1976 ret += "\n\t/* Populate runtime stats */\n";
1977 ret += "\tstats.ftaid = f->ftaid;\n";
1978 ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
1979 ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
1980 ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
1981 ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
1982 ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
1983 ret += "\tstats.collision_cnt = t->collision_cnt;\n";
1984 ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
1985 ret += "\tstats.sampling_rate = t->sampling_rate;\n";
// Under LFTA_PROFILE the emitted code also prints the counters to stderr.
// NOTE(review): the collision_cnt format ends with a newline, so
// eviction_cnt is printed on a line of its own - confirm this is intended.
1987 ret += "\n#ifdef LFTA_PROFILE\n";
1988 ret += "\n\t/* Print stats */\n";
1989 ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
1990 ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
1991 ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
1992 ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
1993 ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
1994 ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
1995 ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
1996 ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
1997 ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
1998 ret += "\n#endif\n";
// NOTE(review): the memcpy below spells the size as sizeof(fta_stat) without
// the struct keyword, unlike the tuple_size computation above; in C this
// only compiles if fta_stat is also a typedef name - confirm.
2001 ret += "\n\t/* Copy stats */\n";
2002 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2003 ret+="\tpost_tuple(tuple);\n";
// The trace id is post-incremented on every heartbeat.
2005 ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2006 ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
2008 ret += "\n\t/* Reset runtime stats */\n";
2009 ret += "\tt->in_tuple_cnt = 0;\n";
2010 ret += "\tt->out_tuple_cnt = 0;\n";
2011 ret += "\tt->out_tuple_sz = 0;\n";
2012 ret += "\tt->accepted_tuple_cnt = 0;\n";
2013 ret += "\tt->cycle_cnt = 0;\n";
2014 ret += "\tt->collision_cnt = 0;\n";
2015 ret += "\tt->eviction_cnt = 0;\n";
2017 ret += "\treturn 0;\n}\n\n";
2023 // accept processing before the where clause,
2024 // do flush processing.
// generate_aggr_accept_prelim: emits the part of the accept function of an
// aggregate query that runs before the where clause.
//   fs             - query plan node; its defines slow_flush and real_time
//                    are read here.
//   node_name      - query node name; used in the flush function name and
//                    forwarded to generate_unpack_code.
//   schema         - schema table list; used for field accessors and types.
//   unpacked_cids  - in/out: column ids whose unpack code has already been
//                    emitted; ids unpacked here are inserted.
//   temporal_flush - out: receives the code that flushes the aggregates
//                    when a temporal group-by value changes; it stays empty
//                    when there is no temporal group-by attribute.
// The text built in ret holds the slow-flush step, the unpacking of
// temporal select-list columns and, for real_time queries, the temporal
// flush code.
// gb_tbl, sl_list, tmpstr, change_test and the loop counters are declared
// outside the lines visible here.
2025 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema, col_id_set &unpacked_cids, string &temporal_flush){
2029 string ret="\n/*\tslow flush\t*/\n";
// slow_flush sets how many accept calls pass between single-slot flushes;
// any value that does not parse to a positive integer becomes 2.
2030 string slow_flush_str = fs->get_val_of_def("slow_flush");
2031 int n_slow_flush = atoi(slow_flush_str.c_str());
2032 if(n_slow_flush <= 0) n_slow_flush = 2;
2033 if(n_slow_flush > 1){
2034 ret += "\tt->flush_ctr++;\n";
2035 ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2036 ret += "\t\tt->flush_ctr = 0;\n";
2037 ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
// Presumably the branch for a rate of 1: flush one slot on every call -
// confirm against the full file.
2040 ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2045 bool first_one = true;
2047 col_id_set flush_cids; // col ids accessed when computing flush variables.
2048 // unpack them at temporal flush test time.
2049 temporal_flush = "";
2052 for(g=0;g<gb_tbl->size();g++){
2053 data_type *gdt = gb_tbl->get_data_type(g);
2054 if(gdt->is_temporal()){
2055 gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2057 // To perform the test, first need to compute the value
2058 // of the temporal gb attrs.
2059 if(gdt->is_buffer_type()){
2060 // NOTE : if the SE defining the gb is anything
2061 // other than a ref to a variable, this will generate
2062 // illegal code. To be resolved with Spatch.
2063 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2064 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2065 temporal_flush += tmpstr;
2066 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2067 gdt->get_buffer_assign_copy().c_str(), g, g);
2069 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2071 temporal_flush += tmpstr;
2072 // END computing the value of the temporal GB attr.
// Build the and-joined equality test between the stored and the newly
// computed temporal group-by values.
2075 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2076 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2077 if(first_one){first_one = false;} else {change_test.append(") && (");}
2078 change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2081 if(!first_one){ // will be false iff. there is a temporal GB attribute
2082 temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2083 temporal_flush += "\tif( !( (";
2084 temporal_flush += change_test;
2085 temporal_flush += ") ) ){\n";
2087 // temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
// Finish any pending flush, then start a new generation from slot 0.
2088 temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2089 temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2090 temporal_flush+="\t\t}\n";
2091 temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2092 temporal_flush+="\t\tt->generation++;\n";
2093 temporal_flush+="\t\tt->flush_pos = 0;\n";
2096 // Now set the saved temporal value of the gb to the
2097 // current value of the gb. Only for simple types,
2098 // not for buffer types -- but the strings are not
2099 // temporal in any case.
2101 for(g=0;g<gb_tbl->size();g++){
2102 data_type *gdt = gb_tbl->get_data_type(g);
2103 if(gdt->is_temporal()){
2104 if(gdt->is_buffer_type()){
// Diagnostic printed at code-generation time; no code is emitted for a
// temporal buffer-typed attribute.
2106 fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2108 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2109 temporal_flush += tmpstr;
2110 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2111 temporal_flush += tmpstr;
2115 temporal_flush += "\t}\n\n";
2118 // Unpack all the temporal attributes referenced in select clause
2119 // and update the last value of the attribute
2120 col_id_set temp_cids; // col ids of temp attributes in select clause
2121 col_id_set::iterator csi;
2123 for(s=0;s<sl_list.size();s++){
2124 data_type *sdt = sl_list[s]->get_data_type();
2125 if (sdt->is_temporal()) {
2126 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Columns already present in unpacked_cids are skipped; the emitted accept
// code returns 1 when an unpack call fails.
2130 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2131 if(unpacked_cids.count((*csi)) == 0){
2132 int tblref = (*csi).tblvar_ref;
2133 int schref = (*csi).schema_ref;
2134 string field = (*csi).field;
2135 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2137 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2138 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2139 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2141 ret += "\tif(retval) return 1;\n";
// Remember the value as the last one seen for this field.
2143 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2146 unpacked_cids.insert( (*csi) );
2151 // Do the flush here if this is a real_time query
// The flush code is appended to ret only when the real_time define is
// non-empty and temporal flush code exists; the columns it reads are
// unpacked first.
2152 string rt_level = fs->get_val_of_def("real_time");
2153 if(rt_level != "" && temporal_flush != ""){
2154 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2155 if(unpacked_cids.count((*csi)) == 0){
2156 int tblref = (*csi).tblvar_ref;
2157 int schref = (*csi).schema_ref;
2158 string field = (*csi).field;
2159 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2161 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2162 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2164 ret += "\tif(retval) return 1;\n";
2166 unpacked_cids.insert( (*csi) );
2169 ret += temporal_flush;
// generate_sel_accept_body: emits the tuple-building part of the accept
// function of a filter-only (selection) query.
//   fs        - query plan node; not referenced in the lines visible here.
//   node_name - query node name; used for the tuple struct name.
//   schema    - schema table list, forwarded to the expression generators.
// Emitted steps: evaluate the functions referenced by the select clause
// (jumping to the label end when a partial function fails), count the
// accepted tuple, evaluate buffer-typed select items into selvar_
// temporaries, size and allocate the tuple (jumping to end on failure),
// fill its fields, post it and update the output counters.
// sl_list, partial_fcns, is_partial_fcn, fcn_ref_cnt, sl_fcns_start,
// sl_fcns_end, tmpstr, ret and the loop counters are declared outside the
// lines visible here.
2175 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2180 /////////////// Processing for filter-only query
2182 // test passed : create the tuple, then assign to it.
2183 ret += "/*\t\tCreate and post the tuple\t*/\n";
2185 // Unpack partial fcns ref'd by the select clause.
2186 // It is a kind of WHERE clause ...
// A function referenced more than once is evaluated under a
// fcn_ref_cnt_<p> guard so that the emitted code computes it only once.
2187 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2188 if(fcn_ref_cnt[p] > 1){
2189 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2191 if(is_partial_fcn[p]){
2192 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2193 ret += "\tif(retval) goto end;\n";
2195 if(fcn_ref_cnt[p] > 1){
// A shared function that is not partial is cached in partial_fcn_result_<p>.
2196 if(!is_partial_fcn[p]){
2197 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2199 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2204 // increment the counter of accepted tuples
2205 ret += "\n\t#ifdef LFTA_STATS\n";
2206 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2207 ret += "\t#endif\n\n";
2209 // First, compute the size of the tuple.
2211 // Unpack any BUFFER type selections into temporaries
2212 // so that I can compute their size and not have
2213 // to recompute their value during tuple packing.
2214 // I can use regular assignment here because
2215 // these temporaries are non-persistent.
2217 for(s=0;s<sl_list.size();s++){
2218 data_type *sdt = sl_list[s]->get_data_type();
2219 if(sdt->is_buffer_type()){
2220 sprintf(tmpstr,"\tselvar_%d = ",s);
2222 ret += generate_se_code(sl_list[s],schema);
2228 // The size of the tuple is the size of the tuple struct plus the
2229 // size of the buffers to be copied in.
2231 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2232 for(s=0;s<sl_list.size();s++){
2233 data_type *sdt = sl_list[s]->get_data_type();
2234 if(sdt->is_buffer_type()){
2235 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// An allocation failure makes the emitted code jump to the label end.
2242 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2243 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2245 // Test passed, make assignments to the tuple.
// tuple_pos starts just past the fixed-size struct and is advanced by the
// size of each buffer copied behind it.
2247 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2249 // Mark tuple as REGULAR_TUPLE
2250 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2253 for(s=0;s<sl_list.size();s++){
2254 data_type *sdt = sl_list[s]->get_data_type();
2255 if(sdt->is_buffer_type()){
2256 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2258 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
// Presumably the non-buffer branch: the select expression is assigned
// straight into the tuple field - confirm against the full file.
2261 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2263 // if(sdt->needs_hn_translation())
2264 // ret += sdt->hton_translation() +"( ";
2265 ret += generate_se_code(sl_list[s],schema);
2266 // if(sdt->needs_hn_translation())
2274 ret += "\tpost_tuple(tuple);\n";
2276 // Increment the counter of posted tuples
2277 ret += "\n\t#ifdef LFTA_STATS\n";
2278 ret += "\tt->out_tuple_cnt++;\n";
2279 ret+="\tt->out_tuple_sz+=tuple_size;\n";
2280 ret += "\t#endif\n\n";
// generate_fj_accept_body
//
// Appends to `ret` the C source text of the packet-processing body for a
// filter join.  The emitted text, in order:
//   1. reads the current window timestamp into curr_fj_ts,
//   2. (bloom filter variant) rotates / clears the bloom filters when the
//      time bin advances,
//   3. tests the S (filtering stream) predicates and, when they pass,
//      records the S join key in the bloom filters or in the join table,
//   4. tests the cheap R (main stream) predicates,
//   5. probes the bloom filters or the join table with the R join key,
//   6. tests the expensive R predicates,
//   7. builds and posts the output tuple.
//
// Parameters:
//   fs            - the filter join node; temporal_range, temporal_var,
//                   pred_t0 / pred_t1, hash_eq and the "num_bloom",
//                   "bloom_size" and "aggregate_slots" defines are read.
//   node_name     - used for unpack code and the tuple struct name.
//   unpacked_cids - in/out: columns whose unpack code has already been
//                   emitted; every column unpacked here is added to it.
//   Ext_fcns      - passed to compute_cnf_cost().
//   schema        - passed to the code generation helpers.
//
// Fixes relative to the previous revision (all in the join-table variant):
//   * the S-side second-slot key test compared slot `bucket` a second time,
//     so an existing key in slot `bucket1` was never recognised;
//   * the R-side probe compared the stored keys against s_equijoin_<p>
//     instead of r_equijoin_<p> (the hash is computed from r_equijoin_<p>,
//     and s_equijoin_<p> is not assigned when the S predicate fails);
//   * the R-side second-slot key test also read slot `bucket`.
//
// NOTE(review): this listing omits short lines (local declarations, the
// lines that append tmpstr, if/else lines that select the bloom or
// join-table variant, closing braces and the return statement).  The
// comments below describe only what is visible.
2287 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
// Bloom filter geometry: n_bloom filters (default 11, or "num_bloom"+1),
// each covering bloom_width time units of the join window.
2293 unsigned int window_len = fs->temporal_range;
2294 unsigned int n_bloom = 11;
2295 string n_bloom_str = fs->get_val_of_def("num_bloom");
2296 int tmp_n_bloom = atoi(n_bloom_str.c_str());
2298 n_bloom = tmp_n_bloom+1;
2299 float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2300 sprintf(tmpstr,"%f",bloom_width);
2301 string bloom_width_str = tmpstr;
// With a window shorter than the filter count, use one filter per time unit.
2303 if(window_len < n_bloom){
2304 n_bloom = window_len+1;
2305 bloom_width_str = "1";
2309 // Grab the current window time
2310 scalarexp_t winvar(fs->temporal_var);
2311 ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
// "bloom_size" overrides the default only when it lies strictly between 3 and 32.
2313 int bf_exp_size = 12; // base-2 log of number of bits
2314 string bloom_len_str = fs->get_val_of_def("bloom_size");
2315 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2316 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2317 bf_exp_size = tmp_bf_exp_size;
2319 int bf_bit_size = 1 << bf_exp_size;
2320 int bf_byte_size = bf_bit_size / (8*sizeof(char));
// Join table size: default 4096 slots; "aggregate_slots" is considered only
// when it exceeds 1024.
2322 unsigned int ht_size = 4096;
2323 string ht_size_s = fs->get_val_of_def("aggregate_slots");
2324 int tmp_ht_size = atoi(ht_size_s.c_str());
2325 if(tmp_ht_size > 1024){
2326 unsigned int hs = 1; // make it power of 2
2329 tmp_ht_size = tmp_ht_size >> 1;
// bf_mask is built by shifting in one bit per iteration: bf_exp_size bits in
// the first loop, log2(ht_size) bits in the second.
// NOTE(review): presumably the first loop serves the bloom variant and the
// second the join-table variant; the selecting condition is not visible here.
2336 for(i=0;i<bf_exp_size;i++)
2337 bf_mask = (bf_mask << 1) | 1;
2339 for(i=ht_size;i>1;i=i>>1)
2340 bf_mask = (bf_mask << 1) | 1;
// NOTE(review): diagnostic output on stdout at code generation time.
2344 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2347 bloom_width_str.c_str(),
2359 // If this is a bloom-filter fj, first test if the
2360 // bloom filter needs to be advanced.
2361 // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2362 // t->bf_size : number of bits in bloom filter
2365 "// Clean out old bloom filters if needed.\n"
2366 " if(t->first_exec){\n"
2367 " t->first_exec = 0;\n"
2368 " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2369 " t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2371 " curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2372 " if(curr_bin != t->last_bin){\n"
2373 " for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2374 " t->last_bloom_pos++;\n"
2375 " if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2376 " t->last_bloom_pos = 0;\n"
2377 " tmp_i = t->last_bloom_pos;\n"
2378 " for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2379 " SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2383 " t->last_bin = curr_bin;\n"
2389 //-----------------------------------------------------------------
2390 // First, determine whether to do S (filter stream) processing.
2393 "// S (filtering stream) predicate, should it be processed?\n"
2396 // Sort S preds based on cost.
2397 vector<cnf_elem *> s_filt = fs->pred_t1;
2398 col_id_set::iterator csi;
2399 if(s_filt.size() > 0){
2401 // Unpack fields ref'd in the S pred
// A failed unpack in the S branch jumps to end_s, not to the function end, so
// that R processing still takes place.
2402 for(w=0;w<s_filt.size();++w){
2403 col_id_set this_pred_cids;
2404 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2405 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2406 if(unpacked_cids.count( (*csi) ) == 0){
2407 int tblref = (*csi).tblvar_ref;
2408 int schref = (*csi).schema_ref;
2409 string field = (*csi).field;
2410 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2411 unpacked_cids.insert( (*csi) );
2417 // Sort by evaluation cost.
2418 // First, estimate evaluation costs
2419 // Eliminate predicates covered by the prefilter (those in s_pids).
2420 // I need to do it before the sort because the indices refer
2421 // to the position in the unsorted list.
2422 vector<cnf_elem *> tmp_wh;
2423 for(w=0;w<s_filt.size();++w){
2424 compute_cnf_cost(s_filt[w],Ext_fcns);
2425 tmp_wh.push_back(s_filt[w]);
2429 sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2431 // Now generate the predicates.
2432 for(w=0;w<s_filt.size();++w){
2433 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2436 // Find partial fcns ref'd in this cnf element
2438 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2439 // Since set<..> is a "Sorted Associative Container",
2440 // we can walk through it in sorted order by walking from
2441 // begin() to end(). (and the partial fcns must be
2442 // evaluated in this order).
2443 set<int>::iterator si;
2445 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2446 if(fcn_ref_cnt[(*si)] > 1){
2447 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2449 if(is_partial_fcn[(*si)]){
2450 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2451 ret += "\t\tif(retval) goto end_s;\n";
2453 if(fcn_ref_cnt[(*si)] > 1){
2454 if(!is_partial_fcn[(*si)]){
2455 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2456 // Testing for S is a side branch.
2457 // I don't want a cacheable partial function to be
2458 // marked as evaluated. Therefore I mark the function
2459 // as evaluated ONLY IF it is not partial.
2460 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2466 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2467 ") ) goto end_s;\n";
2470 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// The S side of each equijoin predicate is its right-hand scalar expression.
2473 for(p=0;p<fs->hash_eq.size();++p)
2474 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2477 // First, generate the S scalar expressions in the hash_eq
2479 // Iterate over the bloom filters
// Hash i of the key is the XOR over the key components of
// (hash_nums[...] * component hash) >> 32, masked with bf_mask.
2481 ret += "\t\tbucket=0;\n";
2482 for(p=0;p<fs->hash_eq.size();++p){
2484 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2485 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2486 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2488 // SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2490 " bucket &= "+int_to_string(bf_mask)+";\n"
2491 " SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
// Join-table variant: the key may live in slot `bucket` or in the following
// slot `bucket1`.
2496 ret += "\t\tbucket=0;\n";
2497 for(p=0;p<fs->hash_eq.size();++p){
2499 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2500 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2501 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2504 " bucket &= "+int_to_string(bf_mask)+";\n"
2505 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2507 // Try the first bucket
2509 for(p=0;p<fs->hash_eq.size();++p){
2510 if(p>0) ret += " && ";
2511 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2512 // " == s_equijoin_"+int_to_string(p);
2513 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2514 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2515 string rhs_op = "s_equijoin_"+int_to_string(p);
2516 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2518 ret += "){\n\t\t\tthe_bucket = bucket;\n";
// Otherwise try the second slot.  The key test must read slot bucket1;
// testing slot bucket again could never succeed in this else-branch.
2519 ret += "\t\t}else {if(";
2520 for(p=0;p<fs->hash_eq.size();++p){
2521 if(p>0) ret += " && ";
2522 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2523 // " == s_equijoin_"+int_to_string(p);
2524 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2525 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2526 string rhs_op = "s_equijoin_"+int_to_string(p);
2527 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2529 ret += "){\n\t\t\tthe_bucket = bucket1;\n";
// Key in neither slot: overwrite the slot with the older timestamp.
2530 ret += "\t\t}else{ if(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2531 ret+="\t\t\tthe_bucket = bucket;\n\t\t\telse the_bucket=bucket1;\n";
// Store the key (buffer types through their assign-copy function) and the
// current timestamp in the chosen slot.
2533 for(p=0;p<fs->hash_eq.size();++p){
2534 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2535 if(hdt->is_buffer_type()){
2536 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2539 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2540 " = s_equijoin_"+int_to_string(p)+";\n";
2543 ret+="\t\tt->join_table[the_bucket].ts = curr_fj_ts;\n";
2545 ret += "\tend_s:\n";
2547 // ------------------------------------------------------------
2548 // Next, determine if the R record should be processed.
2552 "// R (main stream) cheap predicate\n"
2556 // Unpack r_filt fields
2557 vector<cnf_elem *> r_filt = fs->pred_t0;
2558 for(w=0;w<r_filt.size();++w){
2559 col_id_set this_pred_cids;
2560 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2561 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2562 if(unpacked_cids.count( (*csi) ) == 0){
2563 int tblref = (*csi).tblvar_ref;
2564 int schref = (*csi).schema_ref;
2565 string field = (*csi).field;
2566 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2567 unpacked_cids.insert( (*csi) );
2572 // Sort R preds based on cost.
2574 vector<cnf_elem *> tmp_wh;
2575 for(w=0;w<r_filt.size();++w){
2576 compute_cnf_cost(r_filt[w],Ext_fcns);
2577 tmp_wh.push_back(r_filt[w]);
2581 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2583 // WARNING! the constant 20 below is a wild-ass guess.
// After the sort, r_filt[0..cheap_rpos) are the predicates with cost <= 20;
// they are tested before the join probe, the rest after it.
2585 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++)
2587 // Test the cheap filters on R.
2590 // Now generate the predicates.
2591 for(w=0;w<cheap_rpos;++w){
2592 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2595 // Find partial fcns ref'd in this cnf element
2597 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2598 // Since set<..> is a "Sorted Associative Container",
2599 // we can walk through it in sorted order by walking from
2600 // begin() to end(). (and the partial fcns must be
2601 // evaluated in this order).
2602 set<int>::iterator si;
2603 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2604 if(fcn_ref_cnt[(*si)] > 1){
2605 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2607 if(is_partial_fcn[(*si)]){
2608 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2609 ret += "\t\tif(retval) goto end;\n";
2611 if(fcn_ref_cnt[(*si)] > 1){
2612 if(!is_partial_fcn[(*si)]){
2613 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2615 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2620 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2624 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// The R side of each equijoin predicate is its left-hand scalar expression.
2627 ret += "\n// Do the join\n\n";
2628 for(p=0;p<fs->hash_eq.size();++p)
2629 ret += "\t\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2632 // Passed the cheap pred, now test the join with S.
// NOTE(review): the hash function name is taken from the right-hand (S)
// expression's type although it is applied to r_equijoin_<p>; this assumes
// both sides of an equijoin predicate share a type - confirm.
2635 ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2636 for(p=0;p<fs->hash_eq.size();++p){
2638 " bucket"+int_to_string(i)+
2639 " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2640 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2641 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2644 " bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
// NOTE(review): the visible IS_BF_SET tests index t->last_bloom_pos rather
// than the loop variable b; lines omitted from this listing may account for
// that - confirm.
2646 ret += "\tfound = 0;\n";
2647 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2649 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2650 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2651 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
// Join-table variant of the probe.
2660 ret += "\tfound = 0;\n";
2661 ret += "\t\tbucket=0;\n";
2662 for(p=0;p<fs->hash_eq.size();++p){
2664 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2665 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2666 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2669 " bucket &= "+int_to_string(bf_mask)+";\n"
2670 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2672 // Try the first bucket
// The stored S key is compared against the R key (r_equijoin_<p>), the same
// values the bucket was hashed from.
2674 for(p=0;p<fs->hash_eq.size();++p){
2675 if(p>0) ret += " && ";
2676 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2677 // " == r_equijoin_"+int_to_string(p);
2678 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2679 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2680 string rhs_op = "r_equijoin_"+int_to_string(p);
2681 ret += generate_equality_test(lhs_op,rhs_op,hdt);
// p equals hash_eq.size() here, so the connective is emitted whenever at
// least one key comparison precedes the timestamp test.
// NOTE(review): the direction of the timestamp comparison (ts+range <=
// curr_fj_ts) is kept as found; verify it against the intended window
// semantics.
2683 if(p>0) ret += " && ";
2684 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2685 ret += "){\n\t\t\tfound = 1;\n";
// Second slot: both the key test and the timestamp test read slot bucket1.
2686 ret += "\t\t}else {if(";
2687 for(p=0;p<fs->hash_eq.size();++p){
2688 if(p>0) ret += " && ";
2689 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2690 // " == r_equijoin_"+int_to_string(p);
2691 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2692 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2693 string rhs_op = "r_equijoin_"+int_to_string(p);
2694 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2696 if(p>0) ret += " && ";
2697 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2698 ret += ")\n\t\t\tfound=1;\n";
2707 // Test the expensive filters on R.
2708 if(cheap_rpos < r_filt.size()){
2710 // Now generate the predicates.
2711 for(w=cheap_rpos;w<r_filt.size();++w){
2712 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2715 // Find partial fcns ref'd in this cnf element
2717 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2718 // Since set<..> is a "Sorted Associative Container",
2719 // we can walk through it in sorted order by walking from
2720 // begin() to end(). (and the partial fcns must be
2721 // evaluated in this order).
2722 set<int>::iterator si;
2723 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2724 if(fcn_ref_cnt[(*si)] > 1){
2725 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2727 if(is_partial_fcn[(*si)]){
2728 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2729 ret += "\t\tif(retval) goto end;\n";
2731 if(fcn_ref_cnt[(*si)] > 1){
2732 if(!is_partial_fcn[(*si)]){
2733 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2735 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2740 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2744 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2749 /////////////// post the tuple
2751 // test passed : create the tuple, then assign to it.
2752 ret += "/*\t\tCreate and post the tuple\t*/\n";
2754 // Unpack the fields ref'd by the select list
2755 for(s=0;s<sl_list.size();++s){
2756 col_id_set this_se_cids;
2757 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2758 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2759 if(unpacked_cids.count( (*csi) ) == 0){
2760 int tblref = (*csi).tblvar_ref;
2761 int schref = (*csi).schema_ref;
2762 string field = (*csi).field;
2763 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2764 unpacked_cids.insert( (*csi) );
2770 // Unpack partial fcns ref'd by the select clause.
2771 // It's a kind of a WHERE clause ...
2772 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2773 if(fcn_ref_cnt[p] > 1){
2774 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2776 if(is_partial_fcn[p]){
2777 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2778 ret += "\tif(retval) goto end;\n";
2780 if(fcn_ref_cnt[p] > 1){
2781 if(!is_partial_fcn[p]){
2782 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2784 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2789 // increment the counter of accepted tuples
2790 ret += "\n\t#ifdef LFTA_STATS\n";
2791 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2792 ret += "\t#endif\n\n";
2794 // First, compute the size of the tuple.
2796 // Unpack any BUFFER type selections into temporaries
2797 // so that I can compute their size and not have
2798 // to recompute their value during tuple packing.
2799 // I can use regular assignment here because
2800 // these temporaries are non-persistent.
2802 for(s=0;s<sl_list.size();s++){
2803 data_type *sdt = sl_list[s]->get_data_type();
2804 if(sdt->is_buffer_type()){
2805 sprintf(tmpstr,"\tselvar_%d = ",s);
2807 ret += generate_se_code(sl_list[s],schema);
2813 // The size of the tuple is the size of the tuple struct plus the
2814 // size of the buffers to be copied in.
2816 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2817 for(s=0;s<sl_list.size();s++){
2818 data_type *sdt = sl_list[s]->get_data_type();
2819 if(sdt->is_buffer_type()){
2820 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2827 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2828 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2830 // Test passed, make assignments to the tuple.
2832 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2834 // Mark tuple as REGULAR_TUPLE
2835 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2838 for(s=0;s<sl_list.size();s++){
2839 data_type *sdt = sl_list[s]->get_data_type();
2840 if(sdt->is_buffer_type()){
2841 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2843 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2846 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2848 // if(sdt->needs_hn_translation())
2849 // ret += sdt->hton_translation() +"( ";
2850 ret += generate_se_code(sl_list[s],schema);
2851 // if(sdt->needs_hn_translation())
2859 ret += "\tpost_tuple(tuple);\n";
2861 // Increment the counter of posted tuples
2862 ret += "\n\t#ifdef LFTA_STATS\n";
2863 ret += "\n\tt->out_tuple_cnt++;\n\n";
2864 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
2865 ret += "\t#endif\n\n";
// generate_aggr_accept_body
//
// Appends to `ret` the C source text of the packet-processing body for an
// aggregation query.  The emitted text, in order:
//   1. evaluates the partial functions referenced by the group-by and
//      aggregate definitions,
//   2. bumps the accepted-tuple statistic (guarded by LFTA_STATS),
//   3. computes the group-by attributes gb_attr_<g>,
//   4. inserts the temporal flush code when the query is not a real-time one,
//   5. hashes the group and probes up to five slots of the aggregate table,
//   6. updates the matching group, or evicts the chosen slot's occupant (if
//      any) and initialises a new group there.
//
// Parameters:
//   fs             - query plan node; its "real_time" define is read, and it
//                    is cast to sgah_qpn* to read lfta_disorder.
//   node_name      - used in the names of the generated helpers.
//   schema         - passed to the code generation helpers.
//   temporal_flush - previously generated flush code; empty when there is none.
//
// NOTE(review): this listing omits short lines (local declarations, the
// lines that append tmpstr, else-branches, closing braces and the return
// statement).  The comments below describe only what is visible.
2871 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
2875 ////////////// Processing for aggregation query
2877 // First, search for a match. Start by unpacking the group-by attributes.
2879 // One complication : if a real-time aggregate flush occurs,
2880 // the GB attr has already been calculated. So don't compute
2881 // it again if 1) its temporal and 2) it will be computed in the
2882 // aggregate flush code.
2884 // Unpack the partial fcns ref'd by the gb's and the aggr defs.
2885 for(p=gb_fcns_start;p<gb_fcns_end;p++){
2886 if(is_partial_fcn[p]){
2887 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2888 ret += "\tif(retval) goto end;\n";
2891 for(p=ag_fcns_start;p<ag_fcns_end;p++){
2892 if(is_partial_fcn[p]){
2893 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2894 ret += "\tif(retval) goto end;\n";
2898 // increment the counter of accepted tuples
2899 ret += "\n\t#ifdef LFTA_STATS\n";
2900 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2901 ret += "\t#endif\n\n";
2903 ret += "/*\t\tTest if the group is in the hash table \t*/\n";
2904 // Compute the values of the group-by variables.
// Temporal attributes are skipped here whenever flush code exists.
2905 for(g=0;g<gb_tbl->size();g++){
2906 data_type *gdt = gb_tbl->get_data_type(g);
2907 if((! gdt->is_temporal()) || temporal_flush == ""){
2909 if(gdt->is_buffer_type()){
2910 // NOTE : if the SE defining the gb is anything
2911 // other than a ref to a variable, this will generate
2912 // illegal code. To be resolved with Spatch.
2913 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2914 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2916 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2917 gdt->get_buffer_assign_copy().c_str(), g, g);
2919 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2926 // A quick aside : if any of the GB attrs are temporal,
2927 // test for change and flush if any change occurred.
2928 // We've already computed the flush code,
2929 // Put it here if this is not a real time query.
2930 // We've already unpacked all column refs, so no need to
2931 // do it again here.
2933 string rt_level = fs->get_val_of_def("real_time");
2934 if(rt_level == "" && temporal_flush != ""){
2935 ret += temporal_flush;
2938 // Compute the hash bucket
// NOTE(review): the trailing backslash two lines below splices the following
// line onto that statement; it looks like a typo and appears harmless.
// NOTE(review): the two visible sprintf calls (buffer / non-buffer type) use
// identical format strings and arguments.
2939 if(gb_tbl->size() > 0){
2940 ret += "\thashval = ";\
2941 for(g=0;g<gb_tbl->size();g++){
2942 if(g>0) ret += " ^ ";
2943 data_type *gdt = gb_tbl->get_data_type(g);
2944 if(gdt->is_buffer_type()){
2945 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2946 gdt->get_type_str().c_str(), g);
2948 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2949 gdt->get_type_str().c_str(), g);
// g equals gb_tbl->size() once the loop has finished, so hash2 is derived
// with the multiplier that follows the ones used for the attributes.
2954 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
2955 ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
// Without group-by attributes there is a single group, kept in slot 0.
2957 ret+="\tprobe = 0;\n";
2958 ret+="\thash2 = 0;\n\n";
2961 // Does the lfta reference a udaf?
2962 bool has_udaf = false;
2963 for(a=0;a<aggr_tbl->size();a++){
2964 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
2967 // Scan for a match, or alternatively the best slot.
2968 // Currently, hardcode 5 tests.
// A slot is a candidate match when it is filled, belongs to the current
// generation and stores the same hash2 bits; the grouping variables are then
// compared one by one.
2970 " gen_val = t->generation & SLOT_GEN_BITS;\n"
2971 " match_found = 0;\n"
2972 " best_slot = probe;\n"
2973 " for(i=0;i<5 && match_found == 0;i++){\n"
2974 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
2976 if(gb_tbl->size()>0){
2977 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
2979 string rhs_op, lhs_op;
2980 for(g=0;g<gb_tbl->size();g++){
2981 if(g>0) ret += " && ";
2983 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
2984 sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
2985 ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
2990 " match_found = 1;\n"
2991 " best_slot = probe;\n"
2994 "// Rate slots in case no match found: prefer empty, then full but old slots\n"
2995 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
2996 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
2997 " best_slot = probe;\n"
2999 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3000 " best_slot = probe;\n"
3004 " if(probe >= t->max_aggrs)\n"
3007 " if(match_found){\n"
3009 ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3012 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
// NOTE(review): diagnostic output on stdout at code generation time, and an
// unchecked C-style cast; this assumes fs always is an sgah_qpn - confirm.
3014 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3015 if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3017 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
// The emitted condition sums the differences between the new and the stored
// temporal group-by attributes and compares that sum with zero.
3019 bool first_g = true;
3020 for(int g=0;g<gb_tbl->size();g++){
3021 data_type *gdt = gb_tbl->get_data_type(g);
3022 if(gdt->is_temporal()){
3023 if(first_g) first_g = false; else ret+=" + ";
3024 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3027 ret += ") == 0 ){\n";
3030 " fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
// The occupant of the chosen slot is emitted as a tuple before being replaced.
3036 ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3038 "\t\t\t#ifdef LFTA_STATS\n"
3039 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3040 "\t\t\t\tt->collision_cnt++;\n\n"
3044 ret += generate_init_group(schema,"best_slot");
3054 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set<unsigned int> &s_pids){
3056 string ret="static gs_retval_t accept_packet_"+node_name+
3057 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3058 ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3062 // Define all of the variables needed by this
3066 // Gather all column references, need to define unpacking variables.
3069 col_id_set::iterator csi;
3071 // If its a filter join, rebind all colrefs
3072 // to the first range var, to avoid double unpacking.
3075 for(w=0;w<where.size();++w)
3076 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3077 for(s=0;s<sl_list.size();s++)
3078 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3081 for(w=0;w<where.size();++w){
3082 if(is_fj || s_pids.count(w) == 0)
3083 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3085 for(s=0;s<sl_list.size();s++){
3086 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3091 for(g=0;g<gb_tbl->size();g++)
3092 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3095 // Variables for unpacking attributes.
3096 ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3097 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3098 int schref = (*csi).schema_ref;
3099 int tblref = (*csi).tblvar_ref;
3100 string field = (*csi).field;
3101 data_type dt(schema->get_type_name(schref,field));
3102 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3103 field.c_str(), tblref);
3109 // Variables that are always needed
3110 ret += "/*\t\tVariables which are always needed\t*/\n";
3111 ret += "\tgs_retval_t retval;\n";
3112 ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3113 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3115 ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3118 // Variables needed for aggregation queries.
3120 ret += "\n/*\t\tVariables for aggregation\t*/\n";
3121 ret+="\tunsigned int i, probe;\n";
3122 ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3123 ret+="\tgs_uint64_t hashval, hash2;\n";
3124 // Variables for storing group-by attribute values.
3125 if(gb_tbl->size() > 0)
3126 ret += "/*\t\tGroup-by attributes\t*/\n";
3127 for(g=0;g<gb_tbl->size();g++){
3128 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3130 data_type *gdt = gb_tbl->get_data_type(g);
3131 if(gdt->is_buffer_type()){
3132 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3137 // Temporaries for min/max
3138 string aggr_tmp_str = "";
3139 for(a=0;a<aggr_tbl->size();a++){
3140 string aggr_op = aggr_tbl->get_op(a);
3141 if(aggr_op == "MIN" || aggr_op == "MAX"){
3142 sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3143 aggr_tmp_str.append(tmpstr);
3146 if(aggr_tmp_str != ""){
3147 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3148 ret += aggr_tmp_str;
3151 // Variables for udaf output temporaries
3152 bool no_udaf = true;
3153 for(a=0;a<aggr_tbl->size();a++){
3154 if(! aggr_tbl->is_builtin(a)){
3156 ret+="/*\t\tUDAF output vars.\t*/\n";
3159 int afcn_id = aggr_tbl->get_fcn_id(a);
3160 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3161 sprintf(tmpstr,"udaf_ret%d", a);
3162 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3167 // Variables needed for a filter join query
3168 if(fs->node_type() == "filter_join"){
3169 filter_join_qpn *fjq = (filter_join_qpn *)fs;
3170 bool uses_bloom = fjq->use_bloom;
3171 ret += "/*\t\tJoin fields\t*/\n";
3172 for(g=0;g<fjq->hash_eq.size();g++){
3173 sprintf(tmpstr,"\t%s s_equijoin_%d, r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g,g);
3178 " /* Variables for fj bloom filter */ \n"
3179 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3180 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3181 "\tlong long int curr_fj_ts;\n"
3182 "\tunsigned int curr_bin, the_bin;\n"
3187 " /* Variables for fj join table */ \n"
3188 "\tunsigned int i, bucket, found; \n"
3189 "\tunsigned int bucket1, the_bucket;\n"
3190 " long long int curr_fj_ts;\n"
3197 // Variables needed to store selected attributes of BUFFER type
3198 // temporarily, in order to compute their size for storage
3199 // in an output tuple.
3201 string select_var_defs = "";
3202 for(s=0;s<sl_list.size();s++){
3203 data_type *sdt = sl_list[s]->get_data_type();
3204 if(sdt->is_buffer_type()){
3205 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3206 select_var_defs.append(tmpstr);
3209 if(select_var_defs != ""){
3210 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3211 ret += select_var_defs;
3214 // Variables to store results of partial functions.
3216 if(partial_fcns.size()>0){
3217 ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3218 for(p=0;p<partial_fcns.size();++p){
3219 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3220 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3221 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3223 if(!is_aggr_query && fcn_ref_cnt[p] >1){
3224 ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3229 if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3233 // variable to hold packet struct //
3235 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3239 ret += "\t#ifdef LFTA_STATS\n";
3240 // variable to store counter of cpu cycles spend in accept_tuple
3241 ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3242 // increment counter of received tuples
3243 ret += "\tt->in_tuple_cnt++;\n";
3244 ret += "\t#endif\n";
3247 // -------------------------------------------------
3248 // If the packet is "packet", test if its for this lfta,
3249 // and if so load it into its struct
3252 ret+="\n/* packed tuple : test and load. \t*/\n";
3253 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3254 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3255 ret+="\t\tgoto end;\n\n";
3260 col_id_set unpacked_cids; // Keep track of the cols that have been unpacked.
3262 string temporal_flush;
3264 ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3265 else { // non-aggregation operators
3267 // Unpack all the temporal attributes referenced in select clause
3268 // and update the last value of the attribute
3269 col_id_set temp_cids; // col ids of temp attributes in select clause
3271 for(s=0;s<sl_list.size();s++){
3272 data_type *sdt = sl_list[s]->get_data_type();
3273 if (sdt->is_temporal()) {
3274 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3277 // If this is a filter join,
3278 // ensure that the temporal range field is unpacked.
3280 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3281 if(temp_cids.count(window_var_cid)==0)
3282 temp_cids.insert(window_var_cid);
3285 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3286 if(unpacked_cids.count((*csi)) == 0){
3287 int tblref = (*csi).tblvar_ref;
3288 int schref = (*csi).schema_ref;
3289 string field = (*csi).field;
3290 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3291 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3294 unpacked_cids.insert( (*csi) );
3300 vector<cnf_elem *> filter = fs->get_filter_clause();
3301 // Test the filter predicate (some query types have additional preds).
3302 if(filter.size() > 0){
3304 // Sort by evaluation cost.
3305 // First, estimate evaluation costs
3306 // Eliminate predicates covered by the prefilter (those in s_pids).
3307 // I need to do it before the sort becuase the indices refer
3308 // to the position in the unsorted list./
3309 vector<cnf_elem *> tmp_wh;
3310 for(w=0;w<filter.size();++w){
3311 if(s_pids.count(w) == 0){
3312 compute_cnf_cost(filter[w],Ext_fcns);
3313 tmp_wh.push_back(filter[w]);
3318 sort(filter.begin(), filter.end(), compare_cnf_cost());
3320 // Now generate the predicates.
3321 for(w=0;w<filter.size();++w){
3322 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
3324 // Find the set of variables accessed in this CNF elem,
3325 // but in no previous element.
3326 col_id_set this_pred_cids;
3327 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
3328 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3329 if(unpacked_cids.count( (*csi) ) == 0){
3330 int tblref = (*csi).tblvar_ref;
3331 int schref = (*csi).schema_ref;
3332 string field = (*csi).field;
3333 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3334 unpacked_cids.insert( (*csi) );
3337 // Find partial fcns ref'd in this cnf element
3339 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
3340 // Since set<..> is a "Sorted Associative Container",
3341 // we can walk through it in sorted order by walking from
3342 // begin() to end(). (and the partial fcns must be
3343 // evaluated in this order).
3344 set<int>::iterator si;
3345 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3346 if(fcn_ref_cnt[(*si)] > 1){
3347 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3349 if(is_partial_fcn[(*si)]){
3350 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3351 ret += "\t\tif(retval) goto end;\n";
3353 if(fcn_ref_cnt[(*si)] > 1){
3354 if(!is_partial_fcn[(*si)]){
3355 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3357 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3362 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
3366 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
3370 // We've passed the WHERE clause,
3371 // unpack the remainder of the accessed fields.
3373 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3374 vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
3375 for(w=0;w<h_eq.size();++w){
3376 col_id_set this_pred_cids;
3377 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
3378 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3379 if(unpacked_cids.count( (*csi) ) == 0){
3380 int tblref = (*csi).tblvar_ref;
3381 int schref = (*csi).schema_ref;
3382 string field = (*csi).field;
3383 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3384 unpacked_cids.insert( (*csi) );
3389 ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
3391 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
3392 if(unpacked_cids.count( (*csi) ) == 0){
3393 int schref = (*csi).schema_ref;
3394 int tblref = (*csi).tblvar_ref;
3395 string field = (*csi).field;
3396 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3397 unpacked_cids.insert( (*csi) );
3404 ////////////////// After this, the query types
3405 ////////////////// are processed differently.
3407 if(!is_aggr_query && !is_fj)
3408 ret += generate_sel_accept_body(fs, node_name, schema);
3409 else if(is_aggr_query)
3410 ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
3412 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
3417 ret += "\n\tend:\n";
3418 ret += "\t#ifdef LFTA_STATS\n";
3419 ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
3420 ret += "\t#endif\n";
3421 ret += "\n\treturn 1;\n}\n\n";
// generate_fta_alloc
// Assembles, in the string `ret`, the C source text of the allocation
// routine for the LFTA named node_name.  The emitted routine has the
// signature
//   struct FTA * <generate_alloc_name(node_name)>(struct FTAID ftaid,
//       gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void *value)
// and its text allocates the FTA struct, sizes and clears its tables,
// initializes literals, temporal "last value" fields and statistics,
// installs the callback pointers, and loads the query parameters.
//
// Parameters:
//   fs            - query plan node; read for user definitions
//                   ("aggregate_slots", "num_bloom", "bloom_size") and,
//                   cast to filter_join_qpn, for temporal_range.
//   node_name     - name spliced into every generated identifier.
//   schema        - used to look up type names and modifiers of fields.
//   is_aggr_query, is_fj, uses_bloom - select which sections are emitted.
//                   NOTE(review): the conditions that test these flags are
//                   not among the statements shown below; confirm which
//                   section each flag guards.
// Also reads the file-level gb_tbl, sl_list, complex_literals, param_tbl
// and param_handle_table.
3427 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
3430 string ret = "struct FTA * "+generate_alloc_name(node_name) +
3431 "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void *value){\n";
3433 ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
// The generated code returns 0 when fta_alloc cannot provide the struct.
3436 ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
3438 // assign a streamid to fta instance
// The generated code copies the caller's ftaid, then overwrites its
// streamid with the address of the new struct cast to gs_p_t, and logs it.
3439 ret+="\t/* assign a streamid */\n";
3440 ret+="\tf->f.ftaid = ftaid;\n";
3441 ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
3442 ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
// ---- Aggregate table sizing.
// NOTE(review): presumably emitted only for aggregation queries - confirm.
3445 ret += "\tf->n_aggrs = 0;\n";
// The right-hand side of this assignment is appended by the code below.
3447 ret += "\tf->max_aggrs = ";
3449 // Computing the number of aggregate blocks is a little
3450 // tricky. If there are no GB attrs, or if all GB attrs
3451 // are temporal, then use a single aggregate block, else
3452 // use a default value (10). A user specification overrides
// NOTE(review): the default appended below is DEFAULT_LFTA_HASH_TABLE_SIZE,
// not a literal 10 - confirm which value the comment above should name.
3454 bool single_group = true;
// single_group stays true only if no group-by attribute is non-temporal.
3455 for(g=0;g<gb_tbl->size();g++){
3456 data_type *gdt = gb_tbl->get_data_type(g);
3457 if(! gdt->is_temporal() ){
3458 single_group = false;
// User override: the "aggregate_slots" definition parsed with atoi; a
// result <= 0 is treated as "no usable override".
3461 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
3462 int max_aggr_i = atoi(max_aggr_str.c_str());
3463 if(max_aggr_i <= 0){
3467 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
// Otherwise turn the requested slot count into a power of two: naggrs is
// doubled each time max_aggr_i is shifted right by one bit.
3469 unsigned int naggrs = 1; // make it power of 2
3470 unsigned int nones = 0;
3474 naggrs = naggrs << 1;
3475 max_aggr_i = max_aggr_i >> 1;
3477 if(nones==1) // in case it was already a power of 2.
3479 ret += int_to_string(naggrs);
// Generated code: allocate max_aggrs aggregate structs via sp_fta_alloc and
// return 0 on failure.
3483 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
3484 ret+="\t\treturn(0);\n";
3486 // ret+="/* compute how many integers we need to store the hashmap */\n";
3487 // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
// Generated code: allocate one gs_uint32_t per aggregate slot for the
// hashmap, zero every entry, and reset generation / flush_pos.
3488 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
3489 ret+="\t\treturn(0);\n";
3491 ret+="/*\t\tfill bitmap with zero \t*/\n";
3492 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
3493 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
3494 ret+="\tf->generation=0;\n";
3495 ret+="\tf->flush_pos = f->max_aggrs;\n";
3497 ret += "\tf->flush_ctr = 0;\n";
// ---- Filter-join state.
// NOTE(review): presumably emitted only when is_fj is set - confirm.
3503 ret+="\tf->first_exec = 1;\n";
// Number of bloom filters: starts at 11, may be replaced by the
// "num_bloom" definition plus one, and is reduced to window_len+1 when the
// join's temporal_range is smaller than the current count.
3504 unsigned int n_bloom = 11;
3505 string n_bloom_str = fs->get_val_of_def("num_bloom");
3506 int tmp_n_bloom = atoi(n_bloom_str.c_str());
3508 n_bloom = tmp_n_bloom+1;
3510 unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
3511 if(window_len < n_bloom){
3512 n_bloom = window_len+1;
// Bloom filter size exponent: 12 unless the "bloom_size" definition parses
// to a value strictly between 3 and 32.
3515 int bf_exp_size = 12; // base-2 log of number of bits
3516 string bloom_len_str = fs->get_val_of_def("bloom_size");
3517 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
3518 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
3519 bf_exp_size = tmp_bf_exp_size;
// NOTE(review): bf_bit_size is fixed at 1 << 12 and does not use
// bf_exp_size computed above - confirm whether the "bloom_size" override is
// meant to change the allocation, and keep it consistent with the code that
// indexes bf_table.
3521 int bf_bit_size = 1 << 12;
3522 int bf_byte_size = bf_bit_size / (8*sizeof(char));
// Total bytes for all filters; the generated code allocates this many bytes
// and zeroes each one.
3524 int bf_tot = n_bloom*bf_byte_size;
3525 ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
3526 ret+="\t\treturn(0);\n";
3529 " for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
3530 " f->bf_table[i] = 0;\n"
// Join hash table size: 4096 entries unless "aggregate_slots" parses to a
// value above 1024, in which case a power-of-two size is derived from it.
3533 unsigned int ht_size = 4096;
3534 string ht_size_s = fs->get_val_of_def("aggregate_slots");
3535 int tmp_ht_size = atoi(ht_size_s.c_str());
3536 if(tmp_ht_size > 1024){
3537 unsigned int hs = 1; // make it power of 2
3540 tmp_ht_size = tmp_ht_size >> 1;
// Generated code: allocate ht_size filter-join structs and set the ts field
// of every entry to 0.
3544 ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
3545 ret+="\t\treturn(0);\n";
3548 " for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
3549 " f->join_table[i].ts = 0;\n"
3554 // Initialize the complex literals (which might be handles).
// Each literal produces its own initialization code through to_C_code,
// which is handed the address expression of the matching struct member.
3556 for(cl=0;cl<complex_literals->size();cl++){
3557 literal_t *l = complex_literals->get_literal(cl);
3558 // sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
3559 // ret += tmpstr + l->to_C_code() + ";\n";
3560 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
3561 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
3566 // Initialize the last seen values of temporal attributes to min(max) value of
3567 // their respective type
3568 // Create places to hold the last values of temporal attributes referenced in select clause
3571 col_id_set temp_cids; // col ids of temp attributes in select clause
3574 col_id_set::iterator csi;
// Collect the column ids referenced by every temporal select expression.
3576 for(s=0;s<sl_list.size();s++){
3577 data_type *sdt = sl_list[s]->get_data_type();
3578 if (sdt->is_temporal()) {
3579 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// For each collected column, format "f->last_<field>_<tblref> = <literal>":
// the type's minimum literal for an increasing attribute, its maximum
// literal for a decreasing one.
3583 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3584 int tblref = (*csi).tblvar_ref;
3585 int schref = (*csi).schema_ref;
3586 string field = (*csi).field;
3587 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
3588 if (dt.is_increasing()) {
3589 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
3591 } else if (dt.is_decreasing()) {
3592 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
3597 // initialize last seen values of temporal group-by variables
// last_gb_<g> and last_flushed_gb_<g> both start at the type's minimum
// literal for an increasing attribute, otherwise at its maximum literal.
3599 for(g=0;g<gb_tbl->size();g++){
3600 data_type *dt = gb_tbl->get_data_type(g);
3601 if(dt->is_temporal()){
3603 fprintf(stderr,"group by attribute %s is temporal, ",
3604 gb_tbl->get_name(g).c_str());
3606 if(dt->is_increasing()){
3607 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
3609 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
// Generated code: install the per-LFTA callbacks in the embedded FTA struct.
3616 ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
3617 ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
3618 ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
3619 ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
3620 ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
3622 // Initialize runtime stats
3623 ret+="\tf->in_tuple_cnt = 0;\n";
3624 ret+="\tf->out_tuple_cnt = 0;\n";
3625 ret+="\tf->out_tuple_sz = 0;\n";
3626 ret+="\tf->accepted_tuple_cnt = 0;\n";
3627 ret+="\tf->cycle_cnt = 0;\n";
3628 ret+="\tf->collision_cnt = 0;\n";
3629 ret+="\tf->eviction_cnt = 0;\n";
3630 ret+="\tf->sampling_rate = 1.0;\n";
3632 ret+="\tf->trace_id = 0;\n\n";
// When the query has parameters, the generated code calls
// load_params_<node_name>(f, sz, value, 1); a nonzero result leads to the
// warning below (printed unless LFTA_IN_NIC is defined).
3633 if(param_tbl->size() > 0){
3635 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
3636 "#ifndef LFTA_IN_NIC\n"
3637 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
3645 // Register the pass-by-handle parameters
// Each handle parameter becomes
//   f->handle_param_<ph> = <lfta_registration_fcn()>((struct FTA *)f, <arg>);
// where <arg> depends on val_type: a reference to the stored complex
// literal (wrapped in "&(...)" for buffer types) or the literal's C code.
// NOTE(review): the case labels of the switch are not shown among these
// statements; confirm which val_type selects which argument form.
3647 for(ph=0;ph<param_handle_table.size();++ph){
3648 data_type pdt(param_handle_table[ph]->type_name);
3649 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
3650 switch(param_handle_table[ph]->val_type){
3653 if(pdt.is_buffer_type()) ret += "&(";
3654 sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
3656 if(pdt.is_buffer_type()) ret += ")";
3660 // not complex, no constructor
3662 ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
3665 // query parameter handles are registered/deregistered in the
3666 // load_params function.
3667 // ret += "t->param_"+param_handle_table[ph]->param_name;
3670 fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
// The generated routine hands back the new struct as a struct FTA *.
3675 ret += "\treturn (struct FTA *) f;\n";
3684 //////////////////////////////////////////////////////////////////
// generate_lfta_block
// Drives code generation for one LFTA query plan node.  It resets the
// file-level generator state, loads the query's select list, where clause
// and (for aggregation) group-by / aggregate tables from `fs`, indexes the
// partial and cacheable functions, and then concatenates the generated
// pieces (preamble, structs, flush, load_params, free, control, accept,
// clock, alloc) into `retval`.
//
// Parameters:
//   fs               - query plan node; its node_type() must be "spx_qpn",
//                      "sgah_qpn" or "filter_join".
//   schema           - table schemas used for type lookups.
//   gid              - NOTE(review): presumably stored in global_id; the
//                      assignment is not among the statements shown below.
//   Ext_fcns         - external function table.
//   schema_embed_str - forwarded to generate_preamble.
//   ifdb             - interface database queried for "Time_Correlation".
//   nicp             - NIC properties; may be NULL (it is null-checked).
//   s_pids           - forwarded to generate_fta_accept.
3686 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
3687 // map<string,string> &int_fcn_defs,
3688 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
3693 /////////////////////////////////////////////////////////////
3694 /// Do operator-generic processing, such as
3695 /// gathering the set of referenced columns,
3696 /// generating structures, etc.
3698 // Initialize globals to empty.
3699 gb_tbl = NULL; aggr_tbl = NULL;
3700 global_id = -1; nicprop = NULL;
3701 param_tbl = fs->get_param_tbl();
3702 sl_list.clear(); where.clear();
3703 partial_fcns.clear();
3704 fcn_ref_cnt.clear(); is_partial_fcn.clear();
3705 pred_class.clear(); pred_pos.clear();
// The *_start / *_end values delimit, per clause, index ranges within
// partial_fcns; they are assigned further down as each clause is scanned.
3706 sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
3707 gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
3710 // Does the lfta read packed results from the NIC?
3711 nicprop = nicp; // load into global
// packed_return becomes true only for the NIC option Return == "Packed";
// another value of "Return" produces the warning below.
3713 packed_return = false;
3714 if(nicp && nicp->option_exists("Return")){
3715 if(nicp->option_value("Return") == "Packed"){
3716 packed_return = true;
3718 fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
3723 // Extract data which defines the query.
3724 // complex literals gathered now.
3725 complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
3726 param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
3727 string node_name = fs->get_node_name();
3728 bool is_fj = false, uses_bloom = false;
// Selection/projection node: only a select list and a where clause.
3731 if(fs->node_type() == "spx_qpn"){
3732 is_aggr_query = false;
3733 spx_qpn *spx_node = (spx_qpn *)fs;
3734 sl_list = spx_node->get_select_se_list();
3735 where = spx_node->get_where_clause();
// Aggregation node: additionally loads the group-by and aggregate tables.
3739 if(fs->node_type() == "sgah_qpn"){
3740 is_aggr_query = true;
3741 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3742 sl_list = sgah_node->get_select_se_list();
3743 where = sgah_node->get_where_clause();
3744 gb_tbl = sgah_node->get_gb_tbl();
3745 aggr_tbl = sgah_node->get_aggr_tbl();
// A non-empty HAVING clause is only warned about here.
3747 if((sgah_node->get_having_clause()).size() > 0){
3748 fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
// Filter-join node: also records whether bloom filters are used.
3751 if(fs->node_type() == "filter_join"){
3752 is_aggr_query = false;
3754 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3755 sl_list = fj_node->get_select_se_list();
3756 where = fj_node->get_where_clause();
3757 uses_bloom = fj_node->use_bloom;
3761 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
3765 // Build list of "partial functions", by clause.
3766 // NOTE : partial fcns are not handled well.
3767 // The act of searching for them associates the fcn call
3768 // in the SE with an index to an array. Refs to the
3769 // fcn value are replaced with refs to the variable they are
3770 // unpacked into. A more general tagging mechanism would be better.
// For non-aggregation queries the reference-count and is-partial vectors
// are passed to the search; for aggregation queries both pointers stay NULL.
3773 vector<bool> *pfunc_ptr = NULL;
3774 vector<int> *ref_cnt_ptr = NULL;
3775 if(!is_aggr_query){ // don't collect cacheable fcns on aggr query.
3776 ref_cnt_ptr = &fcn_ref_cnt;
3777 pfunc_ptr = &is_partial_fcn;
// Scan clause by clause; after each clause the current size of
// partial_fcns is recorded as that clause's end and the next one's start.
3781 for(i=0;i<sl_list.size();i++){
3782 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3784 wh_fcns_start = sl_fcns_end = partial_fcns.size();
3785 for(i=0;i<where.size();i++){
3786 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3788 gb_fcns_start = wh_fcns_end = partial_fcns.size();
// NOTE(review): gb_tbl is NULL for non-aggregation nodes; presumably this
// loop is guarded by a null check - confirm.
3790 for(i=0;i<gb_tbl->size();i++){
3791 find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
3794 ag_fcns_start = gb_fcns_end = partial_fcns.size();
3795 if(aggr_tbl != NULL){
3796 for(i=0;i<aggr_tbl->size();i++){
// NOTE(review): this call passes NULL for the function list and
// &is_partial_fcn for the last vector, unlike the calls above - confirm
// this argument pattern is intended.
3797 find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
3800 ag_fcns_end = partial_fcns.size();
3802 // Fill up the is_partial_fcn and fcn_ref_cnt arrays.
// Every function gets a reference count of 1 and is marked partial.
// NOTE(review): presumably done only for aggregation queries, where the
// vectors were not filled by the search - confirm the guarding condition.
3804 for(i=0; i<partial_fcns.size();i++){
3805 fcn_ref_cnt.push_back(1);
3806 is_partial_fcn.push_back(true);
3810 // Unmark non-partial expensive functions referenced only once.
3811 for(i=0; i<partial_fcns.size();i++){
3812 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
3813 partial_fcns[i]->set_partial_ref(-1);
// From here on node_name is the normalized form used in generated
// identifiers.
3817 node_name = normalize_name(node_name);
3819 retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
// With packed return, emit "struct <node_name>_input_struct" holding the
// NIC's lfta id followed by one unpack_var_<col> member per referenced
// column, in sorted column-name order.  Only the schema of the first input
// table is consulted for member types.
3821 if(packed_return){ // generate unpack struct
3822 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
3823 int schref = input_tbls[0]->get_schema_ref();
3824 vector<string> refd_cols;
3825 for(s=0;s<sl_list.size();++s){
3826 gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
3828 for(p=0;p<where.size();++p){
3829 // I'm not disabling these preds ...
3830 gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
3833 for(g=0;g<gb_tbl->size();++g){
3834 gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
3837 sort(refd_cols.begin(), refd_cols.end());
3838 retval += "struct "+node_name+"_input_struct{\n";
3839 retval += "\tint __lfta_id_fm_nic__;\n";
3841 for(vsi=0;vsi<refd_cols.size();++vsi){
3842 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
3843 retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
3849 /////////////////////////////////////////////////////
3850 // Common stuff unpacked, do some generation
// NOTE(review): the aggregate struct and the filter-join struct are
// presumably emitted only for their respective query kinds - confirm.
3853 retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
3855 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
3857 retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema);
3858 retval += generate_tuple_struct(node_name, sl_list) ;
3861 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
// The parameter loader is generated only when the query has parameters.
3862 if(param_tbl->size() > 0)
3863 retval += generate_fta_load_params(node_name) ;
3864 retval += generate_fta_free(node_name, is_aggr_query) ;
3865 retval += generate_fta_control(node_name, schema, is_aggr_query) ;
3866 retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ;
3869 /* extract the value of Time_Correlation from interface definition */
// The lookup uses the machine and interface of the first input table;
// DEFAULT_TIME_CORR applies when the interface defines no value, otherwise
// the first value is parsed with atoi.
3873 vector<tablevar_t *> tvec = fs->get_input_tbls();
3874 vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
3875 if (time_corr_vec.empty())
3876 time_corr = DEFAULT_TIME_CORR;
3878 time_corr = atoi(time_corr_vec[0].c_str());
3880 retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) );
3881 retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
// compute_snap_len
// Derives a packet snap length for the query node `fs` from the "snap_len"
// modifiers of the fields it references.  It reloads the file-level
// sl_list / where / gb_tbl from the node, gathers every referenced column
// id, and keeps in snap_len the largest numeric "snap_len" modifier found.
//
// Parameters:
//   fs     - query plan node ("spx_qpn", "sgah_qpn" or "filter_join").
//   schema - supplies each field's modifier list.
// Returns: an int.  NOTE(review): the return statements are not among the
//   statements shown below; presumably snap_len is returned only when every
//   referenced field supplied one (n_snap == cid_set.size()) - confirm.
3888 int compute_snap_len(qp_node *fs, table_list *schema){
3890 // Initialize global vars
3892 sl_list.clear(); where.clear();
3894 if(fs->node_type() == "spx_qpn"){
3895 spx_qpn *spx_node = (spx_qpn *)fs;
3896 sl_list = spx_node->get_select_se_list();
3897 where = spx_node->get_where_clause();
3899 else if(fs->node_type() == "sgah_qpn"){
3900 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3901 sl_list = sgah_node->get_select_se_list();
3902 where = sgah_node->get_where_clause();
// Only aggregation nodes provide a group-by table.
3903 gb_tbl = sgah_node->get_gb_tbl();
3905 else if(fs->node_type() == "filter_join"){
3906 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3907 sl_list = fj_node->get_select_se_list();
3908 where = fj_node->get_where_clause();
3910 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
3914 // Gather all column references, need to define unpacking variables.
3917 col_id_set::iterator csi;
// Columns come from the where predicates, the select expressions and the
// group-by definitions.
3919 for(w=0;w<where.size();++w)
3920 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3921 for(s=0;s<sl_list.size();s++){
3922 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
// NOTE(review): gb_tbl is assigned above only for "sgah_qpn"; presumably
// this loop is guarded by a null check - confirm.
3927 for(g=0;g<gb_tbl->size();g++)
3928 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3931 // compute snap length
3934 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3935 int schref = (*csi).schema_ref;
3936 int tblref = (*csi).tblvar_ref;
3937 string field = (*csi).field;
3939 param_list *field_params = schema->get_modifier_list(schref, field);
3940 if(field_params->contains_key("snap_len")){
3941 string fld_snap_str = field_params->val_of("snap_len");
// sscanf returns the number of converted items, so > 0 means the modifier
// parsed as an integer; keep the running maximum.
3943 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
3944 if(fld_snap > snap_len) snap_len = fld_snap;
// A "snap_len" modifier that does not parse is reported and ignored.
3947 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
// NOTE(review): n_snap presumably counts the fields that contributed a
// snap length; its declaration and increment are not shown here - confirm.
3952 if(n_snap == cid_set.size()){
3961 // Function which computes an optimal
3962 // set of unpacking functions.
//
// Fills ucol_fcn_map with one unpack-function name for each column id in
// upref_cids, looping until the map holds as many entries as upref_cids.
// Each round:
//   1. counts, per unpack function, the still-unassigned columns whose
//      field lists that function (Schema->get_field(...)->get_unpack_fcns());
//   2. selects the function with the smallest cost per referencing column,
//      with the cost taken from Schema->get_ufcn_cost;
//   3. assigns the selected function to every unassigned column that
//      lists it.
//
// Parameters:
//   upref_cids   - column ids that need an unpack function.
//   Schema       - source of per-field unpack-function sets and costs.
//   ucol_fcn_map - output map; entries already present are left untouched
//                  (only columns with count() == 0 are assigned).
// NOTE(review): "optimal" is the original wording; the selection shown is a
//   round-by-round minimum, so treat global optimality as unconfirmed.
3964 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
3965 map<string, int> pfcn_count;
3966 map<string, int>::iterator msii;
3967 col_id_set::iterator cisi;
3968 set<string>::iterator ssi;
3971 while(ucol_fcn_map.size() < upref_cids.size()){
3973 // Gather unpack functions referenced by unaccounted-for
3974 // columns, and increment their reference count.
// NOTE(review): no reset of pfcn_count between rounds is shown among these
// statements; confirm that counts from earlier rounds do not carry over.
3976 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
3977 if(ucol_fcn_map.count((*cisi)) == 0){
3978 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
3979 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
3980 pfcn_count[(*ssi)]++;
3984 // Get the lowest cost per field function.
3985 float min_cost = 0.0;
3986 string best_fcn = "";
3987 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
3988 int fcost = Schema->get_ufcn_cost((*msii).first);
// A negative or undefined cost is reported as a configuration error.
3990 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
// Cost is shared among all columns the function would serve; the first
// map entry seeds min_cost, and later entries must be strictly cheaper.
3993 float this_cost = (1.0*fcost)/(*msii).second;
3994 if(msii == pfcn_count.begin() || this_cost < min_cost){
3995 min_cost = this_cost;
3996 best_fcn = (*msii).first;
// Reported when no function could be selected in this round.
4000 fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4004 // Assign this function to the unassigned columns which use it.
4005 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4006 if(ucol_fcn_map.count((*cisi)) == 0){
4007 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4008 if(ufcns.count(best_fcn)>0)
4009 ucol_fcn_map[(*cisi)] = best_fcn;
4017 // Generate an initial test for the lfta
4018 // Assume that the predicate references no external functions,
4019 // and especially no partial functions,
4020 // aggregates, internal functions.
4021 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4022 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4023 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4024 vector<int> &lfta_snap_lens, string iface){
4025 col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4026 col_id_set::iterator csi;
4030 // Gather complex literals in the prefilter.
4031 cplx_lit_table *complex_literals = new cplx_lit_table();
4032 for(p=0;p<pred_list.size();++p){
4033 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4037 // Find the combinable predicates
4038 vector<predicate_t *> pr_list;
4039 for(p=0;p<pred_list.size();++p){
4040 find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4043 // Analyze the combinable predicates to find the predicate classes.
4044 pred_class.clear(); // idx to equiv pred in equiv_list
4045 pred_pos.clear(); // idx to returned bitmask.
4046 vector<predicate_t *> equiv_list;
4047 vector<int> num_equiv;
4050 for(p=0;p<pr_list.size();++p){
4051 for(q=0;q<equiv_list.size();++q){
4052 if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4055 if(q == equiv_list.size()){ // no equiv : create new
4056 pred_class.push_back(equiv_list.size());
4057 equiv_list.push_back(pr_list[p]);
4058 pred_pos.push_back(0);
4059 num_equiv.push_back(1);
4061 }else{ // pr_list[p] is equivalent to pred q
4062 pred_class.push_back(q);
4063 pred_pos.push_back(num_equiv[q]);
4068 // Generate the variables which hold the common pred handles
4069 ret += "/*\t\tprefilter global vars.\t*/\n";
4070 for(q=0;q<equiv_list.size();++q){
4071 for(p=0;p<=(num_equiv[q]/32);++p){
4072 ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4076 // Struct to hold prefilter complex literals
4077 ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4078 if(complex_literals->size() == 0)
4079 ret += "\tint no_variable;\n";
4081 for(cl=0;cl<complex_literals->size();cl++){
4082 literal_t *l = complex_literals->get_literal(cl);
4083 data_type *dtl = new data_type( l->get_type() );
4084 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4087 ret += "} prefilter_complex_lits_"+iface+";\n\n";
4090 // Generate the prefilter initialziation code
4091 ret += "void init_lfta_prefilter_"+iface+"(){\n";
4093 // First initialize complex literals, if any.
4094 ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4095 for(cl=0;cl<complex_literals->size();cl++){
4096 literal_t *l = complex_literals->get_literal(cl);
4097 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4098 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4102 set<int> epred_seen;
4103 for(p=0;p<pr_list.size();++p){
4104 int q = pred_class[p];
4105 //printf("\tq=%d\n",q);
4106 if(epred_seen.count(q)>0){
4107 ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4108 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4109 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4110 for(o=0;o<op_list.size();++o){
4112 ret += generate_se_code(op_list[o],Schema)+", ";
4115 ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4116 epred_seen.insert(q);
4118 ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4119 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4120 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4121 for(o=0;o<op_list.size();++o){
4123 ret += generate_se_code(op_list[o],Schema)+", ";
4126 ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4127 epred_seen.insert(q);
4134 // Start on main body code generation
4135 ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4138 ///--------------------------------------------------------------
4139 /// Generate and store the prefilter body,
4140 /// reuse it for the snap length calculator
4141 ///-------------------------------------------------------------
4144 body += "\tstruct packet *p = (struct packet *)pkt;\n";
4148 // Gather the colids to store unpacked variables.
4149 for(p=0;p<pred_list.size();++p){
4150 gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4153 // make the col_ids refer to the base tables, and
4154 // grab the col_ids with at least one unpacking function.
4155 for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4156 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4158 tmp_col_id.field = (*csi).field;
4159 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4160 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4161 cid_set.insert(tmp_col_id);
4162 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4163 if(fe->get_unpack_fcns().size()>0)
4164 upref_cids.insert(tmp_col_id);
4169 // Find the set of unpacking programs needed for the
4170 // prefilter fields.
4171 map<col_id, string,lt_col_id> ucol_fcn_map;
4172 find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4173 set<string> pref_ufcns;
4174 map<col_id, string,lt_col_id>::iterator mcis;
4175 for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4176 pref_ufcns.insert((*mcis).second);
4181 // Variables for unpacking attributes.
4182 body += "/*\t\tVariables for unpacking attributes\t*/\n";
4183 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4184 int schref = (*csi).schema_ref;
4185 int tblref = (*csi).tblvar_ref;
4186 string field = (*csi).field;
4187 data_type dt(Schema->get_type_name(schref,field));
4188 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4189 field.c_str(), tblref);
4191 sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4194 // Variables for unpacking temporal attributes.
4195 body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4196 for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4197 if (cid_set.count(*csi) == 0) {
4198 int schref = (*csi).schema_ref;
4199 int tblref = (*csi).tblvar_ref;
4200 string field = (*csi).field;
4201 data_type dt(Schema->get_type_name(schref,field));
4202 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4203 field.c_str(), tblref);
4210 // Variables for combinable predicate evaluation
4211 body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4212 for(q=0;q<equiv_list.size();++q){
4213 for(p=0;p<=(num_equiv[q]/32);++p){
4214 body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4219 // Variables that are always needed
4220 body += "/*\t\tVariables which are always needed\t*/\n";
4221 body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4222 body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4224 // Call the unpacking functions for the prefilter fields
4225 if(pref_ufcns.size() > 0)
4226 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4227 set<string>::iterator ssi;
4228 for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4229 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4233 // Unpack the accessed attributes
4234 body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4235 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4236 int tblref = (*csi).tblvar_ref;
4237 int schref = (*csi).schema_ref;
4238 string field = (*csi).field;
4239 sprintf(tmpstr,"\tret_%s_%d = (%s(p, &unpack_var_%s_%d) == 0);\n",
4240 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4244 // next unpack the temporal attributes and ignore the errors
4245 // We are assuming here that failed unpack of temporal attributes
4246 // is not going to overwrite the last stored value
4247 // Failed upacks are ignored
4248 for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
4249 int tblref = (*csi).tblvar_ref;
4250 int schref = (*csi).schema_ref;
4251 string field = (*csi).field;
4252 sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
4253 Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4257 // Evaluate the combinable predicates
4258 if(equiv_list.size()>0)
4259 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
4260 for(q=0;q<equiv_list.size();++q){
4261 for(p=0;p<=(num_equiv[q]/32);++p){
4263 // Only call the common eval fcn if all ref'd fields present.
4264 col_id_set pred_cids;
4265 col_id_set::iterator cpi;
4266 gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
4267 if(pred_cids.size()>0){
4269 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4270 if(cpi != pred_cids.begin())
4272 string field = (*cpi).field;
4273 int tblref = (*cpi).tblvar_ref;
4274 body += "ret_"+field+"_"+int_to_string(tblref);
4279 body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
4280 vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
4281 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4282 for(o=0;o<op_list.size();++o){
4284 body += ","+generate_se_code(op_list[o],Schema);
4292 for(p=0;p<pred_list.size();++p){
4293 col_id_set pred_cids;
4294 col_id_set::iterator cpi;
4295 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
4296 if(pred_cids.size()>0){
4298 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4299 if(cpi != pred_cids.begin())
4301 string field = (*cpi).field;
4302 int tblref = (*cpi).tblvar_ref;
4303 body += "ret_"+field+"_"+int_to_string(tblref);
4307 body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
4308 body+="\tbitpos = bitpos << 1;\n";
4311 // ---------------------------------------------------------------
4312 // Finished with the body of the prefilter
4313 // --------------------------------------------------------------
4317 // Collect fields referenced by an lfta but not
4318 // already unpacked for the prefilter.
4320 //printf("upref_cids is:\n");
4321 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
4322 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4323 //printf("pref_ufcns is:\n");
4324 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
4325 //printf("\t%s\n",(*ssi).c_str());
4328 for(l=0;l<lfta_cols.size();++l){
4329 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
4330 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4332 tmp_col_id.field = (*csi).field;
4333 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4334 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4335 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4336 set<string> fld_ufcns = fe->get_unpack_fcns();
4337 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(), upref_cids.count(tmp_col_id));
4338 if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
4339 // Ensure that this field not already unpacked.
4341 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
4342 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
4343 if(pref_ufcns.count((*ssi))){
4344 //printf("Field already unpacked.\n");
4349 //printf("\tadding to unpack list\n");
4350 upall_cids.insert(tmp_col_id);
4356 //printf("upall_cids is:\n");
4357 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
4358 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4360 // Get the set of unpacking programs for these.
4361 map<col_id, string,lt_col_id> uall_fcn_map;
4362 find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
4363 set<string> pall_ufcns;
4364 for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
4365 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
4366 pall_ufcns.insert((*mcis).second);
4369 // Iterate through the remaining set of unpacking function
4370 if(pall_ufcns.size() > 0)
4371 ret += "//\t\tcall all remaining field unpacking functions.\n";
4372 for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
4373 // gather the set of columns unpacked by this ufcn
4374 col_id_set fcol_set;
4375 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
4376 if(uall_fcn_map[(*csi)] == (*ssi))
4377 fcol_set.insert((*csi));
4380 // gather the set of lftas which access a field unpacked by the fcn
4381 set<long long int> clfta;
4382 for(l=0;l<lfta_cols.size();l++){
4383 for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
4384 if(lfta_cols[l].count((*csi)) > 0)
4387 if(csi != fcol_set.end())
4388 clfta.insert(lfta_sigs[l]);
4391 // generate the unpacking code
4393 set<long long int>::iterator sii;
4394 for(sii=clfta.begin();sii!=clfta.end();++sii){
4395 if(sii!=clfta.begin())
4397 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
4400 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4404 ret += "\treturn(retval);\n\n";
4408 // --------------------------------------------------------
4409 // reuse prefilter body for snaplen calculator
4411 // This is dummy code, so I'm commenting it out.
4414 ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
4419 vector<int> s_snaps = lfta_snap_lens;
4420 sort(s_snaps.begin(), s_snaps.end());
4422 if(s_snaps[0] == -1){
4423 set<unsigned long long int> sigset;
4424 for(i=0;i<lfta_snap_lens.size();++i){
4425 if(lfta_snap_lens[i] == -1){
4426 sigset.insert(lfta_sigs[i]);
4430 set<unsigned long long int>::iterator sulli;
4431 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4432 if(sulli!=sigset.begin())
4434 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4437 ret += ") return -1;\n";
4440 int nextpos = lfta_snap_lens.size()-1;
4441 int nextval = lfta_snap_lens[nextpos];
4442 while(nextval >= 0){
4443 set<unsigned long long int> sigset;
4444 for(i=0;i<lfta_snap_lens.size();++i){
4445 if(lfta_snap_lens[i] == nextval){
4446 sigset.insert(lfta_sigs[i]);
4450 set<unsigned long long int>::iterator sulli;
4451 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4452 if(sulli!=sigset.begin())
4454 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4457 ret += ") return "+int_to_string(nextval)+";\n";
4459 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
4461 nextval = lfta_snap_lens[nextpos];
4465 ret += "\treturn 0;\n";
4476 // Generate the struct which will store the values of
4477 // temporal attributes unpacked by the prefilter
4478 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
4480 col_id_set::iterator csi;
4482 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
4484 string ret="struct prefilter_unpacked_temp_vars {\n";
4485 ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
4489 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4490 int schref = (*csi).schema_ref;
4491 int tblref = (*csi).tblvar_ref;
4492 string field = (*csi).field;
4493 data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
4494 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4495 field.c_str(), tblref);
4498 if (init_code != "")
4500 if (dt.is_increasing())
4501 init_code += dt.get_min_literal();
4503 init_code += dt.get_max_literal();
4508 ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";