Added quantiling UDAFs
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #include <string>\r
17 #include <stdio.h>\r
18 #include <stdlib.h>\r
19 //#include <algo.h>\r
20 #include<algorithm>\r
21 \r
22 #include "parse_fta.h"\r
23 #include "parse_schema.h"\r
24 #include "analyze_fta.h"\r
25 #include "generate_utils.h"\r
26 #include "query_plan.h"\r
27 #include "generate_lfta_code.h"\r
28 #include "generate_nic_code.h"\r
29 \r
30 using namespace std;\r
31 \r
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;\r
33 \r
34 // default value for correlation between the interface card and\r
35 // the system clock\r
36 #define DEFAULT_TIME_CORR 16\r
37 \r
38 \r
39 //      For fast hashing\r
40 //#define NRANDS 100\r
41 extern string hash_nums[NRANDS];\r
42 /*\r
43 = {\r
44 "12916008961267169387ull", "13447227858232756685ull",\r
45 "15651770379918602919ull", "1154671861688431608ull",\r
46 "6777078091984849858ull", "14217205709582564356ull",\r
47 "4955408621820609982ull", "15813680319165523695ull",\r
48 "9897969721407807129ull", "5799700135519793083ull",\r
49 "3446529189623437397ull", "2766403683465910630ull",\r
50 "3759321430908793328ull", "6569396511892890354ull",\r
51 "11124853911180290924ull", "17425412145238035549ull",\r
52 "6879931585355039943ull", "16598635011539670441ull",\r
53 "9615975578494811651ull", "4378135509538422740ull",\r
54 "741282195344332574ull", "17368612862906255584ull",\r
55 "17294299200556814618ull", "518343398779663051ull",\r
56 "3861893449302272757ull", "8951107288843549591ull",\r
57 "15785139392894559409ull", "5917810836789601602ull",\r
58 "16169988133001117004ull", "9792861259254509262ull",\r
59 "5089058010244872136ull", "2130075224835397689ull",\r
60 "844136788226150435ull", "1303298091153875333ull",\r
61 "3579898206894361183ull", "7529542662845336496ull",\r
62 "13151949992653382522ull", "2145333536541545660ull",\r
63 "11258221828939586934ull", "3741808146124570279ull",\r
64 "16272841626371307089ull", "12174572036188391283ull",\r
65 "9749343496254107661ull", "9141275584134508830ull",\r
66 "10134192232065698216ull", "12944268412561423018ull",\r
67 "17499725811865666340ull", "5281482378159088661ull",\r
68 "13254803486023572607ull", "4526762838498717025ull",\r
69 "15990846379668494011ull", "10680949816169027468ull",\r
70 "7116154096012931030ull", "5296740689865236632ull",\r
71 "5222427027515795922ull", "6893215299448261251ull",\r
72 "10164707755932877485ull", "15325979189512082255ull",\r
73 "3713267224148573289ull", "12292682741753167354ull",\r
74 "4098115959960163588ull", "16095675565885113990ull",\r
75 "11391590846210510720ull", "8432889531466002673ull",\r
76 "7146668520368482523ull", "7678169991822407997ull",\r
77 "9882712513525031447ull", "13904414563513869160ull",\r
78 "1080076724395768626ull", "8448147843172150388ull",\r
79 "17633093729608185134ull", "10044622457050142303ull",\r
80 "4128911859292425737ull", "30642269109444395ull",\r
81 "16124215396922640581ull", "15444089895060081110ull",\r
82 "16437006538696302944ull", "800338649777443426ull",\r
83 "5355794945275091932ull", "11656354278827687117ull",\r
84 "1110873718944691255ull", "10829576045617693977ull",\r
85 "3846916616884579955ull", "17055821716837625668ull",\r
86 "13418968402643535758ull", "11671612594828802128ull",\r
87 "11597298928184328586ull", "13196028510862205499ull",\r
88 "16539578557089782373ull", "3182048322921507591ull",\r
89 "10016080431267550241ull", "148751875162592690ull",\r
90 "10400930266590768572ull", "4023803397139127870ull",\r
91 "17766462746879108920ull", "14807761432134600873ull",\r
92 "13521540421053792403ull", "13980983198941385205ull",\r
93 "16257584414193564367ull", "1760484796451765024ull"\r
94 };\r
95 */\r
96 \r
97 \r
98 // ----------------------------------------------\r
99 //              Data extracted from the query plan node\r
100 //              for use by code generation.\r
101 \r
102 static cplx_lit_table *complex_literals;  //Table of literals with constructors.\r
103 static vector<handle_param_tbl_entry *> param_handle_table;\r
104 static param_table *param_tbl;          // Table of all referenced parameters.\r
105 \r
106 static vector<scalarexp_t *> sl_list;\r
107 static vector<cnf_elem *> where;\r
108 \r
109 static gb_table *gb_tbl;                        // Table of all group-by attributes.\r
110 static aggregate_table *aggr_tbl;       // Table of all referenced aggregates.\r
111 \r
112 static bool packed_return;              // unpack using structyure, not fcns\r
113 static nic_property *nicprop;   // nic properties for this interface.\r
114 static int global_id;\r
115 \r
116 \r
117 //              The partial_fcns vector can now refer to\r
118 //              partial functions, or expensive functions\r
119 //              which can be cached (if there are multiple refs).  A couple\r
120 //              of int vectors distinguish the cases.\r
121 static vector<scalarexp_t *> partial_fcns;\r
122 static vector<int> fcn_ref_cnt;\r
123 static vector<bool> is_partial_fcn;\r
124 int sl_fcns_start = 0, sl_fcns_end = 0;\r
125 int wh_fcns_start = 0, wh_fcns_end = 0;\r
126 int gb_fcns_start = 0, gb_fcns_end = 0;\r
127 int ag_fcns_start = 0, ag_fcns_end = 0;\r
128 \r
129 \r
130 //              These vectors are for combinable predicates.\r
131 static vector<int> pred_class;  //      identifies the group\r
132 static vector<int> pred_pos;    //  position in the group.\r
133 \r
134 \r
135 \r
136 static char tmpstr[1000];\r
137 \r
138 //////////////////////////////////////////////////////////////////////\r
139 ///                     Various utilities\r
140 \r
141 string generate_fta_name(string node_name){\r
142         string ret = normalize_name(node_name);\r
143         if(ret == ""){\r
144                 ret = "default";\r
145         }\r
146         ret += "_fta";\r
147 \r
148         return(ret);\r
149 }\r
150 \r
151 \r
152 string generate_aggr_struct_name(string node_name){\r
153         string ret = normalize_name(node_name);\r
154         if(ret == ""){\r
155                 ret = "default";\r
156         }\r
157         ret += "_aggr_struct";\r
158 \r
159         return(ret);\r
160 }\r
161 \r
162 string generate_fj_struct_name(string node_name){\r
163         string ret = normalize_name(node_name);\r
164         if(ret == ""){\r
165                 ret = "default";\r
166         }\r
167         ret += "_fj_struct";\r
168 \r
169         return(ret);\r
170 }\r
171 \r
172 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){\r
173         string ret;\r
174         if(! packed_return){\r
175                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",\r
176                         schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);\r
177         ret += tmpstr;\r
178         if(!schema->get_modifier_list(schref,field)->contains_key("required"))\r
179                 ret += "\tif(retval) goto "+end_goto+";\n";\r
180 \r
181         }else{\r
182 //                      TODO: ntoh xforms (aug 2010 : removing ntoh, hton)\r
183                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));\r
184                 if(dt.is_buffer_type()){\r
185                         if(dt.get_type() != v_str_t){\r
186                           ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";\r
187                           ret += "\t\tgoto "+end_goto+";\n";\r
188                           ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";\r
189                           ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+\r
190                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";\r
191                         }else{\r
192                                 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");\r
193                                 exit(1);\r
194                         }\r
195                 }else{\r
196                         ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+\r
197                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";\r
198                 }\r
199         }\r
200         return ret;\r
201 }\r
202 \r
203 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){\r
204   string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";\r
205 \r
206   int g;\r
207   for(g=0;g<gb_tbl->size();g++){\r
208           sprintf(tmpstr,"gb_var%d",g);\r
209           ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";\r
210   }\r
211 \r
212   int a;\r
213   for(a=0;a<aggr_tbl->size();a++){\r
214           ret += "\t";\r
215           sprintf(tmpstr,"aggr_var%d",a);\r
216           if(aggr_tbl->is_builtin(a))\r
217                 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";\r
218           else\r
219                 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";\r
220   }\r
221 \r
222 /*\r
223   ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";\r
224 */\r
225 \r
226   ret += "};\n\n";\r
227 \r
228   return(ret);\r
229 }\r
230 \r
231 \r
232 string generate_fj_struct(filter_join_qpn *fs, string node_name ){\r
233   string ret;\r
234 \r
235   if(fs->use_bloom == false){   // uses hash table instead\r
236         ret = "struct " + generate_fj_struct_name(node_name) + "{\n";\r
237         int k;\r
238         for(k=0;k<fs->hash_eq.size();++k){\r
239                 sprintf(tmpstr,"key_var%d",k);\r
240                 ret += "\t"+fs->hash_eq[k]->pr->get_left_se()->get_data_type()->make_cvar(tmpstr)+";\n";\r
241         }\r
242         ret += "\tlong long int ts;\n";\r
243     ret += "};\n\n";\r
244   }\r
245 \r
246   return(ret);\r
247 }\r
248 \r
249 \r
250 \r
251 \r
252 string generate_fta_struct(string node_name, gb_table *gb_tbl,\r
253                 aggregate_table *aggr_tbl, param_table *param_tbl,\r
254                 cplx_lit_table *complex_literals,\r
255                 vector<handle_param_tbl_entry *> &param_handle_table,\r
256                 bool is_aggr_query, bool is_fj, bool uses_bloom,\r
257                 table_list *schema){\r
258 \r
259         string ret = "struct " + generate_fta_name(node_name) + "{\n";\r
260         ret += "\tstruct FTA f;\n";\r
261 \r
262 //-------------------------------------------------------------\r
263 //              Aggregate-specific fields\r
264 \r
265         if(is_aggr_query){\r
266 /*\r
267                 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";\r
268 */\r
269                 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";\r
270                 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";\r
271 //              ret+="\tint bitmap_size;\n";\r
272                 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";\r
273                 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";\r
274                 ret += "\tint max_windows; // max number of open windows.\n";\r
275                 ret += "\tunsigned int generation; // initially zero, increment on\n";\r
276                 ret += "\t     // every hash table flush - whether regular or induced.\n";\r
277                 ret += "\t     // Old groups are identified by a generation mismatch.\n";\r
278                 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";\r
279                 ret += "\tunsigned int flush_ctr; // control slow flushing\n";\r
280 \r
281 \r
282 \r
283                 int g;\r
284                 bool uses_temporal_flush = false;\r
285                 for(g=0;g<gb_tbl->size();g++){\r
286                         data_type *dt = gb_tbl->get_data_type(g);\r
287                         if(dt->is_temporal()){\r
288 /*\r
289                                 fprintf(stderr,"group by attribute %s is temporal, ",\r
290                                                 gb_tbl->get_name(g).c_str());\r
291                                 if(dt->is_increasing()){\r
292                                         fprintf(stderr,"increasing.\n");\r
293                                 }else{\r
294                                         fprintf(stderr,"decreasing.\n");\r
295                                 }\r
296 */\r
297                                 data_type *gdt = gb_tbl->get_data_type(g);\r
298                                 if(gdt->is_buffer_type()){\r
299                                         fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");\r
300                                 }else{\r
301                                         sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);\r
302                                         ret += tmpstr;\r
303                                         sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);\r
304                                         ret += tmpstr;\r
305                                         sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);\r
306                                         ret += tmpstr;\r
307                                         uses_temporal_flush = true;\r
308                                 }\r
309 \r
310                         }\r
311 \r
312                 }\r
313                 if(! uses_temporal_flush){\r
314                         fprintf(stderr,"Warning: no temporal flush.\n");\r
315                 }\r
316         }\r
317 \r
318 // ---------------------------------------------------------\r
319 //                      Filter-join specific fields\r
320 \r
321         if(is_fj){\r
322                 if(uses_bloom){\r
323                         ret +=\r
324 "\tunsigned char * bf_table; //array of bloom filters with layout \n"\r
325 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"\r
326 "\tint first_exec;\n"\r
327 "\tlong long int last_bin;\n"\r
328 "\tint last_bloom_pos;\n"\r
329 "\n"\r
330 ;\r
331                 }else{          // limited hash table\r
332                         ret +=\r
333 "  struct "+generate_fj_struct_name(node_name)+" *join_table;\n"\r
334 "\n"\r
335 ;\r
336                 }\r
337 \r
338         }\r
339 \r
340 //--------------------------------------------------------\r
341 //                      Common fields\r
342 \r
343 //                      Create places to hold the parameters.\r
344         int p;\r
345         vector<string> param_vec = param_tbl->get_param_names();\r
346         for(p=0;p<param_vec.size();p++){\r
347                 data_type *dt = param_tbl->get_data_type(param_vec[p]);\r
348                 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),\r
349                                 param_vec[p].c_str());\r
350                 ret += tmpstr;\r
351                 if(param_tbl->handle_access(param_vec[p])){\r
352                         ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";\r
353                 }\r
354         }\r
355 \r
356 //                      Create places to hold complex literals.\r
357         int cl;\r
358         for(cl=0;cl<complex_literals->size();cl++){\r
359                 literal_t *l = complex_literals->get_literal(cl);\r
360                 data_type *dtl = new data_type( l->get_type() );\r
361                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);\r
362                 ret += tmpstr;\r
363         }\r
364 \r
365 //                      Create places to hold the pass-by-handle parameters.\r
366         for(p=0;p<param_handle_table.size();++p){\r
367                 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);\r
368                 ret += tmpstr;\r
369         }\r
370 \r
371 //                      Create places to hold the last values of temporal\r
372 //                      attributes referenced in select clause\r
373 //                      we also need to store values of the temoral attributed\r
374 //                      of last flushed tuple in aggr queries\r
375 //                      to make sure we generate the cirrect temporal tuple\r
376 //                      in the presense of slow flushes\r
377 \r
378 \r
379         col_id_set temp_cids;           //      col ids of temp attributes in select clause\r
380 \r
381         int s;\r
382         col_id_set::iterator csi;\r
383 \r
384         for(s=0;s<sl_list.size();s++){\r
385                 data_type *sdt = sl_list[s]->get_data_type();\r
386                 if (sdt->is_temporal()) {\r
387                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);\r
388                 }\r
389         }\r
390 \r
391         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){\r
392                 int tblref = (*csi).tblvar_ref;\r
393                 int schref = (*csi).schema_ref;\r
394                 string field = (*csi).field;\r
395                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));\r
396                 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);\r
397                 ret += tmpstr;\r
398         }\r
399 \r
400         ret += "\tgs_uint64_t trace_id;\n\n";\r
401 \r
402 //      Fields to store the runtime stats\r
403 \r
404         ret += "\tgs_uint32_t in_tuple_cnt;\n";\r
405         ret += "\tgs_uint32_t out_tuple_cnt;\n";\r
406         ret += "\tgs_uint32_t out_tuple_sz;\n";\r
407         ret += "\tgs_uint32_t accepted_tuple_cnt;\n";\r
408         ret += "\tgs_uint64_t cycle_cnt;\n";\r
409         ret += "\tgs_uint32_t collision_cnt;\n";\r
410         ret += "\tgs_uint32_t eviction_cnt;\n";\r
411         ret += "\tgs_float_t sampling_rate;\n";\r
412 \r
413 \r
414 \r
415         ret += "};\n\n";\r
416 \r
417         return(ret);\r
418 }\r
419 \r
420 //------------------------------------------------------------\r
421 //              Set colref tblvars to 0..\r
422 //              (special processing for join-like operators in an lfta).\r
423 \r
424 void reset_se_col_ids_tblvars(scalarexp_t *se,  gb_table *gtbl){\r
425         vector<scalarexp_t *> operands;\r
426         int o;\r
427 \r
428         if(! se)\r
429                 return;\r
430 \r
431         switch(se->get_operator_type()){\r
432         case SE_LITERAL:\r
433         case SE_PARAM:\r
434         case SE_IFACE_PARAM:\r
435                 return;\r
436         case SE_UNARY_OP:\r
437                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);\r
438                 return;\r
439         case SE_BINARY_OP:\r
440                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);\r
441                 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);\r
442                 return;\r
443         case SE_COLREF:\r
444                 if(! se->is_gb() ){\r
445                         se->get_colref()->set_tablevar_ref(0);\r
446                 }else{\r
447                         if(gtbl==NULL){\r
448                                 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");\r
449                                 exit(1);\r
450                         }\r
451                         reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);\r
452                 }\r
453                 return;\r
454         case SE_AGGR_STAR:\r
455                 return;\r
456         case SE_AGGR_SE:\r
457                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);\r
458                 return;\r
459         case SE_FUNC:\r
460                 operands = se->get_operands();\r
461                 for(o=0;o<operands.size();o++){\r
462                         reset_se_col_ids_tblvars(operands[o], gtbl);\r
463                 }\r
464                 return;\r
465         default:\r
466                 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",\r
467                                 se->get_lineno(), se->get_charno(),se->get_operator_type());\r
468                 exit(1);\r
469         }\r
470 }\r
471 \r
472 \r
473 //              reset  column tblvars accessed in this pr.\r
474 \r
475 void reset_pr_col_ids_tblvars(predicate_t *pr,  gb_table *gtbl){\r
476         vector<scalarexp_t *> op_list;\r
477         int o;\r
478 \r
479         switch(pr->get_operator_type()){\r
480         case PRED_IN:\r
481                 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);\r
482                 return;\r
483         case PRED_COMPARE:\r
484                 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;\r
485                 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;\r
486                 return;\r
487         case PRED_UNARY_OP:\r
488                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;\r
489                 return;\r
490         case PRED_BINARY_OP:\r
491                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;\r
492                 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;\r
493                 return;\r
494         case PRED_FUNC:\r
495                 op_list = pr->get_op_list();\r
496                 for(o=0;o<op_list.size();++o){\r
497                         reset_se_col_ids_tblvars(op_list[o],gtbl) ;\r
498                 }\r
499                 return;\r
500         default:\r
501                 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",\r
502                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );\r
503         }\r
504 }\r
505 \r
506 \r
507 \r
508 \r
509 //                      Generate code that makes reference\r
510 //                      to the tuple, and not to any aggregates.\r
511 static string generate_se_code(scalarexp_t *se,table_list *schema){\r
512         string ret;\r
513     data_type *ldt, *rdt;\r
514         int o;\r
515         vector<scalarexp_t *> operands;\r
516 \r
517 \r
518         switch(se->get_operator_type()){\r
519         case SE_LITERAL:\r
520                 if(se->is_handle_ref()){\r
521                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );\r
522                         ret = tmpstr;\r
523                         return(ret);\r
524                 }\r
525                 if(se->get_literal()->is_cpx_lit()){\r
526                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );\r
527                         ret = tmpstr;\r
528                         return(ret);\r
529                 }\r
530                 return(se->get_literal()->to_C_code("")); // not complex, no constructor\r
531         case SE_PARAM:\r
532                 if(se->is_handle_ref()){\r
533                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );\r
534                         ret = tmpstr;\r
535                         return(ret);\r
536                 }\r
537                 ret += "t->param_";\r
538                 ret += se->get_param_name();\r
539                 return(ret);\r
540         case SE_UNARY_OP:\r
541         ldt = se->get_left_se()->get_data_type();\r
542         if(ldt->complex_operator(se->get_op()) ){\r
543                         ret +=  ldt->get_complex_operator(se->get_op());\r
544                         ret += "(";\r
545                         ret += generate_se_code(se->get_left_se(),schema);\r
546             ret += ")";\r
547                 }else{\r
548                         ret += "(";\r
549                         ret += se->get_op();\r
550                         ret += generate_se_code(se->get_left_se(),schema);\r
551                         ret += ")";\r
552                 }\r
553                 return(ret);\r
554         case SE_BINARY_OP:\r
555         ldt = se->get_left_se()->get_data_type();\r
556         rdt = se->get_right_se()->get_data_type();\r
557 \r
558         if(ldt->complex_operator(rdt, se->get_op()) ){\r
559                         ret +=  ldt->get_complex_operator(rdt, se->get_op());\r
560                         ret += "(";\r
561                         ret += generate_se_code(se->get_left_se(),schema);\r
562                         ret += ", ";\r
563                         ret += generate_se_code(se->get_right_se(),schema);\r
564                         ret += ")";\r
565                 }else{\r
566                         ret += "(";\r
567                         ret += generate_se_code(se->get_left_se(),schema);\r
568                         ret += se->get_op();\r
569                         ret += generate_se_code(se->get_right_se(),schema);\r
570                         ret += ")";\r
571                 }\r
572                 return(ret);\r
573         case SE_COLREF:\r
574                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet unpacked ...\r
575                                                         // so return the defining code.\r
576                         ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );\r
577 \r
578                 }else{\r
579                 sprintf(tmpstr,"unpack_var_%s_%d",\r
580                   se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );\r
581                 ret = tmpstr;\r
582                 }\r
583                 return(ret);\r
584         case SE_FUNC:\r
585 //                              Should not be ref'ing any aggr here.\r
586                 if(se->get_aggr_ref() >= 0){\r
587                         fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");\r
588                         return("ERROR in generate_se_code");\r
589                 }\r
590 \r
591                 if(se->is_partial()){\r
592                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());\r
593                         ret = tmpstr;\r
594                 }else{\r
595                         ret += se->op + "(";\r
596                         operands = se->get_operands();\r
597                         for(o=0;o<operands.size();o++){\r
598                                 if(o>0) ret += ", ";\r
599                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )\r
600                                         ret += "&";\r
601                                 ret += generate_se_code(operands[o], schema);\r
602                         }\r
603                         ret += ")";\r
604                 }\r
605                 return(ret);\r
606         default:\r
607                 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",\r
608                                 se->get_lineno(), se->get_charno(),se->get_operator_type());\r
609                 return("ERROR in generate_se_code");\r
610         }\r
611 }\r
612 \r
613 //              generate code that refers only to aggregate data and constants.\r
614 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){\r
615 \r
616         string ret;\r
617     data_type *ldt, *rdt;\r
618         int o;\r
619         vector<scalarexp_t *> operands;\r
620 \r
621 \r
622         switch(se->get_operator_type()){\r
623         case SE_LITERAL:\r
624                 if(se->is_handle_ref()){\r
625                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );\r
626                         ret = tmpstr;\r
627                         return(ret);\r
628                 }\r
629                 if(se->get_literal()->is_cpx_lit()){\r
630                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );\r
631                         ret = tmpstr;\r
632                         return(ret);\r
633                 }\r
634                 return(se->get_literal()->to_C_code("")); // not complex no constructor\r
635         case SE_PARAM:\r
636                 if(se->is_handle_ref()){\r
637                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );\r
638                         ret = tmpstr;\r
639                         return(ret);\r
640                 }\r
641                 ret += "t->param_";\r
642                 ret += se->get_param_name();\r
643                 return(ret);\r
644         case SE_UNARY_OP:\r
645         ldt = se->get_left_se()->get_data_type();\r
646         if(ldt->complex_operator(se->get_op()) ){\r
647                         ret +=  ldt->get_complex_operator(se->get_op());\r
648                         ret += "(";\r
649                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);\r
650             ret += ")";\r
651                 }else{\r
652                         ret += "(";\r
653                         ret += se->get_op();\r
654                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);\r
655                         ret += ")";\r
656                 }\r
657                 return(ret);\r
658         case SE_BINARY_OP:\r
659         ldt = se->get_left_se()->get_data_type();\r
660         rdt = se->get_right_se()->get_data_type();\r
661 \r
662         if(ldt->complex_operator(rdt, se->get_op()) ){\r
663                         ret +=  ldt->get_complex_operator(rdt, se->get_op());\r
664                         ret += "(";\r
665                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);\r
666                         ret += ", ";\r
667                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);\r
668                         ret += ")";\r
669                 }else{\r
670                         ret += "(";\r
671                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);\r
672                         ret += se->get_op();\r
673                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);\r
674                         ret += ")";\r
675                 }\r
676                 return(ret);\r
677         case SE_COLREF:\r
678                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet\r
679                                                         // unpacked ... so return the defining code.\r
680                         sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());\r
681                         ret = tmpstr;\r
682 \r
683                 }else{\r
684                 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"\r
685                                 "error in generate_se_code_fm_aggr, line %d, character %d.\n",\r
686                                 se->get_lineno(), se->get_charno());\r
687                 ret = tmpstr;\r
688                 }\r
689                 return(ret);\r
690         case SE_AGGR_STAR:\r
691         case SE_AGGR_SE:\r
692                 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());\r
693                 ret = tmpstr;\r
694                 return(ret);\r
695         case SE_FUNC:\r
696 //                              Is it a UDAF?\r
697                 if(se->get_aggr_ref() >= 0){\r
698                         sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());\r
699                         ret = tmpstr;\r
700                         return(ret);\r
701                 }\r
702 \r
703                 if(se->is_partial()){\r
704                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());\r
705                         ret = tmpstr;\r
706                 }else{\r
707                         ret += se->op + "(";\r
708                         operands = se->get_operands();\r
709                         for(o=0;o<operands.size();o++){\r
710                                 if(o>0) ret += ", ";\r
711                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )\r
712                                         ret += "&";\r
713                                 ret += generate_se_code_fm_aggr(operands[o], var, schema);\r
714                         }\r
715                         ret += ")";\r
716                 }\r
717                 return(ret);\r
718         default:\r
719                 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",\r
720                                 se->get_lineno(), se->get_charno(),se->get_operator_type());\r
721                 return("ERROR in generate_se_code");\r
722         }\r
723 \r
724 }\r
725 \r
726 \r
727 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){\r
728         string ret;\r
729         int o;\r
730         vector<scalarexp_t *> operands;\r
731 \r
732 \r
733         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){\r
734                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",\r
735                                 se->get_lineno(), se->get_charno());\r
736                 return("ERROR in generate_se_code");\r
737         }\r
738 \r
739         ret = "\tretval = " + se->get_op() + "( ";\r
740         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);\r
741         ret += tmpstr;\r
742 \r
743         operands = se->get_operands();\r
744         for(o=0;o<operands.size();o++){\r
745                 ret += ", ";\r
746                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )\r
747                         ret += "&";\r
748                 ret += generate_se_code_fm_aggr(operands[o], var, schema);\r
749         }\r
750         ret += ");\n";\r
751 \r
752         return(ret);\r
753 }\r
754 \r
755 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){\r
756         string ret;\r
757         int o;\r
758         vector<scalarexp_t *> operands;\r
759 \r
760         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){\r
761                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",\r
762                                 se->get_lineno(), se->get_charno());\r
763                 return("ERROR in generate_se_code");\r
764         }\r
765 \r
766         ret = se->get_op() + "( ";\r
767 \r
768         operands = se->get_operands();\r
769         for(o=0;o<operands.size();o++){\r
770                 if(o) ret += ", ";\r
771                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )\r
772                         ret += "&";\r
773                 ret += generate_se_code(operands[o], schema);\r
774         }\r
775         ret += ");\n";\r
776 \r
777         return(ret);\r
778 }\r
779 \r
780 \r
781 \r
782 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){\r
783         string ret;\r
784         int o;\r
785         vector<scalarexp_t *> operands;\r
786 \r
787 \r
788         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){\r
789                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",\r
790                                 se->get_lineno(), se->get_charno());\r
791                 return("ERROR in generate_se_code");\r
792         }\r
793 \r
794         ret = "\tretval = " + se->get_op() + "( ",\r
795         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);\r
796         ret += tmpstr;\r
797 \r
798         operands = se->get_operands();\r
799         for(o=0;o<operands.size();o++){\r
800                 ret += ", ";\r
801                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )\r
802                         ret += "&";\r
803                 ret += generate_se_code(operands[o], schema);\r
804         }\r
805         ret += ");\n";\r
806 \r
807         return(ret);\r
808 }\r
809 \r
810 \r
811 \r
812 \r
813 \r
814 static string generate_C_comparison_op(string op){\r
815   if(op == "=") return("==");\r
816   if(op == "<>") return("!=");\r
817   return(op);\r
818 }\r
819 \r
820 static string generate_C_boolean_op(string op){\r
821         if( (op == "AND") || (op == "And") || (op == "and") ){\r
822                 return("&&");\r
823         }\r
824         if( (op == "OR") || (op == "Or") || (op == "or") ){\r
825                 return("||");\r
826         }\r
827         if( (op == "NOT") || (op == "Not") || (op == "not") ){\r
828                 return("!");\r
829         }\r
830 \r
831         fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());\r
832         return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);\r
833 }\r
834 \r
835 \r
836 static string generate_predicate_code(predicate_t *pr,table_list *schema){\r
837         string ret;\r
838         vector<literal_t *>  litv;\r
839         int i;\r
840     data_type *ldt, *rdt;\r
841         vector<scalarexp_t *> op_list;\r
842         int o,cref,ppos;\r
843         unsigned int bitmask;\r
844 \r
845         switch(pr->get_operator_type()){\r
846         case PRED_IN:\r
847         ldt = pr->get_left_se()->get_data_type();\r
848 \r
849                 ret += "( ";\r
850                 litv = pr->get_lit_vec();\r
851                 for(i=0;i<litv.size();i++){\r
852                         if(i>0) ret += " || ";\r
853                         ret += "( ";\r
854 \r
855                 if(ldt->complex_comparison(ldt) ){\r
856                                 ret +=  ldt->get_comparison_fcn(ldt) ;\r
857                                 ret += "( ";\r
858                                 if(ldt->is_buffer_type() ) ret += "&";\r
859                                 ret += generate_se_code(pr->get_left_se(), schema);\r
860                                 ret += ", ";\r
861                                 if(ldt->is_buffer_type() ) ret += "&";\r
862                                 if(litv[i]->is_cpx_lit()){\r
863                                         sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );\r
864                                         ret += tmpstr;\r
865                                 }else{\r
866                                         ret += litv[i]->to_C_code("");\r
867                                 }\r
868                                 ret += ") == 0";\r
869                         }else{\r
870                                 ret += generate_se_code(pr->get_left_se(), schema);\r
871                                 ret += " == ";\r
872                                 ret += litv[i]->to_C_code("");\r
873                         }\r
874 \r
875                         ret += " )";\r
876                 }\r
877                 ret += " )";\r
878                 return(ret);\r
879 \r
880         case PRED_COMPARE:\r
881         ldt = pr->get_left_se()->get_data_type();\r
882         rdt = pr->get_right_se()->get_data_type();\r
883 \r
884                 ret += "( ";\r
885         if(ldt->complex_comparison(rdt) ){\r
886                         ret += ldt->get_comparison_fcn(rdt);\r
887                         ret += "(";\r
888                         if(ldt->is_buffer_type() ) ret += "&";\r
889                         ret += generate_se_code(pr->get_left_se(),schema);\r
890                         ret += ", ";\r
891                         if(rdt->is_buffer_type() ) ret += "&";\r
892                         ret += generate_se_code(pr->get_right_se(),schema);\r
893                         ret += ") ";\r
894                         ret +=  generate_C_comparison_op(pr->get_op());\r
895                         ret += "0";\r
896                 }else{\r
897                         ret += generate_se_code(pr->get_left_se(),schema);\r
898                         ret +=  generate_C_comparison_op(pr->get_op());\r
899                         ret += generate_se_code(pr->get_right_se(),schema);\r
900                 }\r
901                 ret += " )";\r
902                 return(ret);\r
903         case PRED_UNARY_OP:\r
904                 ret += "( ";\r
905                 ret +=  generate_C_boolean_op(pr->get_op());\r
906                 ret += generate_predicate_code(pr->get_left_pr(),schema);\r
907                 ret += " )";\r
908                 return(ret);\r
909         case PRED_BINARY_OP:\r
910                 ret += "( ";\r
911                 ret += generate_predicate_code(pr->get_left_pr(),schema);\r
912                 ret +=  generate_C_boolean_op(pr->get_op());\r
913                 ret += generate_predicate_code(pr->get_right_pr(),schema);\r
914                 ret += " )";\r
915                 return(ret);\r
916         case PRED_FUNC:\r
917                 op_list = pr->get_op_list();\r
918                 cref = pr->get_combinable_ref();\r
919                 if(cref >= 0){  // predicate is a combinable pred reference\r
920                         //              Trust, but verify\r
921                         if(pred_class.size() >= cref && pred_class[cref] >= 0){\r
922                                 ppos = pred_pos[cref];\r
923                                 bitmask = 1 << ppos % 32;\r
924                                 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);\r
925                                 ret = tmpstr;\r
926                                 return ret;\r
927                         }\r
928                 }\r
929 \r
930                 ret =  pr->get_op() + "(";\r
931                 if (pr->is_sampling_fcn) {\r
932                         ret += "t->sampling_rate";\r
933                         if (!op_list.empty())\r
934                                 ret += ", ";\r
935                 }\r
936                 for(o=0;o<op_list.size();++o){\r
937                         if(o>0) ret += ", ";\r
938                         if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )\r
939                                         ret += "&";\r
940                         ret += generate_se_code(op_list[o],schema);\r
941                 }\r
942                 ret += " )";\r
943                 return(ret);\r
944         default:\r
945                 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",\r
946                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );\r
947                 return("ERROR in generate_predicate_code");\r
948         }\r
949 }\r
950 \r
951 \r
952 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){\r
953         string ret;\r
954 \r
955     if(dt->complex_comparison(dt) ){\r
956                 ret += dt->get_comparison_fcn(dt);\r
957                 ret += "(";\r
958                         if(dt->is_buffer_type() ) ret += "&";\r
959                 ret += lhs_op;\r
960                 ret += ", ";\r
961                         if(dt->is_buffer_type() ) ret += "&";\r
962                 ret += rhs_op;\r
963                 ret += ") == 0";\r
964         }else{\r
965                 ret += lhs_op;\r
966                 ret += " == ";\r
967                 ret += rhs_op;\r
968         }\r
969 \r
970         return(ret);\r
971 }\r
972 \r
973 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){\r
974         string ret;\r
975 \r
976     if(dt->complex_comparison(dt) ){\r
977                 ret += dt->get_comparison_fcn(dt);\r
978                 ret += "(";\r
979                         if(dt->is_buffer_type() ) ret += "&";\r
980                 ret += lhs_op;\r
981                 ret += ", ";\r
982                         if(dt->is_buffer_type() ) ret += "&";\r
983                 ret += rhs_op;\r
984                 ret += ") == 0";\r
985         }else{\r
986                 ret += lhs_op;\r
987                 ret += " == ";\r
988                 ret += rhs_op;\r
989         }\r
990 \r
991         return(ret);\r
992 }\r
993 \r
994 //              Here I assume that only MIN and MAX aggregates can be computed\r
995 //              over BUFFER data types.\r
996 \r
997 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){\r
998         string retval = "\t\t";\r
999         string op = atbl->get_op(aidx);\r
1000 \r
1001 //              Is it a UDAF\r
1002         if(! atbl->is_builtin(aidx)) {\r
1003                 int o;\r
1004                 retval += op+"_LFTA_AGGR_UPDATE_(";\r
1005                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";\r
1006                 retval+="("+var+")";\r
1007                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);\r
1008                 for(o=0;o<opl.size();++o){\r
1009                         retval += ",";\r
1010                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )\r
1011                                         retval.append("&");\r
1012                         retval += generate_se_code(opl[o], schema);\r
1013                 }\r
1014                 retval += ");\n";\r
1015 \r
1016                 return retval;\r
1017         }\r
1018 \r
1019 //              Built-in aggregate processing.\r
1020 \r
1021         data_type *dt = atbl->get_data_type(aidx);\r
1022 \r
1023         if(op == "COUNT"){\r
1024                 retval.append(var);\r
1025                 retval.append("++;\n");\r
1026                 return(retval);\r
1027         }\r
1028         if(op == "SUM"){\r
1029                 retval.append(var);\r
1030                 retval.append(" += ");\r
1031                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );\r
1032                 retval.append(";\n");\r
1033                 return(retval);\r
1034         }\r
1035         if(op == "MIN"){\r
1036                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );\r
1037                 retval.append(tmpstr);\r
1038                 if(dt->complex_comparison(dt)){\r
1039                         if(dt->is_buffer_type())\r
1040                           sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());\r
1041                         else\r
1042                           sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());\r
1043                 }else{\r
1044                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());\r
1045                 }\r
1046                 retval.append(tmpstr);\r
1047                 if(dt->is_buffer_type()){\r
1048                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);\r
1049                 }else{\r
1050                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);\r
1051                 }\r
1052                 retval.append(tmpstr);\r
1053 \r
1054                 return(retval);\r
1055         }\r
1056         if(op == "MAX"){\r
1057                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );\r
1058                 retval.append(tmpstr);\r
1059                 if(dt->complex_comparison(dt)){\r
1060                         if(dt->is_buffer_type())\r
1061                          sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());\r
1062                         else\r
1063                          sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());\r
1064                 }else{\r
1065                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());\r
1066                 }\r
1067                 retval.append(tmpstr);\r
1068                 if(dt->is_buffer_type()){\r
1069                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);\r
1070                 }else{\r
1071                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);\r
1072                 }\r
1073                 retval.append(tmpstr);\r
1074 \r
1075                 return(retval);\r
1076 \r
1077         }\r
1078         if(op == "AND_AGGR"){\r
1079                 retval.append(var);\r
1080                 retval.append(" &= ");\r
1081                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );\r
1082                 retval.append(";\n");\r
1083                 return(retval);\r
1084         }\r
1085         if(op == "OR_AGGR"){\r
1086                 retval.append(var);\r
1087                 retval.append(" |= ");\r
1088                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );\r
1089                 retval.append(";\n");\r
1090                 return(retval);\r
1091         }\r
1092         if(op == "XOR_AGGR"){\r
1093                 retval.append(var);\r
1094                 retval.append(" ^= ");\r
1095                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );\r
1096                 retval.append(";\n");\r
1097                 return(retval);\r
1098         }\r
1099         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());\r
1100         return("ERROR: aggregate not recognized: "+op);\r
1101 \r
1102 }\r
1103 \r
1104 \r
1105 \r
1106 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){\r
1107         string retval;\r
1108         string op = atbl->get_op(aidx);\r
1109 \r
1110 //              Is it a UDAF\r
1111         if(! atbl->is_builtin(aidx)) {\r
1112                 int o;\r
1113                 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";\r
1114                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";\r
1115                 retval+="("+var+"));\n";\r
1116 //                      Add 1st tupl\r
1117                 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";\r
1118                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";\r
1119                 retval+="("+var+")";\r
1120                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);\r
1121                 for(o=0;o<opl.size();++o){\r
1122                         retval += ",";\r
1123                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )\r
1124                                         retval.append("&");\r
1125                         retval += generate_se_code(opl[o],schema);\r
1126                 }\r
1127                 retval += ");\n";\r
1128                 return(retval);\r
1129         }\r
1130 \r
1131 //              Built-in aggregate processing.\r
1132 \r
1133 \r
1134         data_type *dt = atbl->get_data_type(aidx);\r
1135 \r
1136         if(op == "COUNT"){\r
1137                 retval = "\t\t"+var;\r
1138                 retval.append(" = 1;\n");\r
1139                 return(retval);\r
1140         }\r
1141 \r
1142         if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||\r
1143                                                                         op == "OR_AGGR" || op == "XOR_AGGR"){\r
1144                 if(dt->is_buffer_type()){\r
1145                         sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );\r
1146                         retval.append(tmpstr);\r
1147                         sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);\r
1148                         retval.append(tmpstr);\r
1149                 }else{\r
1150                         retval = "\t\t"+var;\r
1151                         retval += " = ";\r
1152                         retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));\r
1153                         retval.append(";\n");\r
1154                 }\r
1155                 return(retval);\r
1156         }\r
1157 \r
1158         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());\r
1159         return("ERROR: aggregate not recognized: "+op);\r
1160 }\r
1161 \r
1162 \r
1163 ////////////////////////////////////////////////////////////\r
1164 \r
1165 \r
1166 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,\r
1167         std::string &node_name, std::string &schema_embed_str){\r
1168 //                      Include these only once, not once per lfta\r
1169 //      string ret = "#include \"rts.h\"\n";\r
1170 //      ret +=  "#include \"fta.h\"\n\n");\r
1171 \r
1172         string ret = "#ifndef LFTA_IN_NIC\n";\r
1173         ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";\r
1174         ret += "#include<stdio.h>\n";\r
1175         ret += "#include <limits.h>\n";\r
1176         ret += "#include <float.h>\n";\r
1177         ret += "#include \"rdtsc.h\"\n";\r
1178         ret += "#endif\n";\r
1179 \r
1180 \r
1181 \r
1182         return(ret);\r
1183 }\r
1184 \r
1185 \r
1186 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){\r
1187         int a,p,s;\r
1188 //                       need to create and output the tuple.\r
1189         string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";\r
1190 //                      Check for any UDAFs with LFTA_BAILOUT\r
1191         ret += "\tlfta_bailout = 0;\n";\r
1192         for(a=0;a<aggr_tbl->size();a++){\r
1193                 if(aggr_tbl->has_bailout(a)){\r
1194                         ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";\r
1195                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";\r
1196                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";\r
1197                 }\r
1198         }\r
1199         ret += "\tif(! lfta_bailout){\n";\r
1200 \r
1201 //                      First, compute the size of the tuple.\r
1202 \r
1203 //                      Unpack UDAF return values\r
1204         for(a=0;a<aggr_tbl->size();a++){\r
1205                 if(! aggr_tbl->is_builtin(a)){\r
1206                         ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";\r
1207                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";\r
1208                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";\r
1209 \r
1210                 }\r
1211         }\r
1212 \r
1213 \r
1214 //                      Unpack partial fcns ref'd by the select clause.\r
1215   if(sl_fcns_start != sl_fcns_end){\r
1216             ret += "\t\tunpack_failed = 0;\n";\r
1217     for(p=sl_fcns_start;p<sl_fcns_end;p++){\r
1218           if(is_partial_fcn[p]){\r
1219                 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,\r
1220                          "t->aggr_table["+idx+"].",schema);\r
1221                 ret += "\t\tif(retval) unpack_failed = 1;\n";\r
1222           }\r
1223     }\r
1224                                                                 // BEGIN don't allocate tuple if\r
1225         ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.\r
1226   }\r
1227 \r
1228 //                      Unpack any BUFFER type selections into temporaries\r
1229 //                      so that I can compute their size and not have\r
1230 //                      to recompute their value during tuple packing.\r
1231 //                      I can use regular assignment here because\r
1232 //                      these temporaries are non-persistent.\r
1233 \r
1234           for(s=0;s<sl_list.size();s++){\r
1235                 data_type *sdt = sl_list[s]->get_data_type();\r
1236                 if(sdt->is_buffer_type()){\r
1237                         sprintf(tmpstr,"\t\t\tselvar_%d = ",s);\r
1238                         ret += tmpstr;\r
1239                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);\r
1240                         ret += ";\n";\r
1241                 }\r
1242           }\r
1243 \r
1244 \r
1245 //              The size of the tuple is the size of the tuple struct plus the\r
1246 //              size of the buffers to be copied in.\r
1247 \r
1248           ret += "\t\t\ttuple_size = sizeof( struct ";\r
1249           ret +=  generate_tuple_name(node_name);\r
1250           ret += ")";\r
1251           for(s=0;s<sl_list.size();s++){\r
1252                 data_type *sdt = sl_list[s]->get_data_type();\r
1253                 if(sdt->is_buffer_type()){\r
1254                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);\r
1255                         ret += tmpstr;\r
1256                 }\r
1257           }\r
1258           ret += ";\n";\r
1259 \r
1260 \r
1261           ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";\r
1262           ret += "\t\t\tif( tuple != NULL){\n";\r
1263 \r
1264 \r
1265 //                      Test passed, make assignments to the tuple.\r
1266 \r
1267           ret += "\t\t\t\ttuple_pos = sizeof( struct ";\r
1268           ret +=  generate_tuple_name(node_name) ;\r
1269           ret += ");\n";\r
1270 \r
1271 //                      Mark tuple as REGULAR_TUPLE\r
1272           ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";\r
1273 \r
1274           for(s=0;s<sl_list.size();s++){\r
1275                 data_type *sdt = sl_list[s]->get_data_type();\r
1276                 if(sdt->is_buffer_type()){\r
1277                         sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);\r
1278                         ret += tmpstr;\r
1279                         sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);\r
1280                         ret += tmpstr;\r
1281                 }else{\r
1282                         sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);\r
1283                         ret += tmpstr;\r
1284 //                      if(sdt->needs_hn_translation())\r
1285 //                              ret += sdt->hton_translation() +"( ";\r
1286                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);\r
1287 //                      if(sdt->needs_hn_translation())\r
1288 //                              ret += ") ";\r
1289                         ret += ";\n";\r
1290                 }\r
1291           }\r
1292 \r
1293 //              Generate output.\r
1294           ret += "\t\t\t\tpost_tuple(tuple);\n";\r
1295           ret += "\t\t\t\t#ifdef LFTA_STATS\n";\r
1296           ret+="\t\t\t\tt->out_tuple_cnt++;\n";\r
1297           ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";\r
1298           ret += "\t\t\t\t#endif\n\n";\r
1299           ret += "\t\t\t}\n";\r
1300 \r
1301           if(sl_fcns_start != sl_fcns_end)      // END don't allocate tuple if\r
1302                 ret += "\t\t}\n";                               // unpack failed.\r
1303           ret += "\t}\n";\r
1304 \r
1305 //                      Need to release memory held by BUFFER types.\r
1306                 int g;\r
1307 \r
1308           for(g=0;g<gb_tbl->size();g++){\r
1309                 data_type *gdt = gb_tbl->get_data_type(g);\r
1310                 if(gdt->is_buffer_type()){\r
1311                         sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);\r
1312                         ret += tmpstr;\r
1313                 }\r
1314           }\r
1315           for(a=0;a<aggr_tbl->size();a++){\r
1316                 if(aggr_tbl->is_builtin(a)){\r
1317                         data_type *adt = aggr_tbl->get_data_type(a);\r
1318                         if(adt->is_buffer_type()){\r
1319                                 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);\r
1320                                 ret += tmpstr;\r
1321                         }\r
1322                 }else{\r
1323                         ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";\r
1324                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";\r
1325                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";\r
1326                 }\r
1327           }\r
1328 \r
1329         ret += "\t\tt->n_aggrs--;\n";\r
1330 \r
1331         return(ret);\r
1332 \r
1333 }\r
1334 \r
1335 string generate_gb_match_test(string idx){\r
1336         int g;\r
1337         string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") &&  IS_NEW(t->aggr_table_bitmap,"+idx+")";\r
1338         if(gb_tbl->size()>0){\r
1339                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";\r
1340                 ret+="\t\t";\r
1341 \r
1342 //                      Next, scan list for a match on the group-by attributes.\r
1343           string rhs_op, lhs_op;\r
1344           for(g=0;g<gb_tbl->size();g++){\r
1345                   ret += " && ";\r
1346                   ret += "(";\r
1347                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;\r
1348                   sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;\r
1349                   ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );\r
1350                   ret += ")";\r
1351           }\r
1352          }\r
1353 \r
1354           ret += "){\n";\r
1355 \r
1356         return ret;\r
1357 }\r
1358 \r
1359 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){\r
1360         int g;\r
1361         string ret;\r
1362 \r
1363           ret += "/*\t\tMatch found : update in place.\t*/\n";\r
1364           int a;\r
1365           has_udaf = false;\r
1366           for(a=0;a<aggr_tbl->size();a++){\r
1367                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);\r
1368                   ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);\r
1369                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;\r
1370           }\r
1371 \r
1372 //                      garbage collect copied buffer type gb attrs.\r
1373   for(g=0;g<gb_tbl->size();g++){\r
1374           data_type *gdt = gb_tbl->get_data_type(g);\r
1375           if(gdt->is_buffer_type()){\r
1376                 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);\r
1377                 ret+=tmpstr;\r
1378           }\r
1379         }\r
1380 \r
1381 \r
1382 \r
1383           bool first_udaf = true;\r
1384           if(has_udaf){\r
1385                 ret += "\t\tif(";\r
1386                 for(a=0;a<aggr_tbl->size();a++){\r
1387                         if(! aggr_tbl->is_builtin(a)){\r
1388                                 if(! first_udaf)ret += " || ";\r
1389                                 else first_udaf = false;\r
1390                                 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";\r
1391                                 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";\r
1392                                 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";\r
1393                         }\r
1394                 }\r
1395                 ret+="){\n";\r
1396                 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";\r
1397                 ret += generate_tuple_from_aggr(node_name,schema,idx);\r
1398                 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";\r
1399                 ret+="\t\t}\n";\r
1400         }\r
1401         return ret;\r
1402 }\r
1403 \r
1404 \r
1405 string generate_init_group( table_list *schema, string idx){\r
1406           int g,a;\r
1407           string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";\r
1408 //                      Fill up the aggregate block.\r
1409           for(g=0;g<gb_tbl->size();g++){\r
1410                   sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);\r
1411                   ret += tmpstr;\r
1412           }\r
1413           for(a=0;a<aggr_tbl->size();a++){\r
1414                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);\r
1415                   ret += generate_aggr_init(tmpstr, aggr_tbl,a,  schema);\r
1416           }\r
1417           ret+="\t\tt->n_aggrs++;\n";\r
1418         return ret;\r
1419 }\r
1420 \r
1421 \r
1422 string generate_fta_flush(string node_name, table_list *schema,\r
1423                 ext_fcn_list *Ext_fcns){\r
1424 \r
1425    string ret;\r
1426    string select_var_defs ;\r
1427         int s, p;\r
1428 \r
1429 //              Flush from previous epoch\r
1430 \r
1431         ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";\r
1432 \r
1433         ret += "\tgs_int32_t tuple_size, tuple_pos;\n";\r
1434     ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";\r
1435         ret += "\tint i, lfta_bailout;\n";\r
1436         ret += "\tunsigned int gen_val;\n";\r
1437 \r
1438         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";\r
1439         ret += generate_fta_name(node_name)+" *) f;\n";\r
1440 \r
1441         ret += "\n";\r
1442 \r
1443 \r
1444 //              Variables needed to store selected attributes of BUFFER type\r
1445 //              temporarily, in order to compute their size for storage\r
1446 //              in an output tuple.\r
1447 \r
1448   select_var_defs = "";\r
1449   for(s=0;s<sl_list.size();s++){\r
1450         data_type *sdt = sl_list[s]->get_data_type();\r
1451         if(sdt->is_buffer_type()){\r
1452           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);\r
1453           select_var_defs.append(tmpstr);\r
1454         }\r
1455   }\r
1456   if(select_var_defs != ""){\r
1457         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";\r
1458     ret += select_var_defs;\r
1459   }\r
1460 \r
1461 \r
1462 //              Variables to store results of partial functions.\r
1463   if(sl_fcns_start != sl_fcns_end){\r
1464         ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";\r
1465         for(p=sl_fcns_start;p<sl_fcns_end;p++){\r
1466                 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",\r
1467                         partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);\r
1468                 ret += tmpstr;\r
1469         }\r
1470         ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";\r
1471   }\r
1472 \r
1473 //              Variables for udaf output temporaries\r
1474         bool no_udaf = true;\r
1475         int a;\r
1476         for(a=0;a<aggr_tbl->size();a++){\r
1477                 if(! aggr_tbl->is_builtin(a)){\r
1478                         if(no_udaf){\r
1479                                 ret+="/*\t\tUDAF output vars.\t*/\n";\r
1480                                 no_udaf = false;\r
1481                         }\r
1482                         int afcn_id = aggr_tbl->get_fcn_id(a);\r
1483                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);\r
1484                         sprintf(tmpstr,"udaf_ret%d", a);\r
1485                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";\r
1486                 }\r
1487         }\r
1488 \r
1489 \r
1490 //  ret+="\tt->flush_finished=1; /* flush will be completed */\n";\r
1491   ret+="\n";\r
1492   ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";\r
1493   ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";\r
1494   ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";\r
1495                 bool first_g=true;\r
1496                 int g;\r
1497                 for(g=0;g<gb_tbl->size();g++){\r
1498                   data_type *gdt = gb_tbl->get_data_type(g);\r
1499                   if(gdt->is_temporal()){\r
1500                         if(first_g) first_g=false; else ret+=" || ";\r
1501                         ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";\r
1502                   }\r
1503                 }\r
1504   ret += "))) {\n";\r
1505   ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";\r
1506   ret+=\r
1507 "#ifdef LFTA_STATS\n"\r
1508 "\t\t\tt->eviction_cnt++;\n"\r
1509 "#endif\n"\r
1510 ;\r
1511 \r
1512 \r
1513   ret+=generate_tuple_from_aggr(node_name,schema,"i");\r
1514 \r
1515 //  ret+="\t\t\tt->n_aggrs--;\n";  // done in generate_tuple_from_aggr\r
1516   ret+="\t\t\tnflush--;\n";\r
1517   ret+="\t\t}\n";\r
1518   ret+="\t}\n";\r
1519   ret+="\tt->flush_pos=i;\n";\r
1520   ret+="\tif(t->n_aggrs == 0) {\n";\r
1521   ret+="\t\tt->flush_pos = t->max_aggrs;\n";\r
1522   ret += "\t}\n\n";\r
1523 \r
1524   ret+="\tif(t->flush_pos == t->max_aggrs) {\n";\r
1525 \r
1526   for(int g=0;g<gb_tbl->size();g++){\r
1527         data_type *dt = gb_tbl->get_data_type(g);\r
1528         if(dt->is_temporal()){\r
1529                 data_type *gdt = gb_tbl->get_data_type(g);\r
1530                 if(!gdt->is_buffer_type()){\r
1531                         sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);\r
1532                         ret += tmpstr;\r
1533                 }\r
1534         }\r
1535   }\r
1536   ret += "\t}\n}\n\n";\r
1537 \r
1538   return(ret);\r
1539 }\r
1540 \r
1541 // TODO Remove sprintf to perform string catenation\r
1542 string generate_fta_load_params(string node_name){\r
1543         int p;\r
1544         vector<string> param_names = param_tbl->get_param_names();\r
1545 \r
1546         string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);\r
1547                 ret += " *t, int sz, void *value, int initial_call){\n";\r
1548     ret += "\tint pos=0;\n";\r
1549     ret += "\tint data_pos;\n";\r
1550 \r
1551         for(p=0;p<param_names.size();p++){\r
1552                 data_type *dt = param_tbl->get_data_type(param_names[p]);\r
1553                 if(dt->is_buffer_type()){\r
1554                         sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );\r
1555                         ret += tmpstr;\r
1556                         sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );\r
1557                         ret += tmpstr;\r
1558                 }\r
1559         }\r
1560 \r
1561 \r
1562 \r
1563         ret += "\n\tdata_pos = ";\r
1564         for(p=0;p<param_names.size();p++){\r
1565                 if(p>0) ret += " + ";\r
1566                 data_type *dt = param_tbl->get_data_type(param_names[p]);\r
1567                 ret += "sizeof( ";\r
1568                 ret +=  dt->get_tuple_cvar_type();\r
1569                 ret += " )";\r
1570         }\r
1571         ret += ";\n";\r
1572         ret += "\tif(data_pos > sz) return 1;\n\n";\r
1573 \r
1574 \r
1575         for(p=0;p<param_names.size();p++){\r
1576                 data_type *dt = param_tbl->get_data_type(param_names[p]);\r
1577                 if(dt->is_buffer_type()){\r
1578                         sprintf(tmpstr,"\taccess_var_%s =  *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );\r
1579                         ret += tmpstr;\r
1580                         switch( dt->get_type() ){\r
1581                         case v_str_t:\r
1582 //                              ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n";               // ntoh conversion\r
1583 //                              ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n";   // ntoh conversion\r
1584                                 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );\r
1585                                 ret += tmpstr;\r
1586                                 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );\r
1587                                 ret += tmpstr;\r
1588                                 sprintf(tmpstr,"\ttmp_var_%s.length =  access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );\r
1589                                 ret += tmpstr;\r
1590                         break;\r
1591                         default:\r
1592                                 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );\r
1593                                 exit(1);\r
1594                         break;\r
1595                         }\r
1596 //                                      First, destroy the old\r
1597                         ret += "\tif(! initial_call)\n";\r
1598                         sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());\r
1599                         ret += tmpstr;\r
1600 //                                      Next, create the new.\r
1601                         sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );\r
1602                         ret += tmpstr;\r
1603                 }else{\r
1604 //                      if(dt->needs_hn_translation()){\r
1605 //                              sprintf(tmpstr,"\tt->param_%s =  %s( *( (%s *)( (char *)value+pos) ) );\n",\r
1606 //                                param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );\r
1607 //                      }else{\r
1608                                 sprintf(tmpstr,"\tt->param_%s =  *( (%s *)( (char *)value+pos) );\n",\r
1609                                   param_names[p].c_str(), dt->get_cvar_type().c_str() );\r
1610 //                      }\r
1611                         ret += tmpstr;\r
1612                 }\r
1613                 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );\r
1614                 ret += tmpstr;\r
1615         }\r
1616 \r
1617 //                      Register the pass-by-handle parameters\r
1618 \r
1619         ret += "/* register and de-register the pass-by-handle parameters */\n";\r
1620 \r
1621     int ph;\r
1622     for(ph=0;ph<param_handle_table.size();++ph){\r
1623                 data_type pdt(param_handle_table[ph]->type_name);\r
1624                 switch(param_handle_table[ph]->val_type){\r
1625                 case cplx_lit_e:\r
1626                         break;\r
1627                 case litval_e:\r
1628                         break;\r
1629                 case param_e:\r
1630                         ret += "\tif(! initial_call)\n";\r
1631                         sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",\r
1632                                 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);\r
1633                         ret += tmpstr;\r
1634                         sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());\r
1635                         ret += tmpstr;\r
1636 \r
1637                         if(pdt.is_buffer_type()) ret += "&(";\r
1638                         ret += "t->param_"+param_handle_table[ph]->param_name;\r
1639                         if(pdt.is_buffer_type()) ret += ")";\r
1640                         ret += ");\n";\r
1641                         break;\r
1642                 default:\r
1643                         sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);\r
1644                         fprintf(stderr,"%s\n",tmpstr);\r
1645                         ret+=tmpstr;\r
1646                 }\r
1647         }\r
1648 \r
1649         ret+="\treturn 0;\n";\r
1650         ret += "}\n\n";\r
1651 \r
1652         return(ret);\r
1653 }\r
1654 \r
1655 \r
1656 \r
1657 \r
1658 string generate_fta_free(string node_name, bool is_aggr_query){\r
1659 \r
1660         string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";\r
1661         ret+= "\tstruct "+generate_fta_name(node_name)+\r
1662                 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";\r
1663         ret += "\tint i;\n";\r
1664 \r
1665         if(is_aggr_query){\r
1666                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";\r
1667                 ret+="\t/* \t\tmark all groups as old */\n";\r
1668                 ret+="\tt->generation++;\n";\r
1669                 ret+="\tt->flush_pos = 0;\n";\r
1670                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";\r
1671         }\r
1672 \r
1673 //                      Deregister the pass-by-handle parameters\r
1674         ret += "/* de-register the pass-by-handle parameters */\n";\r
1675     int ph;\r
1676     for(ph=0;ph<param_handle_table.size();++ph){\r
1677                 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",\r
1678                         param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);\r
1679                 ret += tmpstr;\r
1680         }\r
1681 \r
1682 \r
1683         ret += "\treturn 0;\n}\n\n";\r
1684         return(ret);\r
1685 }\r
1686 \r
1687 \r
1688 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){\r
1689         string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f,  gs_int32_t command, gs_int32_t sz, void *value){\n";\r
1690         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";\r
1691                 ret += generate_fta_name(node_name)+" *) f;\n\n";\r
1692         ret+="\tint i;\n";\r
1693 \r
1694 \r
1695         ret += "\t/* temp status tuple */\n";\r
1696         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";\r
1697         ret += "\tgs_int32_t tuple_size;\n";\r
1698 \r
1699 \r
1700         if(is_aggr_query){\r
1701                 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";\r
1702 \r
1703                 ret+="\t\tif (!t->n_aggrs) {\n";\r
1704                 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";\r
1705                 ret+="\t\t\tif( tuple != NULL)\n";\r
1706                 ret+="\t\t\t\tpost_tuple(tuple);\n";\r
1707 \r
1708                 ret+="\t\t}else{\n";\r
1709 \r
1710                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";\r
1711                 ret+="\t\t\t/* \t\tmark all groups as old */\n";\r
1712                 ret +="\t\tt->generation++;\n";\r
1713                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";\r
1714                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";\r
1715                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";\r
1716                 ret+="\t\t\tt->flush_pos = 0;\n";\r
1717                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";\r
1718                 ret+="\t\t}\n";\r
1719 \r
1720                 ret+="\t}\n";\r
1721         }\r
1722         if(param_tbl->size() > 0){\r
1723                 ret+=\r
1724 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"\r
1725 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"\r
1726 "#ifndef LFTA_IN_NIC\n"\r
1727 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"\r
1728 "#else\n"\r
1729 "\t\t{}\n"\r
1730 "#endif\n"\r
1731 "\t}\n";\r
1732         }\r
1733         ret+=\r
1734 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"\r
1735 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"\r
1736 "\t}\n\n";\r
1737 \r
1738 \r
1739         ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";\r
1740 \r
1741         if(is_aggr_query){\r
1742                 ret+="\t\tif (t->n_aggrs) {\n";\r
1743                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";\r
1744                 ret+="\t\t\t/* \t\tmark all groups as old */\n";\r
1745                 ret +="\t\tt->generation++;\n";\r
1746                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";\r
1747                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";\r
1748                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";\r
1749                 ret+="\t\t\tt->flush_pos = 0;\n";\r
1750                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";\r
1751                 ret+="\t\t}\n";\r
1752         }\r
1753 \r
1754         ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";\r
1755         ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";\r
1756         ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";\r
1757 \r
1758         /* mark tuple as EOF_TUPLE */\r
1759         ret += "\n\t\t/* Mark tuple as eof_tuple */\n";\r
1760         ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";\r
1761         ret += "\t\tpost_tuple(tuple);\n";\r
1762         ret += "\t}\n";\r
1763 \r
1764         ret += "\treturn 0;\n}\n\n";\r
1765 \r
1766         return(ret);\r
1767 }\r
1768 \r
1769 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){\r
1770         string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";\r
1771         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";\r
1772                 ret += generate_fta_name(node_name)+" *) f;\n\n";\r
1773 \r
1774         ret += "\t/* Create a temp status tuple */\n";\r
1775         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";\r
1776         ret += "\tgs_int32_t tuple_size;\n";\r
1777         ret += "\tunsigned int i;\n";\r
1778         ret += "\ttime_t cur_time;\n";\r
1779         ret += "\tint time_advanced;\n";\r
1780         ret += "\tstruct fta_stat stats;\n";\r
1781 \r
1782 \r
1783 \r
1784         /* copy the last seen values of temporal attributes */\r
1785         col_id_set temp_cids;           //      col ids of temp attributes in select clause\r
1786 \r
1787 \r
1788         /* HACK: in order to reuse the SE generation code, we need to copy\r
1789          * the last values of the temp attributes into new variables\r
1790          * which have names unpack_var_XXX_XXX\r
1791          */\r
1792 \r
1793         int s, g;\r
1794         col_id_set::iterator csi;\r
1795 \r
1796         for(s=0;s<sl_list.size();s++){\r
1797                 data_type *sdt = sl_list[s]->get_data_type();\r
1798                 if (sdt->is_temporal()) {\r
1799                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);\r
1800                 }\r
1801         }\r
1802 \r
1803         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){\r
1804                 int tblref = (*csi).tblvar_ref;\r
1805                 int schref = (*csi).schema_ref;\r
1806                 string field = (*csi).field;\r
1807                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));\r
1808                 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);\r
1809                 ret += tmpstr;\r
1810         }\r
1811 \r
1812         if (is_aggr_query) {\r
1813                 for(g=0;g<gb_tbl->size();g++){\r
1814                         data_type *gdt = gb_tbl->get_data_type(g);\r
1815                         if(gdt->is_temporal()){\r
1816                                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);\r
1817                                 ret += tmpstr;\r
1818                                 data_type *gdt = gb_tbl->get_data_type(g);\r
1819                                 if(gdt->is_buffer_type()){\r
1820                                 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);\r
1821                                 ret += tmpstr;\r
1822                                 }\r
1823                         }\r
1824                 }\r
1825         }\r
1826         ret += "\n";\r
1827 \r
1828         ret += "\ttime_advanced = 0;\n";\r
1829 \r
1830         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){\r
1831                 int tblref = (*csi).tblvar_ref;\r
1832                 int schref = (*csi).schema_ref;\r
1833                 string field = (*csi).field;\r
1834                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));\r
1835 \r
1836                 // update last seen value with the value seen\r
1837                 ret += "\t#ifdef PREFILTER_DEFINED\n";\r
1838                 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",\r
1839                         field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);\r
1840                 ret += tmpstr;\r
1841                 ret += "\t\ttime_advanced = 1;\n\t}\n";\r
1842                 ret += "\t#endif\n";\r
1843 \r
1844                 // we need to pay special attention to time fields\r
1845                 if (field == "time" || field == "timestamp"){\r
1846                         ret += "\tcur_time = time(&cur_time);\n";\r
1847 \r
1848                         if (field == "time") {\r
1849                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",\r
1850                                         tblref, time_corr);\r
1851                                 ret += tmpstr;\r
1852                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",\r
1853                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);\r
1854                         } else {\r
1855                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",\r
1856                                         field.c_str(), tblref, time_corr);\r
1857                                 ret += tmpstr;\r
1858                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",\r
1859                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);\r
1860                         }\r
1861                         ret += tmpstr;\r
1862 \r
1863                         ret += "\t\ttime_advanced = 1;\n";\r
1864                         ret += "\t}\n";\r
1865 \r
1866                         sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",\r
1867                                 field.c_str(), tblref, field.c_str(), tblref);\r
1868                         ret += tmpstr;\r
1869                 } else {\r
1870                         sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",\r
1871                                 field.c_str(), tblref, field.c_str(), tblref);\r
1872                         ret += tmpstr;\r
1873                 }\r
1874         }\r
1875 \r
1876         // for aggregation lftas we need to check if the time was advanced beyond the current epoch\r
1877         if (is_aggr_query) {\r
1878 \r
1879                 string change_test;\r
1880                 bool first_one = true;\r
1881                 for(g=0;g<gb_tbl->size();g++){\r
1882                   data_type *gdt = gb_tbl->get_data_type(g);\r
1883                   if(gdt->is_temporal()){\r
1884 //                                      To perform the test, first need to compute the value\r
1885 //                                      of the temporal gb attrs.\r
1886                           if(gdt->is_buffer_type()){\r
1887         //                              NOTE : if the SE defining the gb is anything\r
1888         //                              other than a ref to a variable, this will generate\r
1889         //                              illegal code.  To be resolved with Spatch.\r
1890                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",\r
1891                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );\r
1892                                 ret+=tmpstr;\r
1893                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",\r
1894                                         gdt->get_buffer_assign_copy().c_str(), g, g);\r
1895                           }else{\r
1896                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());\r
1897                           }\r
1898                           ret += tmpstr;\r
1899 \r
1900                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;\r
1901                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;\r
1902                           if(first_one){first_one = false;} else {change_test.append(") && (");}\r
1903                           change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));\r
1904                   }\r
1905                 }\r
1906 \r
1907                 ret += "\n\tif( time_advanced && !( (";\r
1908                 ret += change_test;\r
1909                 ret += ") ) ){\n";\r
1910 \r
1911                 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";\r
1912                 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";\r
1913                 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";\r
1914 \r
1915                 ret += "\t\t/* \t\tmark all groups as old */\n";\r
1916                 ret +="\t\tt->generation++;\n";\r
1917                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";\r
1918                 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";\r
1919                 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";\r
1920                 ret += "\t\tt->flush_pos = 0;\n";\r
1921 \r
1922                 for(g=0;g<gb_tbl->size();g++){\r
1923                      data_type *gdt = gb_tbl->get_data_type(g);\r
1924                      if(gdt->is_temporal()){\r
1925                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);                 ret += tmpstr;\r
1926                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;\r
1927                      }\r
1928                 }\r
1929                 ret += "\t}\n\n";\r
1930 \r
1931         }\r
1932 \r
1933         ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";\r
1934         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";\r
1935         ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";\r
1936 \r
1937 \r
1938         for(s=0;s<sl_list.size();s++){\r
1939                 data_type *sdt = sl_list[s]->get_data_type();\r
1940                 if(sdt->is_temporal()){\r
1941 \r
1942                         if (sl_list[s]->is_gb()) {\r
1943                                 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());\r
1944                                 ret += tmpstr;\r
1945                         }\r
1946 \r
1947                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);\r
1948                         ret += tmpstr;\r
1949 //                      if(sdt->needs_hn_translation())\r
1950 //                              ret += sdt->hton_translation() +"( ";\r
1951                         if (sl_list[s]->is_gb()) {\r
1952                                 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());\r
1953                                 ret += tmpstr;\r
1954                         } else{\r
1955                                 ret += generate_se_code(sl_list[s],schema);\r
1956                         }\r
1957 //                      if(sdt->needs_hn_translation())\r
1958 //                              ret += " )";\r
1959                         ret += ";\n";\r
1960                 }\r
1961         }\r
1962 \r
1963         /* mark tuple as temporal */\r
1964         ret += "\n\t/* Mark tuple as temporal */\n";\r
1965         ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";\r
1966 \r
1967         ret += "\n\t/* Copy trace id */\n";\r
1968         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";\r
1969 \r
1970         ret += "\n\t/* Populate runtime stats */\n";\r
1971         ret += "\tstats.ftaid = f->ftaid;\n";\r
1972         ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";\r
1973         ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";\r
1974         ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";\r
1975         ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";\r
1976         ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";\r
1977         ret += "\tstats.collision_cnt = t->collision_cnt;\n";\r
1978         ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";\r
1979         ret += "\tstats.sampling_rate    = t->sampling_rate;\n";\r
1980 \r
1981         ret += "\n#ifdef LFTA_PROFILE\n";\r
1982         ret += "\n\t/* Print stats */\n";\r
1983         ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";\r
1984         ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";\r
1985         ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";\r
1986         ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";\r
1987         ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";\r
1988         ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";\r
1989         ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";\r
1990         ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";\r
1991         ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";\r
1992         ret += "\n#endif\n";\r
1993 \r
1994 \r
1995         ret += "\n\t/* Copy stats */\n";\r
1996         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";\r
1997         ret+="\tpost_tuple(tuple);\n";\r
1998 \r
1999         ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";\r
2000         ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";\r
2001 \r
2002         ret += "\n\t/* Reset runtime stats */\n";\r
2003         ret += "\tt->in_tuple_cnt = 0;\n";\r
2004         ret += "\tt->out_tuple_cnt = 0;\n";\r
2005         ret += "\tt->out_tuple_sz = 0;\n";\r
2006         ret += "\tt->accepted_tuple_cnt = 0;\n";\r
2007         ret += "\tt->cycle_cnt = 0;\n";\r
2008         ret += "\tt->collision_cnt = 0;\n";\r
2009         ret += "\tt->eviction_cnt = 0;\n";\r
2010 \r
2011         ret += "\treturn 0;\n}\n\n";\r
2012 \r
2013         return(ret);\r
2014 }\r
2015 \r
2016 \r
2017 //              accept processing before the where clause,\r
2018 //              do flush processwing.\r
2019 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema,   col_id_set &unpacked_cids, string &temporal_flush){\r
2020         int s;\r
2021 \r
2022 //              Slow flush\r
2023   string ret="\n/*\tslow flush\t*/\n";\r
2024   string slow_flush_str = fs->get_val_of_def("slow_flush");\r
2025   int n_slow_flush = atoi(slow_flush_str.c_str());\r
2026   if(n_slow_flush <= 0) n_slow_flush = 2;\r
2027   if(n_slow_flush > 1){\r
2028         ret += "\tt->flush_ctr++;\n";\r
2029         ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";\r
2030         ret += "\t\tt->flush_ctr = 0;\n";\r
2031     ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";\r
2032         ret += "\t}\n\n";\r
2033   }else{\r
2034     ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";\r
2035   }\r
2036 \r
2037 \r
2038         string change_test;\r
2039         bool first_one = true;\r
2040         int g;\r
2041     col_id_set flush_cids;              //      col ids accessed when computing flush variables.\r
2042                                                         //  unpack them at temporal flush test time.\r
2043     temporal_flush = "";\r
2044 \r
2045 \r
2046         for(g=0;g<gb_tbl->size();g++){\r
2047                   data_type *gdt = gb_tbl->get_data_type(g);\r
2048                   if(gdt->is_temporal()){\r
2049                           gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);\r
2050 \r
2051 //                                      To perform the test, first need to compute the value\r
2052 //                                      of the temporal gb attrs.\r
2053                           if(gdt->is_buffer_type()){\r
2054         //                              NOTE : if the SE defining the gb is anything\r
2055         //                              other than a ref to a variable, this will generate\r
2056         //                              illegal code.  To be resolved with Spatch.\r
2057                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",\r
2058                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );\r
2059                                 temporal_flush += tmpstr;\r
2060                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",\r
2061                                         gdt->get_buffer_assign_copy().c_str(), g, g);\r
2062                           }else{\r
2063                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());\r
2064                           }\r
2065                           temporal_flush += tmpstr;\r
2066 //                                      END computing the value of the temporal GB attr.\r
2067 \r
2068 \r
2069                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;\r
2070                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;\r
2071                           if(first_one){first_one = false;} else {change_test.append(") && (");}\r
2072                           change_test += generate_equality_test(lhs_op, rhs_op, gdt);\r
2073                   }\r
2074         }\r
2075         if(!first_one){         // will be false iff. there is a temporal GB attribute\r
2076                   temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";\r
2077                   temporal_flush += "\tif( !( (";\r
2078                   temporal_flush += change_test;\r
2079                   temporal_flush += ") ) ){\n";\r
2080 \r
2081 //                temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";\r
2082                   temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";\r
2083                   temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";\r
2084                   temporal_flush+="\t\t}\n";\r
2085                   temporal_flush+="\t\t/* \t\tmark all groups as old */\n";\r
2086                   temporal_flush+="\t\tt->generation++;\n";\r
2087                   temporal_flush+="\t\tt->flush_pos = 0;\n";\r
2088 \r
2089 \r
2090 //                              Now set the saved temporal value of the gb to the\r
2091 //                              current value of the gb.  Only for simple types,\r
2092 //                              not for buffer types -- but the strings are not\r
2093 //                              temporal in any case.\r
2094 \r
2095                         for(g=0;g<gb_tbl->size();g++){\r
2096                           data_type *gdt = gb_tbl->get_data_type(g);\r
2097                           if(gdt->is_temporal()){\r
2098                                   if(gdt->is_buffer_type()){\r
2099 \r
2100                                         fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");\r
2101                                   }else{\r
2102                                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);\r
2103                                         temporal_flush += tmpstr;\r
2104                                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);\r
2105                                         temporal_flush += tmpstr;\r
2106                                   }\r
2107                                 }\r
2108                         }\r
2109                   temporal_flush += "\t}\n\n";\r
2110         }\r
2111 \r
2112 // Unpack all the temporal attributes referenced in select clause\r
2113 // and update the last value of the attribute\r
2114         col_id_set temp_cids;           //      col ids of temp attributes in select clause\r
2115         col_id_set::iterator csi;\r
2116 \r
2117         for(s=0;s<sl_list.size();s++){\r
2118                 data_type *sdt = sl_list[s]->get_data_type();\r
2119                 if (sdt->is_temporal()) {\r
2120                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);\r
2121                 }\r
2122         }\r
2123 \r
2124         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){\r
2125                 if(unpacked_cids.count((*csi)) == 0){\r
2126                         int tblref = (*csi).tblvar_ref;\r
2127                         int schref = (*csi).schema_ref;\r
2128                         string field = (*csi).field;\r
2129                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);\r
2130 /*\r
2131                         data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));\r
2132                         sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",\r
2133                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);\r
2134                         ret += tmpstr;\r
2135                         ret += "\tif(retval) return 1;\n";\r
2136 */\r
2137                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);\r
2138                         ret += tmpstr;\r
2139 \r
2140                         unpacked_cids.insert( (*csi) );\r
2141                 }\r
2142         }\r
2143 \r
2144 \r
2145 //                              Do the flush here if this is a real_time query\r
2146         string rt_level = fs->get_val_of_def("real_time");\r
2147         if(rt_level != "" && temporal_flush != ""){\r
2148                 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){\r
2149                         if(unpacked_cids.count((*csi)) == 0){\r
2150                         int tblref = (*csi).tblvar_ref;\r
2151                         int schref = (*csi).schema_ref;\r
2152                         string field = (*csi).field;\r
2153                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);\r
2154 /*\r
2155                                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",\r
2156                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);\r
2157                         ret += tmpstr;\r
2158                         ret += "\tif(retval) return 1;\n";\r
2159 */\r
2160                                 unpacked_cids.insert( (*csi) );\r
2161                         }\r
2162                 }\r
2163                 ret += temporal_flush;\r
2164         }\r
2165 \r
2166         return ret;\r
2167  }\r
2168 \r
2169 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){\r
2170 \r
2171 int p,s;\r
2172 string ret;\r
2173 \r
2174 ///////////////                 Processing for filter-only query\r
2175 \r
2176 //                      test passed : create the tuple, then assign to it.\r
2177           ret += "/*\t\tCreate and post the tuple\t*/\n";\r
2178 \r
2179 //                      Unpack partial fcns ref'd by the select clause.\r
2180 //                      Its a kind of a WHERE clause ...\r
2181   for(p=sl_fcns_start;p<sl_fcns_end;p++){\r
2182         if(fcn_ref_cnt[p] > 1){\r
2183                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";\r
2184         }\r
2185         if(is_partial_fcn[p]){\r
2186                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);\r
2187                 ret += "\tif(retval) goto end;\n";\r
2188         }\r
2189         if(fcn_ref_cnt[p] > 1){\r
2190                 if(!is_partial_fcn[p]){\r
2191                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";\r
2192                 }\r
2193                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";\r
2194                 ret += "\t}\n";\r
2195         }\r
2196   }\r
2197 \r
2198   // increment the counter of accepted tuples\r
2199   ret += "\n\t#ifdef LFTA_STATS\n";\r
2200   ret += "\n\tt->accepted_tuple_cnt++;\n\n";\r
2201   ret += "\t#endif\n\n";\r
2202 \r
2203 //                      First, compute the size of the tuple.\r
2204 \r
2205 //                      Unpack any BUFFER type selections into temporaries\r
2206 //                      so that I can compute their size and not have\r
2207 //                      to recompute their value during tuple packing.\r
2208 //                      I can use regular assignment here because\r
2209 //                      these temporaries are non-persistent.\r
2210 \r
2211           for(s=0;s<sl_list.size();s++){\r
2212                 data_type *sdt = sl_list[s]->get_data_type();\r
2213                 if(sdt->is_buffer_type()){\r
2214                         sprintf(tmpstr,"\tselvar_%d = ",s);\r
2215                         ret += tmpstr;\r
2216                         ret += generate_se_code(sl_list[s],schema);\r
2217                         ret += ";\n";\r
2218                 }\r
2219           }\r
2220 \r
2221 \r
2222 //              The size of the tuple is the size of the tuple struct plus the\r
2223 //              size of the buffers to be copied in.\r
2224 \r
2225           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";\r
2226           for(s=0;s<sl_list.size();s++){\r
2227                 data_type *sdt = sl_list[s]->get_data_type();\r
2228                 if(sdt->is_buffer_type()){\r
2229                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);\r
2230                         ret += tmpstr;\r
2231                 }\r
2232           }\r
2233           ret += ";\n";\r
2234 \r
2235 \r
2236           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";\r
2237           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";\r
2238 \r
2239 //                      Test passed, make assignments to the tuple.\r
2240 \r
2241           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";\r
2242 \r
2243 //                      Mark tuple as REGULAR_TUPLE\r
2244           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";\r
2245 \r
2246 \r
2247           for(s=0;s<sl_list.size();s++){\r
2248                 data_type *sdt = sl_list[s]->get_data_type();\r
2249                 if(sdt->is_buffer_type()){\r
2250                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);\r
2251                         ret += tmpstr;\r
2252                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);\r
2253                         ret += tmpstr;\r
2254                 }else{\r
2255                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);\r
2256                         ret += tmpstr;\r
2257 //                      if(sdt->needs_hn_translation())\r
2258 //                              ret += sdt->hton_translation() +"( ";\r
2259                         ret += generate_se_code(sl_list[s],schema);\r
2260 //                      if(sdt->needs_hn_translation())\r
2261 //                              ret += ") ";\r
2262                         ret += ";\n";\r
2263                 }\r
2264           }\r
2265 \r
2266 //              Generate output.\r
2267 \r
2268           ret += "\tpost_tuple(tuple);\n";\r
2269 \r
2270 //              Increment the counter of posted tuples\r
2271   ret += "\n\t#ifdef LFTA_STATS\n";\r
2272   ret += "\tt->out_tuple_cnt++;\n";\r
2273   ret+="\tt->out_tuple_sz+=tuple_size;\n";\r
2274   ret += "\t#endif\n\n";\r
2275 \r
2276 \r
2277 \r
2278         return ret;\r
2279 }\r
2280 \r
2281 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){\r
2282 \r
2283 int p,s,w;\r
2284 string ret;\r
2285 \r
2286 //                      Get parameters\r
2287         unsigned int window_len = fs->temporal_range;\r
2288         unsigned int n_bloom = 11;\r
2289         string n_bloom_str = fs->get_val_of_def("num_bloom");\r
2290         int tmp_n_bloom = atoi(n_bloom_str.c_str());\r
2291         if(tmp_n_bloom>0)\r
2292                 n_bloom = tmp_n_bloom+1;\r
2293         float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);\r
2294         sprintf(tmpstr,"%f",bloom_width);\r
2295         string bloom_width_str = tmpstr;\r
2296 \r
2297         if(window_len < n_bloom){\r
2298                 n_bloom = window_len+1;\r
2299                 bloom_width_str = "1";\r
2300         }\r
2301 \r
2302 \r
2303 //              Grab the current window time\r
2304         scalarexp_t winvar(fs->temporal_var);\r
2305         ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";\r
2306 \r
2307         int bf_exp_size = 12;  // base-2 log of number of bits\r
2308         string bloom_len_str = fs->get_val_of_def("bloom_size");\r
2309         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());\r
2310         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){\r
2311                 bf_exp_size = tmp_bf_exp_size;\r
2312         }\r
2313         int bf_bit_size = 1 << bf_exp_size;\r
2314         int bf_byte_size = bf_bit_size / (8*sizeof(char));\r
2315 \r
2316         unsigned int ht_size = 4096;\r
2317         string ht_size_s = fs->get_val_of_def("aggregate_slots");\r
2318         int tmp_ht_size = atoi(ht_size_s.c_str());\r
2319         if(tmp_ht_size > 1024){\r
2320                 unsigned int hs = 1;            // make it power of 2\r
2321                 while(tmp_ht_size){\r
2322                         hs =hs << 1;\r
2323                         tmp_ht_size = tmp_ht_size >> 1;\r
2324                 }\r
2325                 ht_size = hs;\r
2326         }\r
2327 \r
2328         int i, bf_mask = 0;\r
2329         if(fs->use_bloom){\r
2330                 for(i=0;i<bf_exp_size;i++)\r
2331                         bf_mask = (bf_mask << 1) | 1;\r
2332         }else{\r
2333                 for(i=ht_size;i>1;i=i>>1)\r
2334                         bf_mask = (bf_mask << 1) | 1;\r
2335         }\r
2336 \r
2337 /*\r
2338 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",\r
2339         n_bloom,\r
2340         window_len,\r
2341         bloom_width_str.c_str(),\r
2342         bf_exp_size,\r
2343         bf_bit_size,\r
2344         bf_byte_size,\r
2345         ht_size,\r
2346         ht_size_s.c_str(),\r
2347         bf_mask);\r
2348 */\r
2349 \r
2350 \r
2351 \r
2352 \r
2353 //              If this is a bloom-filter fj, first test if the\r
2354 //              bloom filter needs to be advanced.\r
2355 //              SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)\r
2356 //              t->bf_size : number of bits in bloom filter\r
2357         if(fs->use_bloom){\r
2358                 ret +=\r
2359 "//                     Clean out old bloom filters if needed.\n"\r
2360 "       if(t->first_exec){\n"\r
2361 "               t->first_exec = 0;\n"\r
2362 "               t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"\r
2363 "               t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"\r
2364 "       }else{\n"\r
2365 "               curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"\r
2366 "               if(curr_bin != t->last_bin){\n"\r
2367 "                       for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"\r
2368 "                               t->last_bloom_pos++;\n"\r
2369 "                               if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"\r
2370 "                                       t->last_bloom_pos = 0;\n"\r
2371 "                               tmp_i = t->last_bloom_pos;\n"\r
2372 "                               for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"\r
2373 "                                       SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"\r
2374 "                               }\n"\r
2375 "                       }\n"\r
2376 "               }\n"\r
2377 "               t->last_bin = curr_bin;\n"\r
2378 "       }\n"\r
2379 ;\r
2380         }\r
2381 \r
2382 \r
2383 //-----------------------------------------------------------------\r
2384 //              First, determine whether to do S (filter stream) processing.\r
2385 \r
2386         ret +=\r
2387 "//             S (filtering stream) predicate, should it be processed?\n"\r
2388 "\n"\r
2389 ;\r
2390 // Sort S preds based on cost.\r
2391         vector<cnf_elem *> s_filt = fs->pred_t1;\r
2392         col_id_set::iterator csi;\r
2393   if(s_filt.size() > 0){\r
2394 \r
2395 //                      Unpack fields ref'd in the S pred\r
2396         for(w=0;w<s_filt.size();++w){\r
2397                 col_id_set this_pred_cids;\r
2398                 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);\r
2399                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){\r
2400                         if(unpacked_cids.count( (*csi) ) == 0){\r
2401                                 int tblref = (*csi).tblvar_ref;\r
2402                                 int schref = (*csi).schema_ref;\r
2403                                 string field = (*csi).field;\r
2404                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");\r
2405                                 unpacked_cids.insert( (*csi) );\r
2406                         }\r
2407                 }\r
2408         }\r
2409 \r
2410 \r
2411 //              Sort by evaluation cost.\r
2412 //              First, estimate evaluation costs\r
2413 //              Eliminate predicates covered by the prefilter (those in s_pids).\r
2414 //              I need to do it before the sort becuase the indices refer\r
2415 //              to the position in the unsorted list.\r
2416         vector<cnf_elem *> tmp_wh;\r
2417         for(w=0;w<s_filt.size();++w){\r
2418                 compute_cnf_cost(s_filt[w],Ext_fcns);\r
2419                 tmp_wh.push_back(s_filt[w]);\r
2420         }\r
2421         s_filt = tmp_wh;\r
2422 \r
2423         sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());\r
2424 \r
2425 //              Now generate the predicates.\r
2426         for(w=0;w<s_filt.size();++w){\r
2427                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);\r
2428                 ret += tmpstr;\r
2429 \r
2430 //                      Find partial fcns ref'd in this cnf element\r
2431                 set<int> pfcn_refs;\r
2432                 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);\r
2433 //                      Since set<..> is a "Sorted Associative Container",\r
2434 //                      we can walk through it in sorted order by walking from\r
2435 //                      begin() to end().  (and the partial fcns must be\r
2436 //                      evaluated in this order).\r
2437                 set<int>::iterator si;\r
2438                 string pf_preds;\r
2439                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){\r
2440                         if(fcn_ref_cnt[(*si)] > 1){\r
2441                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";\r
2442                         }\r
2443                         if(is_partial_fcn[(*si)]){\r
2444                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);\r
2445                                 ret += "\t\tif(retval) goto end_s;\n";\r
2446                         }\r
2447                         if(fcn_ref_cnt[(*si)] > 1){\r
2448                                 if(!is_partial_fcn[(*si)]){\r
2449                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";\r
2450 //              Testing for S is a side branch.\r
2451 //              I don't want a cacheable partial function to be\r
2452 //              marked as evaluated.  Therefore I mark the function\r
2453 //              as evalauted ONLY IF it is not partial.\r
2454                                         ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";\r
2455                                 }\r
2456                                 ret += "\t}\n";\r
2457                         }\r
2458                 }\r
2459 \r
2460                 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+\r
2461                                 ") ) goto end_s;\n";\r
2462         }\r
2463   }else{\r
2464           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";\r
2465   }\r
2466 \r
2467         for(p=0;p<fs->hash_eq.size();++p)\r
2468                 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";\r
2469 \r
2470         if(fs->use_bloom){\r
2471 //                      First, generate the S scalar expressions in the hash_eq\r
2472 \r
2473 //                      Iterate over the bloom filters\r
2474                 for(i=0;i<3;i++){\r
2475                         ret += "\t\tbucket=0;\n";\r
2476                         for(p=0;p<fs->hash_eq.size();++p){\r
2477                                 ret +=\r
2478 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+\r
2479         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+\r
2480         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";\r
2481                         }\r
2482 //              SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)\r
2483                                 ret +=\r
2484 "               bucket &= "+int_to_string(bf_mask)+";\n"\r
2485 "               SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"\r
2486 "\n"\r
2487 ;\r
2488                 }\r
2489         }else{\r
2490                 ret += "\t\tbucket=0;\n";\r
2491                 for(p=0;p<fs->hash_eq.size();++p){\r
2492                         ret +=\r
2493 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+\r
2494         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+\r
2495         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";\r
2496                 }\r
2497                 ret +=\r
2498 "               bucket &= "+int_to_string(bf_mask)+";\n"\r
2499 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"\r
2500 ;\r
2501 //                      Try the first bucket\r
2502                 ret += "\t\tif(";\r
2503                 for(p=0;p<fs->hash_eq.size();++p){\r
2504                         if(p>0) ret += " && ";\r
2505 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+\r
2506 //                                      " == s_equijoin_"+int_to_string(p);\r
2507                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();\r
2508                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);\r
2509                         string rhs_op = "s_equijoin_"+int_to_string(p);\r
2510                         ret += generate_equality_test(lhs_op,rhs_op,hdt);\r
2511                 }\r
2512                 ret += "){\n\t\t\tthe_bucket = bucket;\n";\r
2513                 ret += "\t\t}else {if(";\r
2514                 for(p=0;p<fs->hash_eq.size();++p){\r
2515                         if(p>0) ret += " && ";\r
2516 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+\r
2517 //                                      " == s_equijoin_"+int_to_string(p);\r
2518                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();\r
2519                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);\r
2520                         string rhs_op = "s_equijoin_"+int_to_string(p);\r
2521                         ret += generate_equality_test(lhs_op,rhs_op,hdt);\r
2522                 }\r
2523                 ret +=  "){\n\t\t\tthe_bucket = bucket1;\n";\r
2524                 ret += "\t\t}else{ if(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";\r
2525                 ret+="\t\t\tthe_bucket = bucket;\n\t\t\telse the_bucket=bucket1;\n";\r
2526                 ret += "\t\t}}\n";\r
2527                 for(p=0;p<fs->hash_eq.size();++p){\r
2528                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();\r
2529                         if(hdt->is_buffer_type()){\r
2530                                 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);\r
2531                                 ret += tmpstr;\r
2532                         }else{\r
2533                                 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+\r
2534                                         " = s_equijoin_"+int_to_string(p)+";\n";\r
2535                         }\r
2536                 }\r
2537                 ret+="\t\tt->join_table[the_bucket].ts =  curr_fj_ts;\n";\r
2538         }\r
2539   ret += "\tend_s:\n";\r
2540 \r
2541 //      ------------------------------------------------------------\r
2542 //              Next, determine if the R record should be processed.\r
2543 \r
2544 \r
2545         ret +=\r
2546 "//             R (main stream) cheap predicate\n"\r
2547 "\n"\r
2548 ;\r
2549 \r
2550 //              Unpack r_filt fields\r
2551         vector<cnf_elem *> r_filt = fs->pred_t0;\r
2552         for(w=0;w<r_filt.size();++w){\r
2553                 col_id_set this_pred_cids;\r
2554                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);\r
2555                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){\r
2556                         if(unpacked_cids.count( (*csi) ) == 0){\r
2557                                 int tblref = (*csi).tblvar_ref;\r
2558                                 int schref = (*csi).schema_ref;\r
2559                                 string field = (*csi).field;\r
2560                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);\r
2561                                 unpacked_cids.insert( (*csi) );\r
2562                         }\r
2563                 }\r
2564         }\r
2565 \r
2566 // Sort S preds based on cost.\r
2567 \r
2568         vector<cnf_elem *> tmp_wh;\r
2569         for(w=0;w<r_filt.size();++w){\r
2570                 compute_cnf_cost(r_filt[w],Ext_fcns);\r
2571                 tmp_wh.push_back(r_filt[w]);\r
2572         }\r
2573         r_filt = tmp_wh;\r
2574 \r
2575         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());\r
2576 \r
2577 //              WARNING! the constant 20 below is a wild-ass guess.\r
2578         int cheap_rpos;\r
2579         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++)\r
2580 \r
2581 //              Test the cheap filters on R.\r
2582   if(cheap_rpos >0){\r
2583 \r
2584 //              Now generate the predicates.\r
2585         for(w=0;w<cheap_rpos;++w){\r
2586                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);\r
2587                 ret += tmpstr;\r
2588 \r
2589 //                      Find partial fcns ref'd in this cnf element\r
2590                 set<int> pfcn_refs;\r
2591                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);\r
2592 //                      Since set<..> is a "Sorted Associative Container",\r
2593 //                      we can walk through it in sorted order by walking from\r
2594 //                      begin() to end().  (and the partial fcns must be\r
2595 //                      evaluated in this order).\r
2596                 set<int>::iterator si;\r
2597                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){\r
2598                         if(fcn_ref_cnt[(*si)] > 1){\r
2599                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";\r
2600                         }\r
2601                         if(is_partial_fcn[(*si)]){\r
2602                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);\r
2603                                 ret += "\t\tif(retval) goto end;\n";\r
2604                         }\r
2605                         if(fcn_ref_cnt[(*si)] > 1){\r
2606                                 if(!is_partial_fcn[(*si)]){\r
2607                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";\r
2608                                 }\r
2609                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";\r
2610                                 ret += "\t}\n";\r
2611                         }\r
2612                 }\r
2613 \r
2614                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+\r
2615                                 ") ) goto end;\n";\r
2616         }\r
2617   }else{\r
2618           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";\r
2619   }\r
2620 \r
2621         ret += "\n//    Do the join\n\n";\r
2622         for(p=0;p<fs->hash_eq.size();++p)\r
2623                 ret += "\t\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";\r
2624 \r
2625 \r
2626 //                      Passed the cheap pred, now test the join with S.\r
2627         if(fs->use_bloom){\r
2628                 for(i=0;i<3;i++){\r
2629                         ret += "\t\tbucket"+int_to_string(i)+"=0;\n";\r
2630                         for(p=0;p<fs->hash_eq.size();++p){\r
2631                                 ret +=\r
2632 "       bucket"+int_to_string(i)+\r
2633         " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+\r
2634         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+\r
2635         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";\r
2636                         }\r
2637                                 ret +=\r
2638 "       bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";\r
2639                 }\r
2640                 ret += "\tfound = 0;\n";\r
2641                 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";\r
2642                 ret +=\r
2643 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "\r
2644 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "\r
2645 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "\r
2646 "\t\t\tfound=1;\n"\r
2647 "\t}\n"\r
2648 ;\r
2649                 ret +=\r
2650 "       if(!found)\n"\r
2651 "               goto end;\n"\r
2652 ;\r
2653         }else{\r
2654                 ret += "\tfound = 0;\n";\r
2655                 ret += "\t\tbucket=0;\n";\r
2656                 for(p=0;p<fs->hash_eq.size();++p){\r
2657                         ret +=\r
2658 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+\r
2659         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+\r
2660         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";\r
2661                 }\r
2662                 ret +=\r
2663 "               bucket &= "+int_to_string(bf_mask)+";\n"\r
2664 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"\r
2665 ;\r
2666 //                      Try the first bucket\r
2667                 ret += "\t\tif(";\r
2668                 for(p=0;p<fs->hash_eq.size();++p){\r
2669                         if(p>0) ret += " && ";\r
2670 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+\r
2671 //                                      " == r_equijoin_"+int_to_string(p);\r
2672                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();\r
2673                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);\r
2674                         string rhs_op = "s_equijoin_"+int_to_string(p);\r
2675                         ret += generate_equality_test(lhs_op,rhs_op,hdt);\r
2676                 }\r
2677                 if(p>0) ret += " && ";\r
2678                 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";\r
2679                 ret += "){\n\t\t\tfound = 1;\n";\r
2680                 ret += "\t\t}else {if(";\r
2681                 for(p=0;p<fs->hash_eq.size();++p){\r
2682                         if(p>0) ret += " && ";\r
2683 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+\r
2684 //                                      " == r_equijoin_"+int_to_string(p);\r
2685                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();\r
2686                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);\r
2687                         string rhs_op = "s_equijoin_"+int_to_string(p);\r
2688                         ret += generate_equality_test(lhs_op,rhs_op,hdt);\r
2689                 }\r
2690                 if(p>0) ret += " && ";\r
2691                 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";\r
2692                 ret +=  ")\n\t\t\tfound=1;\n";\r
2693                 ret+="\t\t}\n";\r
2694                 ret +=\r
2695 "       if(!found)\n"\r
2696 "               goto end;\n"\r
2697 ;\r
2698         }\r
2699 \r
2700 \r
2701 //              Test the expensive filters on R.\r
2702   if(cheap_rpos < r_filt.size()){\r
2703 \r
2704 //              Now generate the predicates.\r
2705         for(w=cheap_rpos;w<r_filt.size();++w){\r
2706                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);\r
2707                 ret += tmpstr;\r
2708 \r
2709 //                      Find partial fcns ref'd in this cnf element\r
2710                 set<int> pfcn_refs;\r
2711                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);\r
2712 //                      Since set<..> is a "Sorted Associative Container",\r
2713 //                      we can walk through it in sorted order by walking from\r
2714 //                      begin() to end().  (and the partial fcns must be\r
2715 //                      evaluated in this order).\r
2716                 set<int>::iterator si;\r
2717                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){\r
2718                         if(fcn_ref_cnt[(*si)] > 1){\r
2719                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";\r
2720                         }\r
2721                         if(is_partial_fcn[(*si)]){\r
2722                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);\r
2723                                 ret += "\t\tif(retval) goto end;\n";\r
2724                         }\r
2725                         if(fcn_ref_cnt[(*si)] > 1){\r
2726                                 if(!is_partial_fcn[(*si)]){\r
2727                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";\r
2728                                 }\r
2729                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";\r
2730                                 ret += "\t}\n";\r
2731                         }\r
2732                 }\r
2733 \r
2734                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+\r
2735                                 ") ) goto end;\n";\r
2736         }\r
2737   }else{\r
2738           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";\r
2739   }\r
2740 \r
2741 \r
2742 \r
2743 ///////////////                 post the tuple\r
2744 \r
2745 //                      test passed : create the tuple, then assign to it.\r
2746           ret += "/*\t\tCreate and post the tuple\t*/\n";\r
2747 \r
2748 //              Unpack r_filt fields\r
2749         for(s=0;s<sl_list.size();++s){\r
2750                 col_id_set this_se_cids;\r
2751                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);\r
2752                 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){\r
2753                         if(unpacked_cids.count( (*csi) ) == 0){\r
2754                                 int tblref = (*csi).tblvar_ref;\r
2755                                 int schref = (*csi).schema_ref;\r
2756                                 string field = (*csi).field;\r
2757                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);\r
2758                                 unpacked_cids.insert( (*csi) );\r
2759                         }\r
2760                 }\r
2761         }\r
2762 \r
2763 \r
2764 //                      Unpack partial fcns ref'd by the select clause.\r
2765 //                      Its a kind of a WHERE clause ...\r
2766   for(p=sl_fcns_start;p<sl_fcns_end;p++){\r
2767         if(fcn_ref_cnt[p] > 1){\r
2768                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";\r
2769         }\r
2770         if(is_partial_fcn[p]){\r
2771                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);\r
2772                 ret += "\tif(retval) goto end;\n";\r
2773         }\r
2774         if(fcn_ref_cnt[p] > 1){\r
2775                 if(!is_partial_fcn[p]){\r
2776                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";\r
2777                 }\r
2778                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";\r
2779                 ret += "\t}\n";\r
2780         }\r
2781   }\r
2782 \r
2783   // increment the counter of accepted tuples\r
2784   ret += "\n\t#ifdef LFTA_STATS\n";\r
2785   ret += "\n\tt->accepted_tuple_cnt++;\n\n";\r
2786   ret += "\t#endif\n\n";\r
2787 \r
2788 //                      First, compute the size of the tuple.\r
2789 \r
2790 //                      Unpack any BUFFER type selections into temporaries\r
2791 //                      so that I can compute their size and not have\r
2792 //                      to recompute their value during tuple packing.\r
2793 //                      I can use regular assignment here because\r
2794 //                      these temporaries are non-persistent.\r
2795 \r
2796           for(s=0;s<sl_list.size();s++){\r
2797                 data_type *sdt = sl_list[s]->get_data_type();\r
2798                 if(sdt->is_buffer_type()){\r
2799                         sprintf(tmpstr,"\tselvar_%d = ",s);\r
2800                         ret += tmpstr;\r
2801                         ret += generate_se_code(sl_list[s],schema);\r
2802                         ret += ";\n";\r
2803                 }\r
2804           }\r
2805 \r
2806 \r
2807 //              The size of the tuple is the size of the tuple struct plus the\r
2808 //              size of the buffers to be copied in.\r
2809 \r
2810           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";\r
2811           for(s=0;s<sl_list.size();s++){\r
2812                 data_type *sdt = sl_list[s]->get_data_type();\r
2813                 if(sdt->is_buffer_type()){\r
2814                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);\r
2815                         ret += tmpstr;\r
2816                 }\r
2817           }\r
2818           ret += ";\n";\r
2819 \r
2820 \r
2821           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";\r
2822           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";\r
2823 \r
2824 //                      Test passed, make assignments to the tuple.\r
2825 \r
2826           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";\r
2827 \r
2828 //                      Mark tuple as REGULAR_TUPLE\r
2829           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";\r
2830 \r
2831 \r
2832           for(s=0;s<sl_list.size();s++){\r
2833                 data_type *sdt = sl_list[s]->get_data_type();\r
2834                 if(sdt->is_buffer_type()){\r
2835                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);\r
2836                         ret += tmpstr;\r
2837                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);\r
2838                         ret += tmpstr;\r
2839                 }else{\r
2840                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);\r
2841                         ret += tmpstr;\r
2842 //                      if(sdt->needs_hn_translation())\r
2843 //                              ret += sdt->hton_translation() +"( ";\r
2844                         ret += generate_se_code(sl_list[s],schema);\r
2845 //                      if(sdt->needs_hn_translation())\r
2846 //                              ret += ") ";\r
2847                         ret += ";\n";\r
2848                 }\r
2849           }\r
2850 \r
2851 //              Generate output.\r
2852 \r
2853           ret += "\tpost_tuple(tuple);\n";\r
2854 \r
2855 //              Increment the counter of posted tuples\r
2856   ret += "\n\t#ifdef LFTA_STATS\n";\r
2857   ret += "\n\tt->out_tuple_cnt++;\n\n";\r
2858   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";\r
2859   ret += "\t#endif\n\n";\r
2860 \r
2861 \r
2862         return ret;\r
2863 }\r
2864 \r
2865 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){\r
2866         string ret;\r
2867         int a,p,g;\r
2868 \r
2869 //////////////          Processing for aggregtion query\r
2870 \r
2871 //              First, search for a match.  Start by unpacking the group-by attributes.\r
2872 \r
2873 //                      One complication : if a real-time aggregate flush occurs,\r
2874 //                      the GB attr has already been calculated.  So don't compute\r
2875 //                      it again if 1) its temporal and 2) it will be computed in the\r
2876 //                      agggregate flush code.\r
2877 \r
2878 //              Unpack the partial fcns ref'd by the gb's and the aggr defs.\r
2879   for(p=gb_fcns_start;p<gb_fcns_end;p++){\r
2880     if(is_partial_fcn[p]){\r
2881                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);\r
2882                 ret += "\tif(retval) goto end;\n";\r
2883         }\r
2884   }\r
2885   for(p=ag_fcns_start;p<ag_fcns_end;p++){\r
2886     if(is_partial_fcn[p]){\r
2887                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);\r
2888                 ret += "\tif(retval) goto end;\n";\r
2889         }\r
2890   }\r
2891 \r
2892   // increment the counter of accepted tuples\r
2893   ret += "\n\t#ifdef LFTA_STATS\n";\r
2894   ret += "\n\tt->accepted_tuple_cnt++;\n\n";\r
2895   ret += "\t#endif\n\n";\r
2896 \r
2897   ret += "/*\t\tTest if the group is in the hash table \t*/\n";\r
2898 //                      Compute the values of the group-by variables.\r
2899   for(g=0;g<gb_tbl->size();g++){\r
2900           data_type *gdt = gb_tbl->get_data_type(g);\r
2901           if((! gdt->is_temporal()) || temporal_flush == ""){\r
2902 \r
2903                   if(gdt->is_buffer_type()){\r
2904         //                              NOTE : if the SE defining the gb is anything\r
2905         //                              other than a ref to a variable, this will generate\r
2906         //                              illegal code.  To be resolved with Spatch.\r
2907                         sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",\r
2908                                 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );\r
2909                         ret += tmpstr;\r
2910                         sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",\r
2911                                 gdt->get_buffer_assign_copy().c_str(), g, g);\r
2912                   }else{\r
2913                         sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());\r
2914                   }\r
2915                   ret += tmpstr;\r
2916           }\r
2917   }\r
2918   ret += "\n";\r
2919 \r
2920 //                      A quick aside : if any of the GB attrs are temporal,\r
2921 //                      test for change and flush if any change occurred.\r
2922 //                      We've already computed the flush code,\r
2923 //                      Put it here if this is not a real time query.\r
2924 //                      We've already unpacked all column refs, so no need to\r
2925 //                      do it again here.\r
2926 \r
2927         string rt_level = fs->get_val_of_def("real_time");\r
2928         if(rt_level == "" && temporal_flush != ""){\r
2929                 ret += temporal_flush;\r
2930         }\r
2931 \r
2932 //                      Compute the hash bucket\r
2933         if(gb_tbl->size() > 0){\r
2934                 ret += "\thashval = ";\\r
2935                 for(g=0;g<gb_tbl->size();g++){\r
2936                   if(g>0) ret += " ^ ";\r
2937                   data_type *gdt = gb_tbl->get_data_type(g);\r
2938                   if(gdt->is_buffer_type()){\r
2939                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),\r
2940                                 gdt->get_type_str().c_str(), g);\r
2941                   }else{\r
2942                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),\r
2943                                 gdt->get_type_str().c_str(), g);\r
2944                   }\r
2945                   ret += tmpstr;\r
2946                 }\r
2947                 ret += ";\n";\r
2948                 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";\r
2949         ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";\r
2950         }else{\r
2951                 ret+="\tprobe = 0;\n";\r
2952                 ret+="\thash2 = 0;\n\n";\r
2953         }\r
2954 \r
2955 //              Does the lfta reference a udaf?\r
2956           bool has_udaf = false;\r
2957           for(a=0;a<aggr_tbl->size();a++){\r
2958                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;\r
2959           }\r
2960 \r
2961 //              Scan for a match, or alternatively the best slot.\r
2962 //              Currently, hardcode 5 tests.\r
2963         ret +=\r
2964 "       gen_val = t->generation & SLOT_GEN_BITS;\n"\r
2965 "       match_found = 0;\n"\r
2966 "       best_slot = probe;\n"\r
2967 "       for(i=0;i<5 && match_found == 0;i++){\n"\r
2968 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"\r
2969 ;\r
2970         if(gb_tbl->size()>0){\r
2971                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";\r
2972                 ret+="\t\tif(";\r
2973                 string rhs_op, lhs_op;\r
2974                 for(g=0;g<gb_tbl->size();g++){\r
2975                   if(g>0) ret += " && ";\r
2976                   ret += "(";\r
2977                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;\r
2978                   sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;\r
2979                   ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));\r
2980                   ret += ")";\r
2981                 }\r
2982          }\r
2983          ret += "){\n"\r
2984 "                       match_found = 1;\n"\r
2985 "                       best_slot = probe;\n"\r
2986 "               }\n"\r
2987 "       }\n"\r
2988 "//             Rate slots in case no match found: prefer empty, then full but old slots\n"\r
2989 "       if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"\r
2990 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"\r
2991 "                       best_slot = probe;\n"\r
2992 "               }else{\n"\r
2993 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"\r
2994 "                               best_slot = probe;\n"\r
2995 "                       }\n"\r
2996 "               }\n"\r
2997 "               probe++;\n"\r
2998 "               if(probe >= t->max_aggrs)\n"\r
2999 "                       probe=0;\n"\r
3000 "       }\n"\r
3001 "       if(match_found){\n"\r
3002 ;\r
3003         ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);\r
3004         ret +=\r
3005 "       }else{\n"\r
3006 "               if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"\r
3007 ;\r
3008 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);\r
3009         if(((sgah_qpn *)fs)->lfta_disorder <= 1){\r
3010                 ret +=\r
3011 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"\r
3012 "                               if((";\r
3013                 bool first_g = true;\r
3014                 for(int g=0;g<gb_tbl->size();g++){\r
3015                         data_type *gdt = gb_tbl->get_data_type(g);\r
3016                         if(gdt->is_temporal()){\r
3017                                 if(first_g) first_g = false; else ret+=" + ";\r
3018                                 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";\r
3019                         }\r
3020                 }\r
3021                 ret += ") == 0 ){\n";\r
3022 \r
3023                 ret +=\r
3024 "                                       fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"\r
3025 "                               }\n"\r
3026 "                       }\n"\r
3027 ;\r
3028         }\r
3029 \r
3030         ret += generate_tuple_from_aggr(node_name,schema,"best_slot");\r
3031         ret +=\r
3032 "\t\t\t#ifdef LFTA_STATS\n"\r
3033 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"\r
3034 "\t\t\t\tt->collision_cnt++;\n\n"\r
3035 "\t\t\t#endif\n\n"\r
3036 "\t\t}\n"\r
3037 ;\r
3038         ret += generate_init_group(schema,"best_slot");\r
3039 \r
3040 \r
3041           ret += "\t}\n";\r
3042 \r
3043         return ret;\r
3044 }\r
3045 \r
3046 \r
3047 \r
3048 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set<unsigned int> &s_pids){\r
3049 \r
3050         string ret="static gs_retval_t accept_packet_"+node_name+\r
3051                 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";\r
3052     ret += "\tstruct packet *p = (struct packet *)pkt;\n";\r
3053 \r
3054   int a;\r
3055 \r
3056 //                      Define all of the variables needed by this\r
3057 //                      procedure.\r
3058 \r
3059 \r
3060 //                      Gather all column references, need to define unpacking variables.\r
3061   int w,s;\r
3062   col_id_set cid_set;\r
3063   col_id_set::iterator csi;\r
3064 \r
3065 //              If its a filter join, rebind all colrefs\r
3066 //              to the first range var, to avoid double unpacking.\r
3067 \r
3068   if(is_fj){\r
3069     for(w=0;w<where.size();++w)\r
3070                 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);\r
3071     for(s=0;s<sl_list.size();s++)\r
3072                 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);\r
3073   }\r
3074 \r
3075   for(w=0;w<where.size();++w){\r
3076         if(is_fj || s_pids.count(w) == 0)\r
3077                 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);\r
3078         }\r
3079   for(s=0;s<sl_list.size();s++){\r
3080         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);\r
3081   }\r
3082 \r
3083   int g;\r
3084   if(gb_tbl != NULL){\r
3085         for(g=0;g<gb_tbl->size();g++)\r
3086           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);\r
3087   }\r
3088 \r
3089   //                    Variables for unpacking attributes.\r
3090   ret += "/*\t\tVariables for unpacking attributes\t*/\n";\r
3091   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){\r
3092     int schref = (*csi).schema_ref;\r
3093         int tblref = (*csi).tblvar_ref;\r
3094     string field = (*csi).field;\r
3095     data_type dt(schema->get_type_name(schref,field));\r
3096     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),\r
3097         field.c_str(), tblref);\r
3098     ret += tmpstr;\r
3099   }\r
3100 \r
3101   ret += "\n\n";\r
3102 \r
3103 //                      Variables that are always needed\r
3104   ret += "/*\t\tVariables which are always needed\t*/\n";\r
3105   ret += "\tgs_retval_t retval;\n";\r
3106   ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";\r
3107   ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";\r
3108 \r
3109   ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";\r
3110 \r
3111 \r
3112 //                      Variables needed for aggregation queries.\r
3113   if(is_aggr_query){\r
3114           ret += "\n/*\t\tVariables for aggregation\t*/\n";\r
3115           ret+="\tunsigned int i, probe;\n";\r
3116           ret+="\tunsigned int gen_val, match_found, best_slot;\n";\r
3117           ret+="\tgs_uint64_t hashval, hash2;\n";\r
3118 //                      Variables for storing group-by attribute values.\r
3119           if(gb_tbl->size() > 0)\r
3120                 ret += "/*\t\tGroup-by attributes\t*/\n";\r
3121           for(g=0;g<gb_tbl->size();g++){\r
3122                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);\r
3123                 ret += tmpstr;\r
3124                 data_type *gdt = gb_tbl->get_data_type(g);\r
3125                 if(gdt->is_buffer_type()){\r
3126                   sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);\r
3127                   ret += tmpstr;\r
3128                 }\r
3129           }\r
3130           ret += "\n";\r
3131 //                      Temporaries for min/max\r
3132           string aggr_tmp_str = "";\r
3133           for(a=0;a<aggr_tbl->size();a++){\r
3134                 string aggr_op = aggr_tbl->get_op(a);\r
3135                 if(aggr_op == "MIN" || aggr_op == "MAX"){\r
3136                         sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);\r
3137                         aggr_tmp_str.append(tmpstr);\r
3138                 }\r
3139           }\r
3140           if(aggr_tmp_str != ""){\r
3141                 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";\r
3142                 ret += aggr_tmp_str;\r
3143                 ret += "\n";\r
3144           }\r
3145 //              Variables for udaf output temporaries\r
3146         bool no_udaf = true;\r
3147         for(a=0;a<aggr_tbl->size();a++){\r
3148                 if(! aggr_tbl->is_builtin(a)){\r
3149                         if(no_udaf){\r
3150                                 ret+="/*\t\tUDAF output vars.\t*/\n";\r
3151                                 no_udaf = false;\r
3152                         }\r
3153                         int afcn_id = aggr_tbl->get_fcn_id(a);\r
3154                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);\r
3155                         sprintf(tmpstr,"udaf_ret%d", a);\r
3156                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";\r
3157                 }\r
3158         }\r
3159   }\r
3160 \r
3161 //                      Variables needed for a filter join query\r
3162   if(fs->node_type() == "filter_join"){\r
3163         filter_join_qpn *fjq = (filter_join_qpn *)fs;\r
3164         bool uses_bloom = fjq->use_bloom;\r
3165         ret += "/*\t\tJoin fields\t*/\n";\r
3166         for(g=0;g<fjq->hash_eq.size();g++){\r
3167                 sprintf(tmpstr,"\t%s s_equijoin_%d, r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g,g);\r
3168                 ret += tmpstr;\r
3169           }\r
3170         if(uses_bloom){\r
3171                 ret +=\r
3172 "  /*           Variables for fj bloom filter   */ \n"\r
3173 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"\r
3174 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"\r
3175 "\tlong long int curr_fj_ts;\n"\r
3176 "\tunsigned int curr_bin, the_bin;\n"\r
3177 "\n"\r
3178 ;\r
3179         }else{\r
3180                 ret +=\r
3181 "  /*           Variables for fj join table     */ \n"\r
3182 "\tunsigned int i, bucket, found; \n"\r
3183 "\tunsigned int bucket1, the_bucket;\n"\r
3184 "       long long int curr_fj_ts;\n"\r
3185 "\n"\r
3186 ;\r
3187         }\r
3188   }\r
3189 \r
3190 \r
3191 //              Variables needed to store selected attributes of BUFFER type\r
3192 //              temporarily, in order to compute their size for storage\r
3193 //              in an output tuple.\r
3194 \r
3195   string select_var_defs = "";\r
3196   for(s=0;s<sl_list.size();s++){\r
3197         data_type *sdt = sl_list[s]->get_data_type();\r
3198         if(sdt->is_buffer_type()){\r
3199           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);\r
3200           select_var_defs.append(tmpstr);\r
3201         }\r
3202   }\r
3203   if(select_var_defs != ""){\r
3204         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";\r
3205     ret += select_var_defs;\r
3206   }\r
3207 \r
3208 //              Variables to store results of partial functions.\r
3209   int p;\r
3210   if(partial_fcns.size()>0){\r
3211           ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";\r
3212           for(p=0;p<partial_fcns.size();++p){\r
3213                 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){\r
3214                   sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",\r
3215                     partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);\r
3216                   ret += tmpstr;\r
3217                   if(!is_aggr_query && fcn_ref_cnt[p] >1){\r
3218                         ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";\r
3219                   }\r
3220                 }\r
3221           }\r
3222 \r
3223           if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";\r
3224           ret += "\n";\r
3225   }\r
3226 \r
3227 //              variable to hold packet struct  //\r
3228         if(packed_return){\r
3229                 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";\r
3230         }\r
3231 \r
3232 \r
3233   ret += "\t#ifdef LFTA_STATS\n";\r
3234 // variable to store counter of cpu cycles spend in accept_tuple\r
3235         ret += "\tgs_uint64_t start_cycle = rdtsc();\n";\r
3236 // increment counter of received tuples\r
3237         ret += "\tt->in_tuple_cnt++;\n";\r
3238   ret += "\t#endif\n";\r
3239 \r
3240 \r
3241 //      -------------------------------------------------\r
3242 //              If the packet is "packet", test if its for this lfta,\r
3243 //              and if so load it into its struct\r
3244 \r
3245         if(packed_return){\r
3246                 ret+="\n/*  packed tuple : test and load. \t*/\n";\r
3247                 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";\r
3248                 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";\r
3249                 ret+="\t\tgoto end;\n\n";\r
3250         }\r
3251 \r
3252 \r
3253 \r
3254   col_id_set unpacked_cids;     //      Keep track of the cols that have been unpacked.\r
3255 \r
3256   string temporal_flush;\r
3257   if(is_aggr_query)\r
3258         ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);\r
3259   else {        // non-aggregation operators\r
3260 \r
3261 // Unpack all the temporal attributes referenced in select clause\r
3262 // and update the last value of the attribute\r
3263         col_id_set temp_cids;           //      col ids of temp attributes in select clause\r
3264 \r
3265         for(s=0;s<sl_list.size();s++){\r
3266                 data_type *sdt = sl_list[s]->get_data_type();\r
3267                 if (sdt->is_temporal()) {\r
3268                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);\r
3269                 }\r
3270         }\r
3271 //                      If this is a filter join,\r
3272 //                      ensure that the temporal range field is unpacked.\r
3273         if(is_fj){\r
3274                 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);\r
3275                 if(temp_cids.count(window_var_cid)==0)\r
3276                         temp_cids.insert(window_var_cid);\r
3277         }\r
3278 \r
3279         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){\r
3280                 if(unpacked_cids.count((*csi)) == 0){\r
3281                         int tblref = (*csi).tblvar_ref;\r
3282                         int schref = (*csi).schema_ref;\r
3283                         string field = (*csi).field;\r
3284                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);\r
3285                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);\r
3286                         ret += tmpstr;\r
3287 \r
3288                         unpacked_cids.insert( (*csi) );\r
3289                 }\r
3290         }\r
3291 \r
3292   }\r
3293 \r
3294   vector<cnf_elem *> filter = fs->get_filter_clause();\r
3295 //              Test the filter predicate (some query types have additional preds).\r
3296   if(filter.size() > 0){\r
3297 \r
3298 //              Sort by evaluation cost.\r
3299 //              First, estimate evaluation costs\r
3300 //              Eliminate predicates covered by the prefilter (those in s_pids).\r
3301 //              I need to do it before the sort becuase the indices refer\r
3302 //              to the position in the unsorted list./\r
3303         vector<cnf_elem *> tmp_wh;\r
3304         for(w=0;w<filter.size();++w){\r
3305                 if(s_pids.count(w) == 0){\r
3306                         compute_cnf_cost(filter[w],Ext_fcns);\r
3307                         tmp_wh.push_back(filter[w]);\r
3308                 }\r
3309         }\r
3310         filter = tmp_wh;\r
3311 \r
3312         sort(filter.begin(), filter.end(), compare_cnf_cost());\r
3313 \r
3314 //              Now generate the predicates.\r
3315         for(w=0;w<filter.size();++w){\r
3316                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);\r
3317                 ret += tmpstr;\r
3318 //                      Find the set of variables accessed in this CNF elem,\r
3319 //                      but in no previous element.\r
3320                 col_id_set this_pred_cids;\r
3321                 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);\r
3322                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){\r
3323                         if(unpacked_cids.count( (*csi) ) == 0){\r
3324                         int tblref = (*csi).tblvar_ref;\r
3325                         int schref = (*csi).schema_ref;\r
3326                         string field = (*csi).field;\r
3327                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);\r
3328                                 unpacked_cids.insert( (*csi) );\r
3329                         }\r
3330                 }\r
3331 //                      Find partial fcns ref'd in this cnf element\r
3332                 set<int> pfcn_refs;\r
3333                 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);\r
3334 //                      Since set<..> is a "Sorted Associative Container",\r
3335 //                      we can walk through it in sorted order by walking from\r
3336 //                      begin() to end().  (and the partial fcns must be\r
3337 //                      evaluated in this order).\r
3338                 set<int>::iterator si;\r
3339                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){\r
3340                         if(fcn_ref_cnt[(*si)] > 1){\r
3341                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";\r
3342                         }\r
3343                         if(is_partial_fcn[(*si)]){\r
3344                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);\r
3345                                 ret += "\t\tif(retval) goto end;\n";\r
3346                         }\r
3347                         if(fcn_ref_cnt[(*si)] > 1){\r
3348                                 if(!is_partial_fcn[(*si)]){\r
3349                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";\r
3350                                 }\r
3351                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";\r
3352                                 ret += "\t}\n";\r
3353                         }\r
3354                 }\r
3355 \r
3356                 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+\r
3357                                 ") ) goto end;\n";\r
3358         }\r
3359   }else{\r
3360           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";\r
3361   }\r
3362 \r
3363 \r
3364 //                      We've passed the WHERE clause,\r
3365 //                      unpack the remainder of the accessed fields.\r
3366   if(is_fj){\r
3367         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";\r
3368         vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;\r
3369                 for(w=0;w<h_eq.size();++w){\r
3370                 col_id_set this_pred_cids;\r
3371                 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);\r
3372                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){\r
3373                         if(unpacked_cids.count( (*csi) ) == 0){\r
3374                                 int tblref = (*csi).tblvar_ref;\r
3375                                 int schref = (*csi).schema_ref;\r
3376                                 string field = (*csi).field;\r
3377                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);\r
3378                                 unpacked_cids.insert( (*csi) );\r
3379                         }\r
3380                 }\r
3381         }\r
3382   }else{\r
3383           ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";\r
3384 \r
3385           for(csi=cid_set.begin();csi!=cid_set.end();++csi){\r
3386                 if(unpacked_cids.count( (*csi) ) == 0){\r
3387                         int schref = (*csi).schema_ref;\r
3388                         int tblref = (*csi).tblvar_ref;\r
3389                         string field = (*csi).field;\r
3390                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);\r
3391                         unpacked_cids.insert( (*csi) );\r
3392                 }\r
3393           }\r
3394   }\r
3395 \r
3396 \r
3397 //////////////////\r
3398 //////////////////      After this, the query types\r
3399 //////////////////      are processed differently.\r
3400 \r
3401   if(!is_aggr_query && !is_fj)\r
3402         ret += generate_sel_accept_body(fs, node_name, schema);\r
3403   else if(is_aggr_query)\r
3404         ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);\r
3405   else\r
3406         ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);\r
3407 \r
3408 \r
3409 //              Finish up.\r
3410 \r
3411    ret += "\n\tend:\n";\r
3412   ret += "\t#ifdef LFTA_STATS\n";\r
3413         ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";\r
3414   ret += "\t#endif\n";\r
3415    ret += "\n\treturn 1;\n}\n\n";\r
3416 \r
3417         return(ret);\r
3418 }\r
3419 \r
3420 \r
3421 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){\r
3422         int g, cl;\r
3423 \r
3424         string ret = "struct FTA * "+generate_alloc_name(node_name) +\r
3425            "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command,  gs_int32_t sz, void *value){\n";\r
3426 \r
3427         ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";\r
3428         ret+="\tint i;\n";\r
3429         ret += "\n";\r
3430         ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";\r
3431 \r
3432 //                              assign a streamid to fta instance\r
3433         ret+="\t/* assign a streamid */\n";\r
3434         ret+="\tf->f.ftaid = ftaid;\n";\r
3435         ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";\r
3436         ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";\r
3437 \r
3438         if(is_aggr_query){\r
3439                 ret += "\tf->n_aggrs = 0;\n";\r
3440 \r
3441                 ret += "\tf->max_aggrs = ";\r
3442 \r
3443 //                              Computing the number of aggregate blocks is a little\r
3444 //                              tricky.  If there are no GB attrs, or if all GB attrs\r
3445 //                              are temporal, then use a single aggregate block, else\r
3446 //                              use a default value (10).  A user specification overrides\r
3447 //                              this logic.\r
3448                 bool single_group = true;\r
3449                 for(g=0;g<gb_tbl->size();g++){\r
3450                         data_type *gdt = gb_tbl->get_data_type(g);\r
3451                         if(! gdt->is_temporal() ){\r
3452                                 single_group = false;\r
3453                         }\r
3454                 }\r
3455                 string max_aggr_str = fs->get_val_of_def("aggregate_slots");\r
3456                 int max_aggr_i = atoi(max_aggr_str.c_str());\r
3457                 if(max_aggr_i <= 0){\r
3458                         if(single_group)\r
3459                                 ret += "2";\r
3460                         else\r
3461                                 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);\r
3462                 }else{\r
3463                         unsigned int naggrs = 1;                // make it power of 2\r
3464                         unsigned int nones = 0;\r
3465                         while(max_aggr_i){\r
3466                                 if(max_aggr_i&1)\r
3467                                         nones++;\r
3468                                 naggrs = naggrs << 1;\r
3469                                 max_aggr_i = max_aggr_i >> 1;\r
3470                         }\r
3471                         if(nones==1)            // in case it was already a power of 2.\r
3472                                 naggrs/=2;\r
3473                         ret += int_to_string(naggrs);\r
3474                 }\r
3475                 ret += ";\n";\r
3476 \r
3477                 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";\r
3478                 ret+="\t\treturn(0);\n";\r
3479                 ret+="\t}\n\n";\r
3480 //              ret+="/* compute how many integers we need to store the hashmap */\n";\r
3481 //              ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";\r
3482                 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";\r
3483                 ret+="\t\treturn(0);\n";\r
3484                 ret+="\t}\n";\r
3485                 ret+="/*\t\tfill bitmap with zero \t*/\n";\r
3486                 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";\r
3487                 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";\r
3488                 ret+="\tf->generation=0;\n";\r
3489                 ret+="\tf->flush_pos = f->max_aggrs;\n";\r
3490 \r
3491                 ret += "\tf->flush_ctr = 0;\n";\r
3492 \r
3493         }\r
3494 \r
3495         if(is_fj){\r
3496                 if(uses_bloom){\r
3497                         ret+="\tf->first_exec = 1;\n";\r
3498                         unsigned int n_bloom = 11;\r
3499                         string n_bloom_str = fs->get_val_of_def("num_bloom");\r
3500                         int tmp_n_bloom = atoi(n_bloom_str.c_str());\r
3501                         if(tmp_n_bloom>0)\r
3502                                 n_bloom = tmp_n_bloom+1;\r
3503 \r
3504                         unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;\r
3505                         if(window_len < n_bloom){\r
3506                                 n_bloom = window_len+1;\r
3507                         }\r
3508 \r
3509                         int bf_exp_size = 12;  // base-2 log of number of bits\r
3510                         string bloom_len_str = fs->get_val_of_def("bloom_size");\r
3511                         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());\r
3512                         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){\r
3513                                 bf_exp_size = tmp_bf_exp_size;\r
3514                         }\r
3515                         int bf_bit_size = 1 << 12;\r
3516                         int bf_byte_size = bf_bit_size / (8*sizeof(char));\r
3517 \r
3518                         int bf_tot = n_bloom*bf_byte_size;\r
3519                         ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";\r
3520                         ret+="\t\treturn(0);\n";\r
3521                         ret+="\t}\n";\r
3522                         ret +=\r
3523 "       for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"\r
3524 "               f->bf_table[i] = 0;\n"\r
3525 ;\r
3526                 }else{\r
3527                         unsigned int ht_size = 4096;\r
3528                         string ht_size_s = fs->get_val_of_def("aggregate_slots");\r
3529                         int tmp_ht_size = atoi(ht_size_s.c_str());\r
3530                         if(tmp_ht_size > 1024){\r
3531                                 unsigned int hs = 1;            // make it power of 2\r
3532                                 while(tmp_ht_size){\r
3533                                         hs =hs << 1;\r
3534                                         tmp_ht_size = tmp_ht_size >> 1;\r
3535                                 }\r
3536                                 ht_size = hs;\r
3537                         }\r
3538                         ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";\r
3539                         ret+="\t\treturn(0);\n";\r
3540                         ret+="\t}\n\n";\r
3541                         ret +=\r
3542 "       for(i=0;i<"+int_to_string(ht_size)+";i++)\n"\r
3543 "               f->join_table[i].ts = 0;\n"\r
3544 ;\r
3545                 }\r
3546         }\r
3547 \r
3548 //                      Initialize the complex literals (which might be handles).\r
3549 \r
3550         for(cl=0;cl<complex_literals->size();cl++){\r
3551                 literal_t *l = complex_literals->get_literal(cl);\r
3552 //              sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);\r
3553 //              ret += tmpstr + l->to_C_code() + ";\n";\r
3554                 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);\r
3555                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";\r
3556         }\r
3557 \r
3558         ret += "\n";\r
3559 \r
3560 //                      Initialize the last seen values of temporal attributes to min(max) value of\r
3561 //                      their respective type\r
3562 //                      Create places to hold the last values of temporal attributes referenced in select clause\r
3563 \r
3564 \r
3565         col_id_set temp_cids;           //      col ids of temp attributes in select clause\r
3566 \r
3567         int s;\r
3568         col_id_set::iterator csi;\r
3569 \r
3570         for(s=0;s<sl_list.size();s++){\r
3571                 data_type *sdt = sl_list[s]->get_data_type();\r
3572                 if (sdt->is_temporal()) {\r
3573                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);\r
3574                 }\r
3575         }\r
3576 \r
3577         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){\r
3578                 int tblref = (*csi).tblvar_ref;\r
3579                 int schref = (*csi).schema_ref;\r
3580                 string field = (*csi).field;\r
3581                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));\r
3582                 if (dt.is_increasing()) {\r
3583                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());\r
3584                         ret += tmpstr;\r
3585                 } else if (dt.is_decreasing()) {\r
3586                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());\r
3587                         ret += tmpstr;\r
3588                 }\r
3589         }\r
3590 \r
3591 //      initialize last seen values of temporal groubpy variables\r
3592         if(is_aggr_query){\r
3593                 for(g=0;g<gb_tbl->size();g++){\r
3594                         data_type *dt = gb_tbl->get_data_type(g);\r
3595                         if(dt->is_temporal()){\r
3596 /*\r
3597                                 fprintf(stderr,"group by attribute %s is temporal, ",\r
3598                                                 gb_tbl->get_name(g).c_str());\r
3599 */\r
3600                                 if(dt->is_increasing()){\r
3601                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());\r
3602                                 }else{\r
3603                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());\r
3604                                 }\r
3605                                 ret += tmpstr;\r
3606                         }\r
3607                 }\r
3608         }\r
3609 \r
3610         ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";\r
3611         ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";\r
3612         ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";\r
3613         ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";\r
3614         ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";\r
3615 \r
3616 //                      Initialize runtime stats\r
3617         ret+="\tf->in_tuple_cnt = 0;\n";\r
3618         ret+="\tf->out_tuple_cnt = 0;\n";\r
3619         ret+="\tf->out_tuple_sz = 0;\n";\r
3620         ret+="\tf->accepted_tuple_cnt = 0;\n";\r
3621         ret+="\tf->cycle_cnt = 0;\n";\r
3622         ret+="\tf->collision_cnt = 0;\n";\r
3623         ret+="\tf->eviction_cnt = 0;\n";\r
3624         ret+="\tf->sampling_rate = 1.0;\n";\r
3625 \r
3626         ret+="\tf->trace_id = 0;\n\n";\r
3627     if(param_tbl->size() > 0){\r
3628         ret+=\r
3629 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"\r
3630 "#ifndef LFTA_IN_NIC\n"\r
3631 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"\r
3632 "#else\n"\r
3633 "\t\t}\n"\r
3634 "#endif\n"\r
3635 "\t\t\treturn 0;\n"\r
3636 "\t\t}\n";\r
3637         }\r
3638 \r
3639 //                      Register the pass-by-handle parameters\r
3640     int ph;\r
3641     for(ph=0;ph<param_handle_table.size();++ph){\r
3642                 data_type pdt(param_handle_table[ph]->type_name);\r
3643                 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());\r
3644                 switch(param_handle_table[ph]->val_type){\r
3645                 case cplx_lit_e:\r
3646                         ret += tmpstr;\r
3647                         if(pdt.is_buffer_type()) ret += "&(";\r
3648                         sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);\r
3649                         ret += tmpstr ;\r
3650                         if(pdt.is_buffer_type()) ret += ")";\r
3651                         ret +=  ");\n";\r
3652                         break;\r
3653                 case litval_e:\r
3654 //                              not complex, no constructor\r
3655                         ret += tmpstr;\r
3656                         ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";\r
3657                         break;\r
3658                 case param_e:\r
3659 //                              query parameter handles are regstered/deregistered in the\r
3660 //                              load_params function.\r
3661 //                      ret += "t->param_"+param_handle_table[ph]->param_name;\r
3662                         break;\r
3663                 default:\r
3664                         fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");\r
3665                         exit(1);\r
3666                 }\r
3667         }\r
3668 \r
3669         ret += "\treturn (struct FTA *) f;\n";\r
3670         ret += "}\n\n";\r
3671 \r
3672         return(ret);\r
3673 }\r
3674 \r
3675 \r
3676 \r
3677 \r
3678 //////////////////////////////////////////////////////////////////\r
3679 \r
3680 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,\r
3681 //              map<string,string> &int_fcn_defs,\r
3682                 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){\r
3683         bool is_aggr_query;\r
3684         int s,p,g;\r
3685         string retval;\r
3686 \r
3687 /////////////////////////////////////////////////////////////\r
3688 ///             Do operator-generic processing, such as\r
3689 ///             gathering the set of referenced columns,\r
3690 ///             generating structures, etc.\r
3691 \r
3692 //              Initialize globals to empty.\r
3693         gb_tbl = NULL; aggr_tbl = NULL;\r
3694         global_id = -1; nicprop = NULL;\r
3695         param_tbl = fs->get_param_tbl();\r
3696         sl_list.clear(); where.clear();\r
3697         partial_fcns.clear();\r
3698         fcn_ref_cnt.clear(); is_partial_fcn.clear();\r
3699         pred_class.clear(); pred_pos.clear();\r
3700         sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;\r
3701         gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;\r
3702 \r
3703 \r
3704 //              Does the lfta read packed results from the NIC?\r
3705         nicprop = nicp;                 // load into global\r
3706         global_id = gid;\r
3707     packed_return = false;\r
3708         if(nicp && nicp->option_exists("Return")){\r
3709                 if(nicp->option_value("Return") == "Packed"){\r
3710                         packed_return = true;\r
3711                 }else{\r
3712                         fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());\r
3713                 }\r
3714         }\r
3715 \r
3716 \r
3717 //                      Extract data which defines the query.\r
3718 //                              complex literals gathered now.\r
3719         complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);\r
3720         param_handle_table = fs->get_handle_param_tbl(Ext_fcns);\r
3721         string node_name = fs->get_node_name();\r
3722     bool is_fj = false, uses_bloom = false;\r
3723 \r
3724 \r
3725         if(fs->node_type() == "spx_qpn"){\r
3726                 is_aggr_query = false;\r
3727                 spx_qpn *spx_node = (spx_qpn *)fs;\r
3728                 sl_list = spx_node->get_select_se_list();\r
3729                 where = spx_node->get_where_clause();\r
3730                 gb_tbl = NULL;\r
3731                 aggr_tbl = NULL;\r
3732         } else\r
3733         if(fs->node_type() == "sgah_qpn"){\r
3734                 is_aggr_query = true;\r
3735                 sgah_qpn *sgah_node = (sgah_qpn *)fs;\r
3736                 sl_list = sgah_node->get_select_se_list();\r
3737                 where = sgah_node->get_where_clause();\r
3738                 gb_tbl = sgah_node->get_gb_tbl();\r
3739                 aggr_tbl = sgah_node->get_aggr_tbl();\r
3740 \r
3741                 if((sgah_node->get_having_clause()).size() > 0){\r
3742                         fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());\r
3743                 }\r
3744         } else\r
3745         if(fs->node_type() == "filter_join"){\r
3746                 is_aggr_query = false;\r
3747         is_fj = true;\r
3748                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;\r
3749                 sl_list = fj_node->get_select_se_list();\r
3750                 where = fj_node->get_where_clause();\r
3751                 uses_bloom = fj_node->use_bloom;\r
3752                 gb_tbl = NULL;\r
3753                 aggr_tbl = NULL;\r
3754         } else {\r
3755                 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());\r
3756                 exit(1);\r
3757         }\r
3758 \r
3759 //                      Build list of "partial functions", by clause.\r
3760 //                      NOTE : partial fcns are not handles well.\r
3761 //                      The act of searching for them associates the fcn call\r
3762 //                      in the SE with an index to an array.  Refs to the\r
3763 //                      fcn value are replaced with refs to the variable they are\r
3764 //                      unpacked into.  A more general tagging mechanism would be better.\r
3765 \r
3766         int i;\r
3767         vector<bool> *pfunc_ptr = NULL;\r
3768         vector<int> *ref_cnt_ptr = NULL;\r
3769         if(!is_aggr_query){             // don't collect cacheable fcns on aggr query.\r
3770                 ref_cnt_ptr = &fcn_ref_cnt;\r
3771                 pfunc_ptr = &is_partial_fcn;\r
3772         }\r
3773 \r
3774         sl_fcns_start = 0;\r
3775         for(i=0;i<sl_list.size();i++){\r
3776                 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);\r
3777         }\r
3778         wh_fcns_start = sl_fcns_end = partial_fcns.size();\r
3779         for(i=0;i<where.size();i++){\r
3780                 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);\r
3781         }\r
3782         gb_fcns_start = wh_fcns_end = partial_fcns.size();\r
3783         if(gb_tbl != NULL){\r
3784                 for(i=0;i<gb_tbl->size();i++){\r
3785                         find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);\r
3786                 }\r
3787         }\r
3788         ag_fcns_start = gb_fcns_end = partial_fcns.size();\r
3789         if(aggr_tbl != NULL){\r
3790                 for(i=0;i<aggr_tbl->size();i++){\r
3791                         find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);\r
3792                 }\r
3793         }\r
3794         ag_fcns_end = partial_fcns.size();\r
3795 \r
3796 //              Fill up the is_partial_fcn and fcn_ref_cnt arrays.\r
3797         if(is_aggr_query){\r
3798                 for(i=0; i<partial_fcns.size();i++){\r
3799                         fcn_ref_cnt.push_back(1);\r
3800                         is_partial_fcn.push_back(true);\r
3801                 }\r
3802         }\r
3803 \r
3804 //              Unmark non-partial expensive functions referenced only once.\r
3805         for(i=0; i<partial_fcns.size();i++){\r
3806                 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){\r
3807                         partial_fcns[i]->set_partial_ref(-1);\r
3808                 }\r
3809         }\r
3810 \r
3811         node_name = normalize_name(node_name);\r
3812 \r
3813         retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);\r
3814 \r
3815         if(packed_return){              // generate unpack struct\r
3816                 vector<tablevar_t *> input_tbls = fs->get_input_tbls();\r
3817                 int schref = input_tbls[0]->get_schema_ref();\r
3818                 vector<string> refd_cols;\r
3819                 for(s=0;s<sl_list.size();++s){\r
3820                         gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);\r
3821                 }\r
3822                 for(p=0;p<where.size();++p){\r
3823 //                              I'm not disabling these preds ...\r
3824                         gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);\r
3825                 }\r
3826                 if(gb_tbl){\r
3827                         for(g=0;g<gb_tbl->size();++g){\r
3828                           gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);\r
3829                         }\r
3830                 }\r
3831                 sort(refd_cols.begin(), refd_cols.end());\r
3832                 retval += "struct "+node_name+"_input_struct{\n";\r
3833                 retval += "\tint __lfta_id_fm_nic__;\n";\r
3834                 int vsi;\r
3835                 for(vsi=0;vsi<refd_cols.size();++vsi){\r
3836                 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));\r
3837                         retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";\r
3838                 }\r
3839                 retval+="};\n\n";\r
3840         }\r
3841 \r
3842 \r
3843 /////////////////////////////////////////////////////\r
3844 //                      Common stuff unpacked, do some generation\r
3845 \r
3846         if(is_aggr_query)\r
3847           retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);\r
3848         if(is_fj)\r
3849                 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);\r
3850 \r
3851         retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema);\r
3852         retval += generate_tuple_struct(node_name, sl_list) ;\r
3853 \r
3854         if(is_aggr_query)\r
3855                 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;\r
3856         if(param_tbl->size() > 0)\r
3857                 retval += generate_fta_load_params(node_name) ;\r
3858         retval += generate_fta_free(node_name, is_aggr_query) ;\r
3859         retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;\r
3860         retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ;\r
3861 \r
3862 \r
3863         /* extract the value of Time_Correlation from interface definition */\r
3864         int e,v;\r
3865         string es;\r
3866         unsigned time_corr;\r
3867         vector<tablevar_t *> tvec =  fs->get_input_tbls();\r
3868         vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);\r
3869         if (time_corr_vec.empty())\r
3870                 time_corr = DEFAULT_TIME_CORR;\r
3871         else\r
3872                 time_corr = atoi(time_corr_vec[0].c_str());\r
3873 \r
3874         retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) );\r
3875         retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );\r
3876 \r
3877   return(retval);\r
3878 }\r
3879 \r
3880 \r
3881 \r
3882 int compute_snap_len(qp_node *fs, table_list *schema){\r
3883 \r
3884 //              Initialize global vars\r
3885         gb_tbl = NULL;\r
3886         sl_list.clear(); where.clear();\r
3887 \r
3888         if(fs->node_type() == "spx_qpn"){\r
3889                 spx_qpn *spx_node = (spx_qpn *)fs;\r
3890                 sl_list = spx_node->get_select_se_list();\r
3891                 where = spx_node->get_where_clause();\r
3892         }\r
3893         else if(fs->node_type() == "sgah_qpn"){\r
3894                 sgah_qpn *sgah_node = (sgah_qpn *)fs;\r
3895                 sl_list = sgah_node->get_select_se_list();\r
3896                 where = sgah_node->get_where_clause();\r
3897                 gb_tbl = sgah_node->get_gb_tbl();\r
3898         }\r
3899         else if(fs->node_type() == "filter_join"){\r
3900                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;\r
3901                 sl_list = fj_node->get_select_se_list();\r
3902                 where = fj_node->get_where_clause();\r
3903         } else{\r
3904                 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());\r
3905                 exit(1);\r
3906         }\r
3907 \r
3908 //                      Gather all column references, need to define unpacking variables.\r
3909   int w,s;\r
3910   col_id_set cid_set;\r
3911   col_id_set::iterator csi;\r
3912 \r
3913   for(w=0;w<where.size();++w)\r
3914         gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);\r
3915   for(s=0;s<sl_list.size();s++){\r
3916         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);\r
3917   }\r
3918 \r
3919   int g;\r
3920   if(gb_tbl != NULL){\r
3921         for(g=0;g<gb_tbl->size();g++)\r
3922           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);\r
3923   }\r
3924 \r
3925   //                    compute snap length\r
3926   int snap_len = -1;\r
3927   int n_snap=0;\r
3928   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){\r
3929     int schref = (*csi).schema_ref;\r
3930         int tblref = (*csi).tblvar_ref;\r
3931     string field = (*csi).field;\r
3932 \r
3933         param_list *field_params = schema->get_modifier_list(schref, field);\r
3934         if(field_params->contains_key("snap_len")){\r
3935                 string fld_snap_str = field_params->val_of("snap_len");\r
3936                 int fld_snap;\r
3937                 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){\r
3938                         if(fld_snap > snap_len) snap_len = fld_snap;\r
3939                         n_snap++;\r
3940                 }else{\r
3941                         fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );\r
3942                 }\r
3943         }\r
3944   }\r
3945 \r
3946   if(n_snap == cid_set.size()){\r
3947         return (snap_len);\r
3948   }else{\r
3949         return -1;\r
3950   }\r
3951 \r
3952 \r
3953 }\r
3954 \r
3955 //              Function which computes an optimal\r
3956 //              set of unpacking functions.\r
3957 \r
3958 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){\r
3959         map<string, int> pfcn_count;\r
3960         map<string, int>::iterator msii;\r
3961         col_id_set::iterator cisi;\r
3962         set<string>::iterator ssi;\r
3963         string best_fcn;\r
3964 \r
3965         while(ucol_fcn_map.size() < upref_cids.size()){\r
3966 \r
3967 //                      Gather unpack functions referenced by unaccounted-for\r
3968 //                      columns, and increment their reference count.\r
3969                 pfcn_count.clear();\r
3970                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){\r
3971                         if(ucol_fcn_map.count((*cisi)) == 0){\r
3972                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();\r
3973                                 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)\r
3974                                         pfcn_count[(*ssi)]++;\r
3975                         }\r
3976                 }\r
3977 \r
3978 //              Get the lowest cost per field function.\r
3979                 float min_cost = 0.0;\r
3980                 string best_fcn = "";\r
3981                 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){\r
3982                         int fcost = Schema->get_ufcn_cost((*msii).first);\r
3983                         if(fcost < 0){\r
3984                                 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());\r
3985                                 exit(1);\r
3986                         }\r
3987                         float this_cost = (1.0*fcost)/(*msii).second;\r
3988                         if(msii == pfcn_count.begin() || this_cost < min_cost){\r
3989                                 min_cost = this_cost;\r
3990                                 best_fcn = (*msii).first;\r
3991                         }\r
3992                 }\r
3993                 if(best_fcn == ""){\r
3994                         fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");\r
3995                         exit(1);\r
3996                 }\r
3997 \r
3998 //              Assign this function to the unassigned fcns which use it.\r
3999                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){\r
4000                         if(ucol_fcn_map.count((*cisi)) == 0){\r
4001                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();\r
4002                                 if(ufcns.count(best_fcn)>0)\r
4003                                         ucol_fcn_map[(*cisi)] = best_fcn;\r
4004                         }\r
4005                 }\r
4006         }\r
4007 }\r
4008 \r
4009 \r
4010 \r
4011 //              Generate an initial test test for the lfta\r
4012 //              Assume that the predicate references no external functions,\r
4013 //              and especially no partial functions,\r
4014 //              aggregates, internal functions.\r
4015 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,\r
4016                 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,\r
4017                 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,\r
4018                 vector<int> &lfta_snap_lens, string iface){\r
4019   col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;\r
4020   col_id_set::iterator csi;\r
4021         int o,p,q;\r
4022         string ret;\r
4023 \r
4024 //              Gather complex literals in the prefilter.\r
4025         cplx_lit_table *complex_literals = new cplx_lit_table();\r
4026         for(p=0;p<pred_list.size();++p){\r
4027                 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);\r
4028         }\r
4029 \r
4030 \r
4031 //              Find the combinable predicates\r
4032         vector<predicate_t *> pr_list;\r
4033         for(p=0;p<pred_list.size();++p){\r
4034         find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);\r
4035         }\r
4036 \r
4037 //              Analyze the combinable predicates to find the predicate classes.\r
4038         pred_class.clear();             // idx to equiv pred in equiv_list\r
4039         pred_pos.clear();               // idx to returned bitmask.\r
4040         vector<predicate_t *> equiv_list;\r
4041         vector<int> num_equiv;\r
4042 \r
4043 \r
4044         for(p=0;p<pr_list.size();++p){\r
4045                 for(q=0;q<equiv_list.size();++q){\r
4046                         if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))\r
4047                                 break;\r
4048                 }\r
4049                 if(q == equiv_list.size()){             // no equiv : create new\r
4050                         pred_class.push_back(equiv_list.size());\r
4051                         equiv_list.push_back(pr_list[p]);\r
4052                         pred_pos.push_back(0);\r
4053                         num_equiv.push_back(1);\r
4054 \r
4055                 }else{                  // pr_list[p] is equivalent to pred q\r
4056                         pred_class.push_back(q);\r
4057                         pred_pos.push_back(num_equiv[q]);\r
4058                         num_equiv[q]++;\r
4059                 }\r
4060         }\r
4061 \r
4062 //              Generate the variables which hold the common pred handles\r
4063         ret += "/*\t\tprefilter global vars.\t*/\n";\r
4064         for(q=0;q<equiv_list.size();++q){\r
4065                 for(p=0;p<=(num_equiv[q]/32);++p){\r
4066                         ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";\r
4067                 }\r
4068         }\r
4069 \r
4070 //              Struct to hold prefilter complex literals\r
4071         ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";\r
4072         if(complex_literals->size() == 0)\r
4073                 ret += "\tint no_variable;\n";\r
4074         int cl;\r
4075         for(cl=0;cl<complex_literals->size();cl++){\r
4076                 literal_t *l = complex_literals->get_literal(cl);\r
4077                 data_type *dtl = new data_type( l->get_type() );\r
4078                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);\r
4079                 ret += tmpstr;\r
4080         }\r
4081         ret += "} prefilter_complex_lits_"+iface+";\n\n";\r
4082 \r
4083 \r
4084 //              Generate the prefilter initialziation code\r
4085         ret += "void init_lfta_prefilter_"+iface+"(){\n";\r
4086 \r
4087 //              First initialize complex literals, if any.\r
4088         ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";\r
4089         for(cl=0;cl<complex_literals->size();cl++){\r
4090                 literal_t *l = complex_literals->get_literal(cl);\r
4091                 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);\r
4092                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";\r
4093         }\r
4094 \r
4095 \r
4096         set<int> epred_seen;\r
4097         for(p=0;p<pr_list.size();++p){\r
4098                 int q = pred_class[p];\r
4099 //printf("\tq=%d\n",q);\r
4100                 if(epred_seen.count(q)>0){\r
4101                         ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";\r
4102                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());\r
4103                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();\r
4104                         for(o=0;o<op_list.size();++o){\r
4105                                 if(! cl_op[o]){\r
4106                                         ret += generate_se_code(op_list[o],Schema)+", ";\r
4107                                 }\r
4108                         }\r
4109                         ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";\r
4110                         epred_seen.insert(q);\r
4111                 }else{\r
4112                         ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";\r
4113                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());\r
4114                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();\r
4115                         for(o=0;o<op_list.size();++o){\r
4116                                 if(! cl_op[o]){\r
4117                                         ret += generate_se_code(op_list[o],Schema)+", ";\r
4118                                 }\r
4119                         }\r
4120                         ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";\r
4121                         epred_seen.insert(q);\r
4122                 }\r
4123         }\r
4124         ret += "}\n\n";\r
4125 \r
4126 \r
4127 \r
4128 //              Start on main body code generation\r
4129   ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";\r
4130 \r
4131 \r
4132 ///--------------------------------------------------------------\r
4133 ///             Generate and store the prefilter body,\r
4134 ///             reuse it for the snap length calculator\r
4135 ///-------------------------------------------------------------\r
4136         string body;\r
4137 \r
4138     body += "\tstruct packet *p = (struct packet *)pkt;\n";\r
4139 \r
4140 \r
4141 \r
4142 //              Gather the colids to store unpacked variables.\r
4143         for(p=0;p<pred_list.size();++p){\r
4144         gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);\r
4145         }\r
4146 \r
4147 //              make the col_ids refer to the base tables, and\r
4148 //              grab the col_ids with at least one unpacking function.\r
4149         for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){\r
4150                 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);\r
4151                 col_id tmp_col_id;\r
4152                 tmp_col_id.field = (*csi).field;\r
4153                 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;\r
4154                 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);\r
4155                 cid_set.insert(tmp_col_id);\r
4156                 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);\r
4157                 if(fe->get_unpack_fcns().size()>0)\r
4158                         upref_cids.insert(tmp_col_id);\r
4159 \r
4160 \r
4161         }\r
4162 \r
4163 //              Find the set of unpacking programs needed for the\r
4164 //              prefilter fields.\r
4165         map<col_id, string,lt_col_id>  ucol_fcn_map;\r
4166         find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);\r
4167         set<string> pref_ufcns;\r
4168         map<col_id, string,lt_col_id>::iterator mcis;\r
4169         for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){\r
4170                 pref_ufcns.insert((*mcis).second);\r
4171         }\r
4172 \r
4173 \r
4174 \r
4175 //                      Variables for unpacking attributes.\r
4176     body += "/*\t\tVariables for unpacking attributes\t*/\n";\r
4177     for(csi=cid_set.begin(); csi!=cid_set.end();++csi){\r
4178       int schref = (*csi).schema_ref;\r
4179           int tblref = (*csi).tblvar_ref;\r
4180       string field = (*csi).field;\r
4181       data_type dt(Schema->get_type_name(schref,field));\r
4182       sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),\r
4183         field.c_str(), tblref);\r
4184       body += tmpstr;\r
4185       sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);\r
4186       body += tmpstr;\r
4187     }\r
4188 //                      Variables for unpacking temporal attributes.\r
4189     body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";\r
4190     for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){\r
4191           if (cid_set.count(*csi) == 0) {\r
4192         int schref = (*csi).schema_ref;\r
4193                 int tblref = (*csi).tblvar_ref;\r
4194         string field = (*csi).field;\r
4195         data_type dt(Schema->get_type_name(schref,field));\r
4196         sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),\r
4197           field.c_str(), tblref);\r
4198         body += tmpstr;\r
4199 \r
4200           }\r
4201     }\r
4202     body += "\n\n";\r
4203 \r
4204 //              Variables for combinable predicate evaluation\r
4205         body += "/*\t\tVariables for common prdicate evaluation\t*/\n";\r
4206         for(q=0;q<equiv_list.size();++q){\r
4207                 for(p=0;p<=(num_equiv[q]/32);++p){\r
4208                         body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";\r
4209                 }\r
4210         }\r
4211 \r
4212 \r
4213 //                      Variables that are always needed\r
4214     body += "/*\t\tVariables which are always needed\t*/\n";\r
4215         body += "\tgs_uint64_t retval=0, bitpos=1;\n";\r
4216         body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";\r
4217 \r
4218 //              Call the unpacking functions for the prefilter fields\r
4219         if(pref_ufcns.size() > 0)\r
4220                 body += "\n/*\t\tcall field unpacking functions\t*/\n";\r
4221         set<string>::iterator ssi;\r
4222         for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){\r
4223                 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";\r
4224         }\r
4225 \r
4226 \r
4227 //              Unpack the accessed attributes\r
4228         body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";\r
4229     for(csi=cid_set.begin();csi!=cid_set.end();++csi){\r
4230           int tblref = (*csi).tblvar_ref;\r
4231       int schref = (*csi).schema_ref;\r
4232           string field = (*csi).field;\r
4233           sprintf(tmpstr,"\tret_%s_%d =  (%s(p, &unpack_var_%s_%d) == 0);\n",\r
4234                 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);\r
4235           body += tmpstr;\r
4236     }\r
4237 \r
4238 //              next unpack the temporal attributes and ignore the errors\r
4239 //              We are assuming here that failed unpack of temporal attributes\r
4240 //              is not going to overwrite the last stored value\r
4241 //              Failed upacks are ignored\r
4242     for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){\r
4243           int tblref = (*csi).tblvar_ref;\r
4244       int schref = (*csi).schema_ref;\r
4245           string field = (*csi).field;\r
4246           sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",\r
4247                  Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);\r
4248           body += tmpstr;\r
4249     }\r
4250 \r
4251 //              Evaluate the combinable predicates\r
4252         if(equiv_list.size()>0)\r
4253                 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";\r
4254         for(q=0;q<equiv_list.size();++q){\r
4255                 for(p=0;p<=(num_equiv[q]/32);++p){\r
4256 \r
4257 //              Only call the common eval fcn if all ref'd fields present.\r
4258                         col_id_set pred_cids;\r
4259                         col_id_set::iterator cpi;\r
4260                         gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);\r
4261                         if(pred_cids.size()>0){\r
4262                         body += "\tif(";\r
4263                                 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){\r
4264                                         if(cpi != pred_cids.begin())\r
4265                                                 body += " && ";\r
4266                                 string field = (*cpi).field;\r
4267                                         int tblref = (*cpi).tblvar_ref;\r
4268                                         body += "ret_"+field+"_"+int_to_string(tblref);\r
4269                                 }\r
4270                                 body+=")\n";\r
4271                         }\r
4272 \r
4273                         body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;\r
4274                         vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();\r
4275                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());\r
4276                         for(o=0;o<op_list.size();++o){\r
4277                                 if(cl_op[o]){\r
4278                                         body += ","+generate_se_code(op_list[o],Schema);\r
4279                                 }\r
4280                         }\r
4281                         body += ");\n";\r
4282                 }\r
4283         }\r
4284 \r
4285 \r
4286         for(p=0;p<pred_list.size();++p){\r
4287                 col_id_set pred_cids;\r
4288                 col_id_set::iterator cpi;\r
4289                 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);\r
4290                 if(pred_cids.size()>0){\r
4291                         body += "\tif(";\r
4292                         for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){\r
4293                                 if(cpi != pred_cids.begin())\r
4294                                         body += " && ";\r
4295                         string field = (*cpi).field;\r
4296                                 int tblref = (*cpi).tblvar_ref;\r
4297                                 body += "ret_"+field+"_"+int_to_string(tblref);\r
4298                         }\r
4299                         body+=")\n";\r
4300                 }\r
4301         body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";\r
4302                 body+="\tbitpos = bitpos << 1;\n";\r
4303         }\r
4304 \r
4305 // ---------------------------------------------------------------\r
4306 //              Finished with the body of the prefilter\r
4307 // --------------------------------------------------------------\r
4308 \r
4309         ret += body;\r
4310 \r
4311 //                      Collect fields referenced by an lfta but not\r
4312 //                      already unpacked for the prefilter.\r
4313 \r
4314 //printf("upref_cids is:\n");\r
4315 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)\r
4316 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);\r
4317 //printf("pref_ufcns is:\n");\r
4318 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)\r
4319 //printf("\t%s\n",(*ssi).c_str());\r
4320 \r
4321         int l;\r
4322         for(l=0;l<lfta_cols.size();++l){\r
4323                 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){\r
4324                         string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);\r
4325                         col_id tmp_col_id;\r
4326                         tmp_col_id.field = (*csi).field;\r
4327                         tmp_col_id.tblvar_ref = (*csi).tblvar_ref;\r
4328                         tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);\r
4329                         field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);\r
4330                         set<string> fld_ufcns = fe->get_unpack_fcns();\r
4331 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(),  upref_cids.count(tmp_col_id));\r
4332                         if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){\r
4333 //              Ensure that this field not already unpacked.\r
4334                                 bool found = false;\r
4335                                 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){\r
4336 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());\r
4337                                         if(pref_ufcns.count((*ssi))){\r
4338 //printf("Field already unpacked.\n");\r
4339                                                 found = true;;\r
4340                                         }\r
4341                                 }\r
4342                                 if(! found){\r
4343 //printf("\tadding to unpack list\n");\r
4344                                         upall_cids.insert(tmp_col_id);\r
4345                                 }\r
4346                         }\r
4347                 }\r
4348         }\r
4349 \r
4350 //printf("upall_cids is:\n");\r
4351 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)\r
4352 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);\r
4353 \r
4354 //              Get the set of unpacking programs for these.\r
4355         map<col_id, string,lt_col_id>  uall_fcn_map;\r
4356         find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);\r
4357         set<string> pall_ufcns;\r
4358         for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){\r
4359 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());\r
4360                 pall_ufcns.insert((*mcis).second);\r
4361         }\r
4362 \r
4363 //              Iterate through the remaining set of unpacking function\r
4364         if(pall_ufcns.size() > 0)\r
4365                 ret += "//\t\tcall all remaining field unpacking functions.\n";\r
4366         for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){\r
4367 //              gather the set of columns unpacked by this ufcn\r
4368                 col_id_set fcol_set;\r
4369                 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){\r
4370                         if(uall_fcn_map[(*csi)] == (*ssi))\r
4371                                 fcol_set.insert((*csi));\r
4372                 }\r
4373 \r
4374 //              gather the set of lftas which access a field unpacked by the fcn\r
4375                 set<long long int> clfta;\r
4376                 for(l=0;l<lfta_cols.size();l++){\r
4377                         for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){\r
4378                                 if(lfta_cols[l].count((*csi)) > 0)\r
4379                                         break;\r
4380                         }\r
4381                         if(csi != fcol_set.end())\r
4382                                 clfta.insert(lfta_sigs[l]);\r
4383                 }\r
4384 \r
4385 //              generate the unpacking code\r
4386                 ret += "\tif(";\r
4387                 set<long long int>::iterator sii;\r
4388                 for(sii=clfta.begin();sii!=clfta.end();++sii){\r
4389                         if(sii!=clfta.begin())\r
4390                                 ret += " || ";\r
4391                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));\r
4392                         ret += tmpstr;\r
4393                 }\r
4394                 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";\r
4395         }\r
4396 \r
4397 \r
4398     ret += "\treturn(retval);\n\n";\r
4399   ret += "}\n\n";\r
4400 \r
4401 \r
4402 // --------------------------------------------------------\r
4403 //              reuse prefilter body for snaplen calculator\r
4404 //\r
4405 //      This is dummy code, so I'm commenting it out.\r
4406 \r
4407 /*\r
4408   ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";\r
4409 \r
4410         ret += body;\r
4411 \r
4412         int i;\r
4413         vector<int> s_snaps = lfta_snap_lens;\r
4414         sort(s_snaps.begin(), s_snaps.end());\r
4415 \r
4416         if(s_snaps[0] == -1){\r
4417                 set<unsigned long long int> sigset;\r
4418                 for(i=0;i<lfta_snap_lens.size();++i){\r
4419                         if(lfta_snap_lens[i] == -1){\r
4420                                 sigset.insert(lfta_sigs[i]);\r
4421                         }\r
4422                 }\r
4423                 ret += "\tif( ";\r
4424                 set<unsigned long long int>::iterator sulli;\r
4425                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){\r
4426                         if(sulli!=sigset.begin())\r
4427                                 ret += " || ";\r
4428                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));\r
4429                         ret += tmpstr;\r
4430                 }\r
4431                 ret += ") return -1;\n";\r
4432         }\r
4433 \r
4434         int nextpos = lfta_snap_lens.size()-1;\r
4435         int nextval = lfta_snap_lens[nextpos];\r
4436         while(nextval >= 0){\r
4437                 set<unsigned long long int> sigset;\r
4438                 for(i=0;i<lfta_snap_lens.size();++i){\r
4439                         if(lfta_snap_lens[i] == nextval){\r
4440                                 sigset.insert(lfta_sigs[i]);\r
4441                         }\r
4442                 }\r
4443                 ret += "\tif( ";\r
4444                 set<unsigned long long int>::iterator sulli;\r
4445                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){\r
4446                         if(sulli!=sigset.begin())\r
4447                                 ret += " || ";\r
4448                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));\r
4449                         ret += tmpstr;\r
4450                 }\r
4451                 ret += ") return "+int_to_string(nextval)+";\n";\r
4452 \r
4453                 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);\r
4454                 if(nextpos>0)\r
4455                         nextval = lfta_snap_lens[nextpos];\r
4456                 else\r
4457                         nextval = -1;\r
4458         }\r
4459         ret += "\treturn 0;\n";\r
4460         ret += "}\n\n";\r
4461 */\r
4462 \r
4463 \r
4464   return(ret);\r
4465 }\r
4466 \r
4467 \r
4468 \r
4469 \r
4470 //              Generate the struct which will store the the values of\r
4471 //              temporal attributesunpacked by prefilter\r
4472 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {\r
4473 \r
4474   col_id_set::iterator csi;\r
4475 \r
4476 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());\r
4477 \r
4478   string ret="struct prefilter_unpacked_temp_vars {\n";\r
4479   ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";\r
4480 \r
4481   string init_code;\r
4482 \r
4483   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){\r
4484     int schref = (*csi).schema_ref;\r
4485     int tblref = (*csi).tblvar_ref;\r
4486     string field = (*csi).field;\r
4487         data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));\r
4488     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),\r
4489         field.c_str(), tblref);\r
4490     ret += tmpstr;\r
4491 \r
4492         if (init_code != "")\r
4493                 init_code += ", ";\r
4494         if (dt.is_increasing())\r
4495                 init_code += dt.get_min_literal();\r
4496         else\r
4497                 init_code += dt.get_max_literal();\r
4498 \r
4499   }\r
4500   ret += "};\n\n";\r
4501 \r
4502   ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";\r
4503 \r
4504   return(ret);\r
4505 }\r