Initial commit
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include <string>
17 #include <stdio.h>
18 #include <stdlib.h>
19 //#include <algo.h>
20 #include<algorithm>
21
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
29
30 using namespace std;
31
// Declared extern here; presumably the default LFTA hash-table size,
// defined in another translation unit -- TODO confirm.
extern int DEFAULT_LFTA_HASH_TABLE_SIZE;

// default value for correlation between the interface card and
// the system clock
#define DEFAULT_TIME_CORR 16


//	For fast hashing: a table of NRANDS 64-bit unsigned constants stored as
//	C literal text (e.g. "12916008961267169387ull", see the commented-out
//	initializer below).  Only declared here; NRANDS must be supplied by an
//	included header because the local #define is commented out.
//#define NRANDS 100
extern string hash_nums[NRANDS];
42 /*
43 = {
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
94 };
95 */
96
97
// ----------------------------------------------
//		Data extracted from the query plan node
//		for use by code generation.
//		This is file-level working state shared by the generator
//		functions in this file.

static cplx_lit_table *complex_literals;  //Table of literals with constructors.
static vector<handle_param_tbl_entry *> param_handle_table;	// pass-by-handle parameters.
static param_table *param_tbl;		// Table of all referenced parameters.

static vector<scalarexp_t *> sl_list;	// select-list expressions.
static vector<cnf_elem *> where;	// presumably the WHERE predicates in CNF -- TODO confirm.

static gb_table *gb_tbl;			// Table of all group-by attributes.
static aggregate_table *aggr_tbl;	// Table of all referenced aggregates.

static bool packed_return;		// unpack using structure fields, not unpack fcns.
static nic_property *nicprop;	// nic properties for this interface.
static int global_id;


//		The partial_fcns vector can now refer to
//		partial functions, or expensive functions
//		which can be cached (if there are multiple refs).  Two parallel
//		vectors, fcn_ref_cnt (int) and is_partial_fcn (bool), distinguish
//		the cases.
static vector<scalarexp_t *> partial_fcns;
static vector<int> fcn_ref_cnt;
static vector<bool> is_partial_fcn;
//		Presumably index ranges into partial_fcns for the functions of the
//		select, where, group-by and aggregate clauses -- TODO confirm.
//		Note: these are not static, so they have external linkage.
int sl_fcns_start = 0, sl_fcns_end = 0;
int wh_fcns_start = 0, wh_fcns_end = 0;
int gb_fcns_start = 0, gb_fcns_end = 0;
int ag_fcns_start = 0, ag_fcns_end = 0;


//		These vectors are for combinable predicates.
static vector<int> pred_class;	//	identifies the group
static vector<int> pred_pos;	//  position in the group.



//	Shared scratch buffer for the sprintf-based formatting in this file.
//	Every use overwrites it, so results must be copied out immediately.
//	NOTE(review): sprintf is unbounded; formatted text is assumed to fit
//	in 1000 bytes.
static char tmpstr[1000];
137
138 //////////////////////////////////////////////////////////////////////
139 ///                     Various utilities
140
141 string generate_fta_name(string node_name){
142         string ret = normalize_name(node_name);
143         if(ret == ""){
144                 ret = "default";
145         }
146         ret += "_fta";
147
148         return(ret);
149 }
150
151
152 string generate_aggr_struct_name(string node_name){
153         string ret = normalize_name(node_name);
154         if(ret == ""){
155                 ret = "default";
156         }
157         ret += "_aggr_struct";
158
159         return(ret);
160 }
161
162 string generate_fj_struct_name(string node_name){
163         string ret = normalize_name(node_name);
164         if(ret == ""){
165                 ret = "default";
166         }
167         ret += "_fj_struct";
168
169         return(ret);
170 }
171
172 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
173         string ret;
174         if(! packed_return){
175                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
176                         schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
177         ret += tmpstr;
178         if(!schema->get_modifier_list(schref,field)->contains_key("required"))
179                 ret += "\tif(retval) goto "+end_goto+";\n";
180
181         }else{
182 //                      TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
183                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
184                 if(dt.is_buffer_type()){
185                         if(dt.get_type() != v_str_t){
186                           ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
187                           ret += "\t\tgoto "+end_goto+";\n";
188                           ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
189                           ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
190                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
191                         }else{
192                                 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
193                                 exit(1);
194                         }
195                 }else{
196                         ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
197                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
198                 }
199         }
200         return ret;
201 }
202
203 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
204   string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
205
206   int g;
207   for(g=0;g<gb_tbl->size();g++){
208           sprintf(tmpstr,"gb_var%d",g);
209           ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
210   }
211
212   int a;
213   for(a=0;a<aggr_tbl->size();a++){
214           ret += "\t";
215           sprintf(tmpstr,"aggr_var%d",a);
216           if(aggr_tbl->is_builtin(a))
217                 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
218           else
219                 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
220   }
221
222 /*
223   ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
224 */
225
226   ret += "};\n\n";
227
228   return(ret);
229 }
230
231
232 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
233   string ret;
234
235   if(fs->use_bloom == false){   // uses hash table instead
236         ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
237         int k;
238         for(k=0;k<fs->hash_eq.size();++k){
239                 sprintf(tmpstr,"key_var%d",k);
240                 ret += "\t"+fs->hash_eq[k]->pr->get_left_se()->get_data_type()->make_cvar(tmpstr)+";\n";
241         }
242         ret += "\tlong long int ts;\n";
243     ret += "};\n\n";
244   }
245
246   return(ret);
247 }
248
249
250
251
252 string generate_fta_struct(string node_name, gb_table *gb_tbl,
253                 aggregate_table *aggr_tbl, param_table *param_tbl,
254                 cplx_lit_table *complex_literals,
255                 vector<handle_param_tbl_entry *> &param_handle_table,
256                 bool is_aggr_query, bool is_fj, bool uses_bloom,
257                 table_list *schema){
258
259         string ret = "struct " + generate_fta_name(node_name) + "{\n";
260         ret += "\tstruct FTA f;\n";
261
262 //-------------------------------------------------------------
263 //              Aggregate-specific fields
264
265         if(is_aggr_query){
266 /*
267                 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
268 */
269                 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
270                 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
271 //              ret+="\tint bitmap_size;\n";
272                 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
273                 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
274                 ret += "\tint max_windows; // max number of open windows.\n";
275                 ret += "\tunsigned int generation; // initially zero, increment on\n";
276                 ret += "\t     // every hash table flush - whether regular or induced.\n";
277                 ret += "\t     // Old groups are identified by a generation mismatch.\n";
278                 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
279                 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
280
281
282
283                 int g;
284                 bool uses_temporal_flush = false;
285                 for(g=0;g<gb_tbl->size();g++){
286                         data_type *dt = gb_tbl->get_data_type(g);
287                         if(dt->is_temporal()){
288 /*
289                                 fprintf(stderr,"group by attribute %s is temporal, ",
290                                                 gb_tbl->get_name(g).c_str());
291                                 if(dt->is_increasing()){
292                                         fprintf(stderr,"increasing.\n");
293                                 }else{
294                                         fprintf(stderr,"decreasing.\n");
295                                 }
296 */
297                                 data_type *gdt = gb_tbl->get_data_type(g);
298                                 if(gdt->is_buffer_type()){
299                                         fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
300                                 }else{
301                                         sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
302                                         ret += tmpstr;
303                                         sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
304                                         ret += tmpstr;
305                                         sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
306                                         ret += tmpstr;
307                                         uses_temporal_flush = true;
308                                 }
309
310                         }
311
312                 }
313                 if(! uses_temporal_flush){
314                         fprintf(stderr,"Warning: no temporal flush.\n");
315                 }
316         }
317
318 // ---------------------------------------------------------
319 //                      Filter-join specific fields
320
321         if(is_fj){
322                 if(uses_bloom){
323                         ret +=
324 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
325 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
326 "\tint first_exec;\n"
327 "\tlong long int last_bin;\n"
328 "\tint last_bloom_pos;\n"
329 "\n"
330 ;
331                 }else{          // limited hash table
332                         ret +=
333 "  struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
334 "\n"
335 ;
336                 }
337
338         }
339
340 //--------------------------------------------------------
341 //                      Common fields
342
343 //                      Create places to hold the parameters.
344         int p;
345         vector<string> param_vec = param_tbl->get_param_names();
346         for(p=0;p<param_vec.size();p++){
347                 data_type *dt = param_tbl->get_data_type(param_vec[p]);
348                 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
349                                 param_vec[p].c_str());
350                 ret += tmpstr;
351                 if(param_tbl->handle_access(param_vec[p])){
352                         ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
353                 }
354         }
355
356 //                      Create places to hold complex literals.
357         int cl;
358         for(cl=0;cl<complex_literals->size();cl++){
359                 literal_t *l = complex_literals->get_literal(cl);
360                 data_type *dtl = new data_type( l->get_type() );
361                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
362                 ret += tmpstr;
363         }
364
365 //                      Create places to hold the pass-by-handle parameters.
366         for(p=0;p<param_handle_table.size();++p){
367                 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
368                 ret += tmpstr;
369         }
370
371 //                      Create places to hold the last values of temporal
372 //                      attributes referenced in select clause
373 //                      we also need to store values of the temoral attributed
374 //                      of last flushed tuple in aggr queries
375 //                      to make sure we generate the cirrect temporal tuple
376 //                      in the presense of slow flushes
377
378
379         col_id_set temp_cids;           //      col ids of temp attributes in select clause
380
381         int s;
382         col_id_set::iterator csi;
383
384         for(s=0;s<sl_list.size();s++){
385                 data_type *sdt = sl_list[s]->get_data_type();
386                 if (sdt->is_temporal()) {
387                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
388                 }
389         }
390
391         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
392                 int tblref = (*csi).tblvar_ref;
393                 int schref = (*csi).schema_ref;
394                 string field = (*csi).field;
395                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
396                 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
397                 ret += tmpstr;
398         }
399
400         ret += "\tgs_uint64_t trace_id;\n\n";
401
402 //      Fields to store the runtime stats
403
404         ret += "\tgs_uint32_t in_tuple_cnt;\n";
405         ret += "\tgs_uint32_t out_tuple_cnt;\n";
406         ret += "\tgs_uint32_t out_tuple_sz;\n";
407         ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
408         ret += "\tgs_uint64_t cycle_cnt;\n";
409         ret += "\tgs_uint32_t collision_cnt;\n";
410         ret += "\tgs_uint32_t eviction_cnt;\n";
411         ret += "\tgs_float_t sampling_rate;\n";
412
413
414
415         ret += "};\n\n";
416
417         return(ret);
418 }
419
420 //------------------------------------------------------------
421 //              Set colref tblvars to 0..
422 //              (special processing for join-like operators in an lfta).
423
424 void reset_se_col_ids_tblvars(scalarexp_t *se,  gb_table *gtbl){
425         vector<scalarexp_t *> operands;
426         int o;
427
428         if(! se)
429                 return;
430
431         switch(se->get_operator_type()){
432         case SE_LITERAL:
433         case SE_PARAM:
434         case SE_IFACE_PARAM:
435                 return;
436         case SE_UNARY_OP:
437                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
438                 return;
439         case SE_BINARY_OP:
440                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
441                 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
442                 return;
443         case SE_COLREF:
444                 if(! se->is_gb() ){
445                         se->get_colref()->set_tablevar_ref(0);
446                 }else{
447                         if(gtbl==NULL){
448                                 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
449                                 exit(1);
450                         }
451                         reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
452                 }
453                 return;
454         case SE_AGGR_STAR:
455                 return;
456         case SE_AGGR_SE:
457                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
458                 return;
459         case SE_FUNC:
460                 operands = se->get_operands();
461                 for(o=0;o<operands.size();o++){
462                         reset_se_col_ids_tblvars(operands[o], gtbl);
463                 }
464                 return;
465         default:
466                 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
467                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
468                 exit(1);
469         }
470 }
471
472
473 //              reset  column tblvars accessed in this pr.
474
475 void reset_pr_col_ids_tblvars(predicate_t *pr,  gb_table *gtbl){
476         vector<scalarexp_t *> op_list;
477         int o;
478
479         switch(pr->get_operator_type()){
480         case PRED_IN:
481                 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
482                 return;
483         case PRED_COMPARE:
484                 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
485                 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
486                 return;
487         case PRED_UNARY_OP:
488                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
489                 return;
490         case PRED_BINARY_OP:
491                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
492                 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
493                 return;
494         case PRED_FUNC:
495                 op_list = pr->get_op_list();
496                 for(o=0;o<op_list.size();++o){
497                         reset_se_col_ids_tblvars(op_list[o],gtbl) ;
498                 }
499                 return;
500         default:
501                 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
502                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
503         }
504 }
505
506
507
508
509 //                      Generate code that makes reference
510 //                      to the tuple, and not to any aggregates.
511 static string generate_se_code(scalarexp_t *se,table_list *schema){
512         string ret;
513     data_type *ldt, *rdt;
514         int o;
515         vector<scalarexp_t *> operands;
516
517
518         switch(se->get_operator_type()){
519         case SE_LITERAL:
520                 if(se->is_handle_ref()){
521                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
522                         ret = tmpstr;
523                         return(ret);
524                 }
525                 if(se->get_literal()->is_cpx_lit()){
526                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
527                         ret = tmpstr;
528                         return(ret);
529                 }
530                 return(se->get_literal()->to_C_code("")); // not complex, no constructor
531         case SE_PARAM:
532                 if(se->is_handle_ref()){
533                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
534                         ret = tmpstr;
535                         return(ret);
536                 }
537                 ret += "t->param_";
538                 ret += se->get_param_name();
539                 return(ret);
540         case SE_UNARY_OP:
541         ldt = se->get_left_se()->get_data_type();
542         if(ldt->complex_operator(se->get_op()) ){
543                         ret +=  ldt->get_complex_operator(se->get_op());
544                         ret += "(";
545                         ret += generate_se_code(se->get_left_se(),schema);
546             ret += ")";
547                 }else{
548                         ret += "(";
549                         ret += se->get_op();
550                         ret += generate_se_code(se->get_left_se(),schema);
551                         ret += ")";
552                 }
553                 return(ret);
554         case SE_BINARY_OP:
555         ldt = se->get_left_se()->get_data_type();
556         rdt = se->get_right_se()->get_data_type();
557
558         if(ldt->complex_operator(rdt, se->get_op()) ){
559                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
560                         ret += "(";
561                         ret += generate_se_code(se->get_left_se(),schema);
562                         ret += ", ";
563                         ret += generate_se_code(se->get_right_se(),schema);
564                         ret += ")";
565                 }else{
566                         ret += "(";
567                         ret += generate_se_code(se->get_left_se(),schema);
568                         ret += se->get_op();
569                         ret += generate_se_code(se->get_right_se(),schema);
570                         ret += ")";
571                 }
572                 return(ret);
573         case SE_COLREF:
574                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet unpacked ...
575                                                         // so return the defining code.
576                         ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
577
578                 }else{
579                 sprintf(tmpstr,"unpack_var_%s_%d",
580                   se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
581                 ret = tmpstr;
582                 }
583                 return(ret);
584         case SE_FUNC:
585 //                              Should not be ref'ing any aggr here.
586                 if(se->get_aggr_ref() >= 0){
587                         fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
588                         return("ERROR in generate_se_code");
589                 }
590
591                 if(se->is_partial()){
592                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
593                         ret = tmpstr;
594                 }else{
595                         ret += se->op + "(";
596                         operands = se->get_operands();
597                         for(o=0;o<operands.size();o++){
598                                 if(o>0) ret += ", ";
599                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
600                                         ret += "&";
601                                 ret += generate_se_code(operands[o], schema);
602                         }
603                         ret += ")";
604                 }
605                 return(ret);
606         default:
607                 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
608                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
609                 return("ERROR in generate_se_code");
610         }
611 }
612
613 //              generate code that refers only to aggregate data and constants.
614 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
615
616         string ret;
617     data_type *ldt, *rdt;
618         int o;
619         vector<scalarexp_t *> operands;
620
621
622         switch(se->get_operator_type()){
623         case SE_LITERAL:
624                 if(se->is_handle_ref()){
625                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
626                         ret = tmpstr;
627                         return(ret);
628                 }
629                 if(se->get_literal()->is_cpx_lit()){
630                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
631                         ret = tmpstr;
632                         return(ret);
633                 }
634                 return(se->get_literal()->to_C_code("")); // not complex no constructor
635         case SE_PARAM:
636                 if(se->is_handle_ref()){
637                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
638                         ret = tmpstr;
639                         return(ret);
640                 }
641                 ret += "t->param_";
642                 ret += se->get_param_name();
643                 return(ret);
644         case SE_UNARY_OP:
645         ldt = se->get_left_se()->get_data_type();
646         if(ldt->complex_operator(se->get_op()) ){
647                         ret +=  ldt->get_complex_operator(se->get_op());
648                         ret += "(";
649                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
650             ret += ")";
651                 }else{
652                         ret += "(";
653                         ret += se->get_op();
654                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
655                         ret += ")";
656                 }
657                 return(ret);
658         case SE_BINARY_OP:
659         ldt = se->get_left_se()->get_data_type();
660         rdt = se->get_right_se()->get_data_type();
661
662         if(ldt->complex_operator(rdt, se->get_op()) ){
663                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
664                         ret += "(";
665                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
666                         ret += ", ";
667                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
668                         ret += ")";
669                 }else{
670                         ret += "(";
671                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
672                         ret += se->get_op();
673                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
674                         ret += ")";
675                 }
676                 return(ret);
677         case SE_COLREF:
678                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet
679                                                         // unpacked ... so return the defining code.
680                         sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
681                         ret = tmpstr;
682
683                 }else{
684                 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
685                                 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
686                                 se->get_lineno(), se->get_charno());
687                 ret = tmpstr;
688                 }
689                 return(ret);
690         case SE_AGGR_STAR:
691         case SE_AGGR_SE:
692                 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
693                 ret = tmpstr;
694                 return(ret);
695         case SE_FUNC:
696 //                              Is it a UDAF?
697                 if(se->get_aggr_ref() >= 0){
698                         sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
699                         ret = tmpstr;
700                         return(ret);
701                 }
702
703                 if(se->is_partial()){
704                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
705                         ret = tmpstr;
706                 }else{
707                         ret += se->op + "(";
708                         operands = se->get_operands();
709                         for(o=0;o<operands.size();o++){
710                                 if(o>0) ret += ", ";
711                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
712                                         ret += "&";
713                                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
714                         }
715                         ret += ")";
716                 }
717                 return(ret);
718         default:
719                 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
720                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
721                 return("ERROR in generate_se_code");
722         }
723
724 }
725
726
727 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
728         string ret;
729         int o;
730         vector<scalarexp_t *> operands;
731
732
733         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
734                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
735                                 se->get_lineno(), se->get_charno());
736                 return("ERROR in generate_se_code");
737         }
738
739         ret = "\tretval = " + se->get_op() + "( ";
740         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
741         ret += tmpstr;
742
743         operands = se->get_operands();
744         for(o=0;o<operands.size();o++){
745                 ret += ", ";
746                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
747                         ret += "&";
748                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
749         }
750         ret += ");\n";
751
752         return(ret);
753 }
754
755 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
756         string ret;
757         int o;
758         vector<scalarexp_t *> operands;
759
760         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
761                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
762                                 se->get_lineno(), se->get_charno());
763                 return("ERROR in generate_se_code");
764         }
765
766         ret = se->get_op() + "( ";
767
768         operands = se->get_operands();
769         for(o=0;o<operands.size();o++){
770                 if(o) ret += ", ";
771                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
772                         ret += "&";
773                 ret += generate_se_code(operands[o], schema);
774         }
775         ret += ");\n";
776
777         return(ret);
778 }
779
780
781
782 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
783         string ret;
784         int o;
785         vector<scalarexp_t *> operands;
786
787
788         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
789                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
790                                 se->get_lineno(), se->get_charno());
791                 return("ERROR in generate_se_code");
792         }
793
794         ret = "\tretval = " + se->get_op() + "( ",
795         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
796         ret += tmpstr;
797
798         operands = se->get_operands();
799         for(o=0;o<operands.size();o++){
800                 ret += ", ";
801                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
802                         ret += "&";
803                 ret += generate_se_code(operands[o], schema);
804         }
805         ret += ");\n";
806
807         return(ret);
808 }
809
810
811
812
813
//		Translate an SQL comparison operator into its C spelling:
//		"=" becomes "==", "<>" becomes "!=", and every other operator is
//		returned unchanged.
static string generate_C_comparison_op(string op){
	if(op == "<>")
		return("!=");
	return (op == "=") ? string("==") : op;
}
819
//		Map an SQL boolean connective to its C operator.  Only the all-upper,
//		capitalized and all-lower spellings are accepted (AND/And/and -> "&&",
//		OR/Or/or -> "||", NOT/Not/not -> "!").  Anything else is reported on
//		stderr and yields an error marker string.
static string generate_C_boolean_op(string op){
	const char *c_op = NULL;
	if(op == "AND" || op == "And" || op == "and")
		c_op = "&&";
	else if(op == "OR" || op == "Or" || op == "or")
		c_op = "||";
	else if(op == "NOT" || op == "Not" || op == "not")
		c_op = "!";

	if(c_op != NULL)
		return string(c_op);

	fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
	return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
}
834
835
836 static string generate_predicate_code(predicate_t *pr,table_list *schema){
837         string ret;
838         vector<literal_t *>  litv;
839         int i;
840     data_type *ldt, *rdt;
841         vector<scalarexp_t *> op_list;
842         int o,cref,ppos;
843         unsigned int bitmask;
844
845         switch(pr->get_operator_type()){
846         case PRED_IN:
847         ldt = pr->get_left_se()->get_data_type();
848
849                 ret += "( ";
850                 litv = pr->get_lit_vec();
851                 for(i=0;i<litv.size();i++){
852                         if(i>0) ret += " || ";
853                         ret += "( ";
854
855                 if(ldt->complex_comparison(ldt) ){
856                                 ret +=  ldt->get_comparison_fcn(ldt) ;
857                                 ret += "( ";
858                                 if(ldt->is_buffer_type() ) ret += "&";
859                                 ret += generate_se_code(pr->get_left_se(), schema);
860                                 ret += ", ";
861                                 if(ldt->is_buffer_type() ) ret += "&";
862                                 if(litv[i]->is_cpx_lit()){
863                                         sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
864                                         ret += tmpstr;
865                                 }else{
866                                         ret += litv[i]->to_C_code("");
867                                 }
868                                 ret += ") == 0";
869                         }else{
870                                 ret += generate_se_code(pr->get_left_se(), schema);
871                                 ret += " == ";
872                                 ret += litv[i]->to_C_code("");
873                         }
874
875                         ret += " )";
876                 }
877                 ret += " )";
878                 return(ret);
879
880         case PRED_COMPARE:
881         ldt = pr->get_left_se()->get_data_type();
882         rdt = pr->get_right_se()->get_data_type();
883
884                 ret += "( ";
885         if(ldt->complex_comparison(rdt) ){
886                         ret += ldt->get_comparison_fcn(rdt);
887                         ret += "(";
888                         if(ldt->is_buffer_type() ) ret += "&";
889                         ret += generate_se_code(pr->get_left_se(),schema);
890                         ret += ", ";
891                         if(rdt->is_buffer_type() ) ret += "&";
892                         ret += generate_se_code(pr->get_right_se(),schema);
893                         ret += ") ";
894                         ret +=  generate_C_comparison_op(pr->get_op());
895                         ret += "0";
896                 }else{
897                         ret += generate_se_code(pr->get_left_se(),schema);
898                         ret +=  generate_C_comparison_op(pr->get_op());
899                         ret += generate_se_code(pr->get_right_se(),schema);
900                 }
901                 ret += " )";
902                 return(ret);
903         case PRED_UNARY_OP:
904                 ret += "( ";
905                 ret +=  generate_C_boolean_op(pr->get_op());
906                 ret += generate_predicate_code(pr->get_left_pr(),schema);
907                 ret += " )";
908                 return(ret);
909         case PRED_BINARY_OP:
910                 ret += "( ";
911                 ret += generate_predicate_code(pr->get_left_pr(),schema);
912                 ret +=  generate_C_boolean_op(pr->get_op());
913                 ret += generate_predicate_code(pr->get_right_pr(),schema);
914                 ret += " )";
915                 return(ret);
916         case PRED_FUNC:
917                 op_list = pr->get_op_list();
918                 cref = pr->get_combinable_ref();
919                 if(cref >= 0){  // predicate is a combinable pred reference
920                         //              Trust, but verify
921                         if(pred_class.size() >= cref && pred_class[cref] >= 0){
922                                 ppos = pred_pos[cref];
923                                 bitmask = 1 << ppos % 32;
924                                 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
925                                 ret = tmpstr;
926                                 return ret;
927                         }
928                 }
929
930                 ret =  pr->get_op() + "(";
931                 if (pr->is_sampling_fcn) {
932                         ret += "t->sampling_rate";
933                         if (!op_list.empty())
934                                 ret += ", ";
935                 }
936                 for(o=0;o<op_list.size();++o){
937                         if(o>0) ret += ", ";
938                         if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
939                                         ret += "&";
940                         ret += generate_se_code(op_list[o],schema);
941                 }
942                 ret += " )";
943                 return(ret);
944         default:
945                 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
946                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
947                 return("ERROR in generate_predicate_code");
948         }
949 }
950
951
952 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
953         string ret;
954
955     if(dt->complex_comparison(dt) ){
956                 ret += dt->get_comparison_fcn(dt);
957                 ret += "(";
958                         if(dt->is_buffer_type() ) ret += "&";
959                 ret += lhs_op;
960                 ret += ", ";
961                         if(dt->is_buffer_type() ) ret += "&";
962                 ret += rhs_op;
963                 ret += ") == 0";
964         }else{
965                 ret += lhs_op;
966                 ret += " == ";
967                 ret += rhs_op;
968         }
969
970         return(ret);
971 }
972
973 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
974         string ret;
975
976     if(dt->complex_comparison(dt) ){
977                 ret += dt->get_comparison_fcn(dt);
978                 ret += "(";
979                         if(dt->is_buffer_type() ) ret += "&";
980                 ret += lhs_op;
981                 ret += ", ";
982                         if(dt->is_buffer_type() ) ret += "&";
983                 ret += rhs_op;
984                 ret += ") == 0";
985         }else{
986                 ret += lhs_op;
987                 ret += " == ";
988                 ret += rhs_op;
989         }
990
991         return(ret);
992 }
993
994 //              Here I assume that only MIN and MAX aggregates can be computed
995 //              over BUFFER data types.
996
997 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
998         string retval = "\t\t";
999         string op = atbl->get_op(aidx);
1000
1001 //              Is it a UDAF
1002         if(! atbl->is_builtin(aidx)) {
1003                 int o;
1004                 retval += op+"_LFTA_AGGR_UPDATE_(";
1005                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1006                 retval+="("+var+")";
1007                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1008                 for(o=0;o<opl.size();++o){
1009                         retval += ",";
1010                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1011                                         retval.append("&");
1012                         retval += generate_se_code(opl[o], schema);
1013                 }
1014                 retval += ");\n";
1015
1016                 return retval;
1017         }
1018
1019 //              Built-in aggregate processing.
1020
1021         data_type *dt = atbl->get_data_type(aidx);
1022
1023         if(op == "COUNT"){
1024                 retval.append(var);
1025                 retval.append("++;\n");
1026                 return(retval);
1027         }
1028         if(op == "SUM"){
1029                 retval.append(var);
1030                 retval.append(" += ");
1031                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1032                 retval.append(";\n");
1033                 return(retval);
1034         }
1035         if(op == "MIN"){
1036                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1037                 retval.append(tmpstr);
1038                 if(dt->complex_comparison(dt)){
1039                         if(dt->is_buffer_type())
1040                           sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1041                         else
1042                           sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1043                 }else{
1044                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1045                 }
1046                 retval.append(tmpstr);
1047                 if(dt->is_buffer_type()){
1048                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1049                 }else{
1050                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1051                 }
1052                 retval.append(tmpstr);
1053
1054                 return(retval);
1055         }
1056         if(op == "MAX"){
1057                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1058                 retval.append(tmpstr);
1059                 if(dt->complex_comparison(dt)){
1060                         if(dt->is_buffer_type())
1061                          sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1062                         else
1063                          sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1064                 }else{
1065                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1066                 }
1067                 retval.append(tmpstr);
1068                 if(dt->is_buffer_type()){
1069                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1070                 }else{
1071                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1072                 }
1073                 retval.append(tmpstr);
1074
1075                 return(retval);
1076
1077         }
1078         if(op == "AND_AGGR"){
1079                 retval.append(var);
1080                 retval.append(" &= ");
1081                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1082                 retval.append(";\n");
1083                 return(retval);
1084         }
1085         if(op == "OR_AGGR"){
1086                 retval.append(var);
1087                 retval.append(" |= ");
1088                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1089                 retval.append(";\n");
1090                 return(retval);
1091         }
1092         if(op == "XOR_AGGR"){
1093                 retval.append(var);
1094                 retval.append(" ^= ");
1095                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1096                 retval.append(";\n");
1097                 return(retval);
1098         }
1099         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1100         return("ERROR: aggregate not recognized: "+op);
1101
1102 }
1103
1104
1105
1106 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1107         string retval;
1108         string op = atbl->get_op(aidx);
1109
1110 //              Is it a UDAF
1111         if(! atbl->is_builtin(aidx)) {
1112                 int o;
1113                 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1114                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1115                 retval+="("+var+"));\n";
1116 //                      Add 1st tupl
1117                 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1118                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1119                 retval+="("+var+")";
1120                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1121                 for(o=0;o<opl.size();++o){
1122                         retval += ",";
1123                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1124                                         retval.append("&");
1125                         retval += generate_se_code(opl[o],schema);
1126                 }
1127                 retval += ");\n";
1128                 return(retval);
1129         }
1130
1131 //              Built-in aggregate processing.
1132
1133
1134         data_type *dt = atbl->get_data_type(aidx);
1135
1136         if(op == "COUNT"){
1137                 retval = "\t\t"+var;
1138                 retval.append(" = 1;\n");
1139                 return(retval);
1140         }
1141
1142         if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1143                                                                         op == "OR_AGGR" || op == "XOR_AGGR"){
1144                 if(dt->is_buffer_type()){
1145                         sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1146                         retval.append(tmpstr);
1147                         sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1148                         retval.append(tmpstr);
1149                 }else{
1150                         retval = "\t\t"+var;
1151                         retval += " = ";
1152                         retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1153                         retval.append(";\n");
1154                 }
1155                 return(retval);
1156         }
1157
1158         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1159         return("ERROR: aggregate not recognized: "+op);
1160 }
1161
1162
1163 ////////////////////////////////////////////////////////////
1164
1165
1166 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1167         std::string &node_name, std::string &schema_embed_str){
1168 //                      Include these only once, not once per lfta
1169 //      string ret = "#include \"rts.h\"\n";
1170 //      ret +=  "#include \"fta.h\"\n\n");
1171
1172         string ret = "#ifndef LFTA_IN_NIC\n";
1173         ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1174         ret += "#include<stdio.h>\n";
1175         ret += "#include <limits.h>\n";
1176         ret += "#include <float.h>\n";
1177         ret += "#include \"rdtsc.h\"\n";
1178         ret += "#endif\n";
1179
1180
1181
1182         return(ret);
1183 }
1184
1185
//		Generate C code that turns aggregate-table slot `idx` into an output
//		tuple of node `node_name` and posts it, then releases the slot's
//		buffer-typed storage and decrements t->n_aggrs.
//		  node_name : query node name; selects the output tuple struct.
//		  schema    : passed through to the expression generators.
//		  idx       : C expression indexing t->aggr_table.
//		Returns the generated code text.
//		The emitted code uses locals it does not declare (lfta_bailout,
//		unpack_failed, retval, tuple, tuple_size, tuple_pos, selvar_<s>,
//		udaf_ret<a>); generate_fta_flush declares these in its own body.
//		The aggregate / group-by / select-list descriptions are read from
//		file-level state (aggr_tbl, gb_tbl, sl_list, partial_fcns, ...),
//		presumably populated before this is called -- NOTE(review): confirm.
string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
	int a,p,s;
//			need to create and output the tuple.
	string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
//			Check for any UDAFs with LFTA_BAILOUT.  The bailout results are
//			summed; the tuple is produced only if the sum is zero.
	ret += "\tlfta_bailout = 0;\n";
	for(a=0;a<aggr_tbl->size();a++){
		if(aggr_tbl->has_bailout(a)){
			ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
			// fstring_t storage is passed as-is; other storage by address.
			if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
			ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
		}
	}
	ret += "\tif(! lfta_bailout){\n";

//			First, compute the size of the tuple.

//			Unpack UDAF return values into udaf_ret<a>.
	for(a=0;a<aggr_tbl->size();a++){
		if(! aggr_tbl->is_builtin(a)){
			ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
			if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
			ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";

		}
	}


//			Unpack partial fcns ref'd by the select clause.  A non-zero
//			retval from any unpack sets unpack_failed, and the tuple is then
//			not allocated.
	if(sl_fcns_start != sl_fcns_end){
		ret += "\t\tunpack_failed = 0;\n";
		for(p=sl_fcns_start;p<sl_fcns_end;p++){
			if(is_partial_fcn[p]){
				ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
					"t->aggr_table["+idx+"].",schema);
				ret += "\t\tif(retval) unpack_failed = 1;\n";
			}
		}
								// BEGIN don't allocate tuple if
		ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
	}

//			Unpack any BUFFER type selections into temporaries
//			so that I can compute their size and not have
//			to recompute their value during tuple packing.
//			I can use regular assignment here because
//			these temporaries are non-persistent.

	for(s=0;s<sl_list.size();s++){
		data_type *sdt = sl_list[s]->get_data_type();
		if(sdt->is_buffer_type()){
			sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
			ret += tmpstr;
			ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
			ret += ";\n";
		}
	}


//		The size of the tuple is the size of the tuple struct plus the
//		size of the buffers to be copied in.

	ret += "\t\t\ttuple_size = sizeof( struct ";
	ret +=  generate_tuple_name(node_name);
	ret += ")";
	for(s=0;s<sl_list.size();s++){
		data_type *sdt = sl_list[s]->get_data_type();
		if(sdt->is_buffer_type()){
			sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
			ret += tmpstr;
		}
	}
	ret += ";\n";


//			Allocation may fail; the fill-and-post code is guarded by a
//			NULL check.
	ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
	ret += "\t\t\tif( tuple != NULL){\n";


//			Test passed, make assignments to the tuple.
//			Buffer payloads are appended after the fixed struct, starting at
//			tuple_pos.

	ret += "\t\t\t\ttuple_pos = sizeof( struct ";
	ret +=  generate_tuple_name(node_name) ;
	ret += ");\n";

//			Mark tuple as REGULAR_TUPLE
	ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";

	for(s=0;s<sl_list.size();s++){
		data_type *sdt = sl_list[s]->get_data_type();
		if(sdt->is_buffer_type()){
			sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
			ret += tmpstr;
			sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
			ret += tmpstr;
		}else{
			sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
			ret += tmpstr;
//			if(sdt->needs_hn_translation())
//				ret += sdt->hton_translation() +"( ";
			ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
//			if(sdt->needs_hn_translation())
//				ret += ") ";
			ret += ";\n";
		}
	}

//		Generate output.
	ret += "\t\t\t\tpost_tuple(tuple);\n";
	ret += "\t\t\t\t#ifdef LFTA_STATS\n";
	ret+="\t\t\t\tt->out_tuple_cnt++;\n";
	ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
	ret += "\t\t\t\t#endif\n\n";
	ret += "\t\t\t}\n";

	if(sl_fcns_start != sl_fcns_end)	// END don't allocate tuple if
		ret += "\t\t}\n";				// unpack failed.
	ret += "\t}\n";

//			Need to release memory held by BUFFER types.
//			This code is emitted outside the "if(! lfta_bailout)" block, so
//			it runs whether or not a tuple was produced.
		int g;

	for(g=0;g<gb_tbl->size();g++){
		data_type *gdt = gb_tbl->get_data_type(g);
		if(gdt->is_buffer_type()){
			sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
			ret += tmpstr;
		}
	}
	for(a=0;a<aggr_tbl->size();a++){
		if(aggr_tbl->is_builtin(a)){
			data_type *adt = aggr_tbl->get_data_type(a);
			if(adt->is_buffer_type()){
				sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
				ret += tmpstr;
			}
		}else{
			// UDAF state is released through the UDAF's own DESTROY entry point.
			ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
			if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
			ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
		}
	}

	ret += "\t\tt->n_aggrs--;\n";

	return(ret);

}
1334
1335 string generate_gb_match_test(string idx){
1336         int g;
1337         string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") &&  IS_NEW(t->aggr_table_bitmap,"+idx+")";
1338         if(gb_tbl->size()>0){
1339                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1340                 ret+="\t\t";
1341
1342 //                      Next, scan list for a match on the group-by attributes.
1343           string rhs_op, lhs_op;
1344           for(g=0;g<gb_tbl->size();g++){
1345                   ret += " && ";
1346                   ret += "(";
1347                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1348                   sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1349                   ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
1350                   ret += ")";
1351           }
1352          }
1353
1354           ret += "){\n";
1355
1356         return ret;
1357 }
1358
1359 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1360         int g;
1361         string ret;
1362
1363           ret += "/*\t\tMatch found : update in place.\t*/\n";
1364           int a;
1365           has_udaf = false;
1366           for(a=0;a<aggr_tbl->size();a++){
1367                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1368                   ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1369                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1370           }
1371
1372 //                      garbage collect copied buffer type gb attrs.
1373   for(g=0;g<gb_tbl->size();g++){
1374           data_type *gdt = gb_tbl->get_data_type(g);
1375           if(gdt->is_buffer_type()){
1376                 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
1377                 ret+=tmpstr;
1378           }
1379         }
1380
1381
1382
1383           bool first_udaf = true;
1384           if(has_udaf){
1385                 ret += "\t\tif(";
1386                 for(a=0;a<aggr_tbl->size();a++){
1387                         if(! aggr_tbl->is_builtin(a)){
1388                                 if(! first_udaf)ret += " || ";
1389                                 else first_udaf = false;
1390                                 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1391                                 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1392                                 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1393                         }
1394                 }
1395                 ret+="){\n";
1396                 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1397                 ret += generate_tuple_from_aggr(node_name,schema,idx);
1398                 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
1399                 ret+="\t\t}\n";
1400         }
1401         return ret;
1402 }
1403
1404
1405 string generate_init_group( table_list *schema, string idx){
1406           int g,a;
1407           string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1408 //                      Fill up the aggregate block.
1409           for(g=0;g<gb_tbl->size();g++){
1410                   sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1411                   ret += tmpstr;
1412           }
1413           for(a=0;a<aggr_tbl->size();a++){
1414                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1415                   ret += generate_aggr_init(tmpstr, aggr_tbl,a,  schema);
1416           }
1417           ret+="\t\tt->n_aggrs++;\n";
1418         return ret;
1419 }
1420
1421
1422 string generate_fta_flush(string node_name, table_list *schema,
1423                 ext_fcn_list *Ext_fcns){
1424
1425    string ret;
1426    string select_var_defs ;
1427         int s, p;
1428
1429 //              Flush from previous epoch
1430
1431         ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1432
1433         ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1434     ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1435         ret += "\tint i, lfta_bailout;\n";
1436         ret += "\tunsigned int gen_val;\n";
1437
1438         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1439         ret += generate_fta_name(node_name)+" *) f;\n";
1440
1441         ret += "\n";
1442
1443
1444 //              Variables needed to store selected attributes of BUFFER type
1445 //              temporarily, in order to compute their size for storage
1446 //              in an output tuple.
1447
1448   select_var_defs = "";
1449   for(s=0;s<sl_list.size();s++){
1450         data_type *sdt = sl_list[s]->get_data_type();
1451         if(sdt->is_buffer_type()){
1452           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1453           select_var_defs.append(tmpstr);
1454         }
1455   }
1456   if(select_var_defs != ""){
1457         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1458     ret += select_var_defs;
1459   }
1460
1461
1462 //              Variables to store results of partial functions.
1463   if(sl_fcns_start != sl_fcns_end){
1464         ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1465         for(p=sl_fcns_start;p<sl_fcns_end;p++){
1466                 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1467                         partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
1468                 ret += tmpstr;
1469         }
1470         ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1471   }
1472
1473 //              Variables for udaf output temporaries
1474         bool no_udaf = true;
1475         int a;
1476         for(a=0;a<aggr_tbl->size();a++){
1477                 if(! aggr_tbl->is_builtin(a)){
1478                         if(no_udaf){
1479                                 ret+="/*\t\tUDAF output vars.\t*/\n";
1480                                 no_udaf = false;
1481                         }
1482                         int afcn_id = aggr_tbl->get_fcn_id(a);
1483                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1484                         sprintf(tmpstr,"udaf_ret%d", a);
1485                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1486                 }
1487         }
1488
1489
1490 //  ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1491   ret+="\n";
1492   ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1493   ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1494   ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1495                 bool first_g=true;
1496                 int g;
1497                 for(g=0;g<gb_tbl->size();g++){
1498                   data_type *gdt = gb_tbl->get_data_type(g);
1499                   if(gdt->is_temporal()){
1500                         if(first_g) first_g=false; else ret+=" || ";
1501                         ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1502                   }
1503                 }
1504   ret += "))) {\n";
1505   ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1506   ret+=
1507 "#ifdef LFTA_STATS\n"
1508 "\t\t\tt->eviction_cnt++;\n"
1509 "#endif\n"
1510 ;
1511
1512
1513   ret+=generate_tuple_from_aggr(node_name,schema,"i");
1514
1515 //  ret+="\t\t\tt->n_aggrs--;\n";  // done in generate_tuple_from_aggr
1516   ret+="\t\t\tnflush--;\n";
1517   ret+="\t\t}\n";
1518   ret+="\t}\n";
1519   ret+="\tt->flush_pos=i;\n";
1520   ret+="\tif(t->n_aggrs == 0) {\n";
1521   ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1522   ret += "\t}\n\n";
1523
1524   ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1525
1526   for(int g=0;g<gb_tbl->size();g++){
1527         data_type *dt = gb_tbl->get_data_type(g);
1528         if(dt->is_temporal()){
1529                 data_type *gdt = gb_tbl->get_data_type(g);
1530                 if(!gdt->is_buffer_type()){
1531                         sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1532                         ret += tmpstr;
1533                 }
1534         }
1535   }
1536   ret += "\t}\n}\n\n";
1537
1538   return(ret);
1539 }
1540
1541 // TODO Remove sprintf to perform string catenation
1542 string generate_fta_load_params(string node_name){
1543         int p;
1544         vector<string> param_names = param_tbl->get_param_names();
1545
1546         string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1547                 ret += " *t, int sz, void *value, int initial_call){\n";
1548     ret += "\tint pos=0;\n";
1549     ret += "\tint data_pos;\n";
1550
1551         for(p=0;p<param_names.size();p++){
1552                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1553                 if(dt->is_buffer_type()){
1554                         sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1555                         ret += tmpstr;
1556                         sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
1557                         ret += tmpstr;
1558                 }
1559         }
1560
1561
1562
1563         ret += "\n\tdata_pos = ";
1564         for(p=0;p<param_names.size();p++){
1565                 if(p>0) ret += " + ";
1566                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1567                 ret += "sizeof( ";
1568                 ret +=  dt->get_tuple_cvar_type();
1569                 ret += " )";
1570         }
1571         ret += ";\n";
1572         ret += "\tif(data_pos > sz) return 1;\n\n";
1573
1574
1575         for(p=0;p<param_names.size();p++){
1576                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1577                 if(dt->is_buffer_type()){
1578                         sprintf(tmpstr,"\taccess_var_%s =  *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1579                         ret += tmpstr;
1580                         switch( dt->get_type() ){
1581                         case v_str_t:
1582 //                              ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n";               // ntoh conversion
1583 //                              ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n";   // ntoh conversion
1584                                 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1585                                 ret += tmpstr;
1586                                 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1587                                 ret += tmpstr;
1588                                 sprintf(tmpstr,"\ttmp_var_%s.length =  access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1589                                 ret += tmpstr;
1590                         break;
1591                         default:
1592                                 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1593                                 exit(1);
1594                         break;
1595                         }
1596 //                                      First, destroy the old
1597                         ret += "\tif(! initial_call)\n";
1598                         sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1599                         ret += tmpstr;
1600 //                                      Next, create the new.
1601                         sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1602                         ret += tmpstr;
1603                 }else{
1604 //                      if(dt->needs_hn_translation()){
1605 //                              sprintf(tmpstr,"\tt->param_%s =  %s( *( (%s *)( (char *)value+pos) ) );\n",
1606 //                                param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1607 //                      }else{
1608                                 sprintf(tmpstr,"\tt->param_%s =  *( (%s *)( (char *)value+pos) );\n",
1609                                   param_names[p].c_str(), dt->get_cvar_type().c_str() );
1610 //                      }
1611                         ret += tmpstr;
1612                 }
1613                 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1614                 ret += tmpstr;
1615         }
1616
1617 //                      Register the pass-by-handle parameters
1618
1619         ret += "/* register and de-register the pass-by-handle parameters */\n";
1620
1621     int ph;
1622     for(ph=0;ph<param_handle_table.size();++ph){
1623                 data_type pdt(param_handle_table[ph]->type_name);
1624                 switch(param_handle_table[ph]->val_type){
1625                 case cplx_lit_e:
1626                         break;
1627                 case litval_e:
1628                         break;
1629                 case param_e:
1630                         ret += "\tif(! initial_call)\n";
1631                         sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1632                                 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1633                         ret += tmpstr;
1634                         sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
1635                         ret += tmpstr;
1636
1637                         if(pdt.is_buffer_type()) ret += "&(";
1638                         ret += "t->param_"+param_handle_table[ph]->param_name;
1639                         if(pdt.is_buffer_type()) ret += ")";
1640                         ret += ");\n";
1641                         break;
1642                 default:
1643                         sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1644                         fprintf(stderr,"%s\n",tmpstr);
1645                         ret+=tmpstr;
1646                 }
1647         }
1648
1649         ret+="\treturn 0;\n";
1650         ret += "}\n\n";
1651
1652         return(ret);
1653 }
1654
1655
1656
1657
1658 string generate_fta_free(string node_name, bool is_aggr_query){
1659
1660         string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1661         ret+= "\tstruct "+generate_fta_name(node_name)+
1662                 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1663         ret += "\tint i;\n";
1664
1665         if(is_aggr_query){
1666                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1667                 ret+="\t/* \t\tmark all groups as old */\n";
1668                 ret+="\tt->generation++;\n";
1669                 ret+="\tt->flush_pos = 0;\n";
1670                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1671         }
1672
1673 //                      Deregister the pass-by-handle parameters
1674         ret += "/* de-register the pass-by-handle parameters */\n";
1675     int ph;
1676     for(ph=0;ph<param_handle_table.size();++ph){
1677                 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1678                         param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1679                 ret += tmpstr;
1680         }
1681
1682
1683         ret += "\treturn 0;\n}\n\n";
1684         return(ret);
1685 }
1686
1687
1688 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1689         string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f,  gs_int32_t command, gs_int32_t sz, void *value){\n";
1690         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1691                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1692         ret+="\tint i;\n";
1693
1694
1695         ret += "\t/* temp status tuple */\n";
1696         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1697         ret += "\tgs_int32_t tuple_size;\n";
1698
1699
1700         if(is_aggr_query){
1701                 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1702
1703                 ret+="\t\tif (!t->n_aggrs) {\n";
1704                 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1705                 ret+="\t\t\tif( tuple != NULL)\n";
1706                 ret+="\t\t\t\tpost_tuple(tuple);\n";
1707
1708                 ret+="\t\t}else{\n";
1709
1710                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1711                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1712                 ret +="\t\tt->generation++;\n";
1713                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1714                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1715                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1716                 ret+="\t\t\tt->flush_pos = 0;\n";
1717                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1718                 ret+="\t\t}\n";
1719
1720                 ret+="\t}\n";
1721         }
1722         if(param_tbl->size() > 0){
1723                 ret+=
1724 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1725 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1726 "#ifndef LFTA_IN_NIC\n"
1727 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1728 "#else\n"
1729 "\t\t{}\n"
1730 "#endif\n"
1731 "\t}\n";
1732         }
1733         ret+=
1734 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1735 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1736 "\t}\n\n";
1737
1738
1739         ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1740
1741         if(is_aggr_query){
1742                 ret+="\t\tif (t->n_aggrs) {\n";
1743                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1744                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1745                 ret +="\t\tt->generation++;\n";
1746                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1747                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1748                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1749                 ret+="\t\t\tt->flush_pos = 0;\n";
1750                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1751                 ret+="\t\t}\n";
1752         }
1753
1754         ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1755         ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1756         ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1757
1758         /* mark tuple as EOF_TUPLE */
1759         ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1760         ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1761         ret += "\t\tpost_tuple(tuple);\n";
1762         ret += "\t}\n";
1763
1764         ret += "\treturn 0;\n}\n\n";
1765
1766         return(ret);
1767 }
1768
1769 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){
1770         string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1771         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1772                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1773
1774         ret += "\t/* Create a temp status tuple */\n";
1775         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1776         ret += "\tgs_int32_t tuple_size;\n";
1777         ret += "\tunsigned int i;\n";
1778         ret += "\ttime_t cur_time;\n";
1779         ret += "\tint time_advanced;\n";
1780         ret += "\tstruct fta_stat stats;\n";
1781
1782
1783
1784         /* copy the last seen values of temporal attributes */
1785         col_id_set temp_cids;           //      col ids of temp attributes in select clause
1786
1787
1788         /* HACK: in order to reuse the SE generation code, we need to copy
1789          * the last values of the temp attributes into new variables
1790          * which have names unpack_var_XXX_XXX
1791          */
1792
1793         int s, g;
1794         col_id_set::iterator csi;
1795
1796         for(s=0;s<sl_list.size();s++){
1797                 data_type *sdt = sl_list[s]->get_data_type();
1798                 if (sdt->is_temporal()) {
1799                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
1800                 }
1801         }
1802
1803         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1804                 int tblref = (*csi).tblvar_ref;
1805                 int schref = (*csi).schema_ref;
1806                 string field = (*csi).field;
1807                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1808                 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
1809                 ret += tmpstr;
1810         }
1811
1812         if (is_aggr_query) {
1813                 for(g=0;g<gb_tbl->size();g++){
1814                         data_type *gdt = gb_tbl->get_data_type(g);
1815                         if(gdt->is_temporal()){
1816                                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1817                                 ret += tmpstr;
1818                                 data_type *gdt = gb_tbl->get_data_type(g);
1819                                 if(gdt->is_buffer_type()){
1820                                 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1821                                 ret += tmpstr;
1822                                 }
1823                         }
1824                 }
1825         }
1826         ret += "\n";
1827
1828         ret += "\ttime_advanced = 0;\n";
1829
1830         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1831                 int tblref = (*csi).tblvar_ref;
1832                 int schref = (*csi).schema_ref;
1833                 string field = (*csi).field;
1834                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1835
1836                 // update last seen value with the value seen
1837                 ret += "\t#ifdef PREFILTER_DEFINED\n";
1838                 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
1839                         field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
1840                 ret += tmpstr;
1841                 ret += "\t\ttime_advanced = 1;\n\t}\n";
1842                 ret += "\t#endif\n";
1843
1844                 // we need to pay special attention to time fields
1845                 if (field == "time" || field == "timestamp"){
1846                         ret += "\tcur_time = time(&cur_time);\n";
1847
1848                         if (field == "time") {
1849                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
1850                                         tblref, time_corr);
1851                                 ret += tmpstr;
1852                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
1853                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);
1854                         } else {
1855                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
1856                                         field.c_str(), tblref, time_corr);
1857                                 ret += tmpstr;
1858                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
1859                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);
1860                         }
1861                         ret += tmpstr;
1862
1863                         ret += "\t\ttime_advanced = 1;\n";
1864                         ret += "\t}\n";
1865
1866                         sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
1867                                 field.c_str(), tblref, field.c_str(), tblref);
1868                         ret += tmpstr;
1869                 } else {
1870                         sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
1871                                 field.c_str(), tblref, field.c_str(), tblref);
1872                         ret += tmpstr;
1873                 }
1874         }
1875
1876         // for aggregation lftas we need to check if the time was advanced beyond the current epoch
1877         if (is_aggr_query) {
1878
1879                 string change_test;
1880                 bool first_one = true;
1881                 for(g=0;g<gb_tbl->size();g++){
1882                   data_type *gdt = gb_tbl->get_data_type(g);
1883                   if(gdt->is_temporal()){
1884 //                                      To perform the test, first need to compute the value
1885 //                                      of the temporal gb attrs.
1886                           if(gdt->is_buffer_type()){
1887         //                              NOTE : if the SE defining the gb is anything
1888         //                              other than a ref to a variable, this will generate
1889         //                              illegal code.  To be resolved with Spatch.
1890                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
1891                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
1892                                 ret+=tmpstr;
1893                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
1894                                         gdt->get_buffer_assign_copy().c_str(), g, g);
1895                           }else{
1896                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
1897                           }
1898                           ret += tmpstr;
1899
1900                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
1901                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
1902                           if(first_one){first_one = false;} else {change_test.append(") && (");}
1903                           change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
1904                   }
1905                 }
1906
1907                 ret += "\n\tif( time_advanced && !( (";
1908                 ret += change_test;
1909                 ret += ") ) ){\n";
1910
1911                 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
1912                 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
1913                 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1914
1915                 ret += "\t\t/* \t\tmark all groups as old */\n";
1916                 ret +="\t\tt->generation++;\n";
1917                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1918                 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1919                 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1920                 ret += "\t\tt->flush_pos = 0;\n";
1921
1922                 for(g=0;g<gb_tbl->size();g++){
1923                      data_type *gdt = gb_tbl->get_data_type(g);
1924                      if(gdt->is_temporal()){
1925                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);                 ret += tmpstr;
1926                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
1927                      }
1928                 }
1929                 ret += "\t}\n\n";
1930
1931         }
1932
1933         ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
1934         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
1935         ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
1936
1937
1938         for(s=0;s<sl_list.size();s++){
1939                 data_type *sdt = sl_list[s]->get_data_type();
1940                 if(sdt->is_temporal()){
1941
1942                         if (sl_list[s]->is_gb()) {
1943                                 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
1944                                 ret += tmpstr;
1945                         }
1946
1947                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
1948                         ret += tmpstr;
1949 //                      if(sdt->needs_hn_translation())
1950 //                              ret += sdt->hton_translation() +"( ";
1951                         if (sl_list[s]->is_gb()) {
1952                                 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
1953                                 ret += tmpstr;
1954                         } else{
1955                                 ret += generate_se_code(sl_list[s],schema);
1956                         }
1957 //                      if(sdt->needs_hn_translation())
1958 //                              ret += " )";
1959                         ret += ";\n";
1960                 }
1961         }
1962
1963         /* mark tuple as temporal */
1964         ret += "\n\t/* Mark tuple as temporal */\n";
1965         ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
1966
1967         ret += "\n\t/* Copy trace id */\n";
1968         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
1969
1970         ret += "\n\t/* Populate runtime stats */\n";
1971         ret += "\tstats.ftaid = f->ftaid;\n";
1972         ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
1973         ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
1974         ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
1975         ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
1976         ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
1977         ret += "\tstats.collision_cnt = t->collision_cnt;\n";
1978         ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
1979         ret += "\tstats.sampling_rate    = t->sampling_rate;\n";
1980
1981         ret += "\n#ifdef LFTA_PROFILE\n";
1982         ret += "\n\t/* Print stats */\n";
1983         ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
1984         ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
1985         ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
1986         ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
1987         ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
1988         ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
1989         ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
1990         ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
1991         ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
1992         ret += "\n#endif\n";
1993
1994
1995         ret += "\n\t/* Copy stats */\n";
1996         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
1997         ret+="\tpost_tuple(tuple);\n";
1998
1999         ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2000         ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
2001
2002         ret += "\n\t/* Reset runtime stats */\n";
2003         ret += "\tt->in_tuple_cnt = 0;\n";
2004         ret += "\tt->out_tuple_cnt = 0;\n";
2005         ret += "\tt->out_tuple_sz = 0;\n";
2006         ret += "\tt->accepted_tuple_cnt = 0;\n";
2007         ret += "\tt->cycle_cnt = 0;\n";
2008         ret += "\tt->collision_cnt = 0;\n";
2009         ret += "\tt->eviction_cnt = 0;\n";
2010
2011         ret += "\treturn 0;\n}\n\n";
2012
2013         return(ret);
2014 }
2015
2016
2017 //              accept processing before the where clause,
2018 //              do flush processwing.
2019 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema,   col_id_set &unpacked_cids, string &temporal_flush){
2020         int s;
2021
2022 //              Slow flush
2023   string ret="\n/*\tslow flush\t*/\n";
2024   string slow_flush_str = fs->get_val_of_def("slow_flush");
2025   int n_slow_flush = atoi(slow_flush_str.c_str());
2026   if(n_slow_flush <= 0) n_slow_flush = 2;
2027   if(n_slow_flush > 1){
2028         ret += "\tt->flush_ctr++;\n";
2029         ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2030         ret += "\t\tt->flush_ctr = 0;\n";
2031     ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
2032         ret += "\t}\n\n";
2033   }else{
2034     ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2035   }
2036
2037
2038         string change_test;
2039         bool first_one = true;
2040         int g;
2041     col_id_set flush_cids;              //      col ids accessed when computing flush variables.
2042                                                         //  unpack them at temporal flush test time.
2043     temporal_flush = "";
2044
2045
2046         for(g=0;g<gb_tbl->size();g++){
2047                   data_type *gdt = gb_tbl->get_data_type(g);
2048                   if(gdt->is_temporal()){
2049                           gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2050
2051 //                                      To perform the test, first need to compute the value
2052 //                                      of the temporal gb attrs.
2053                           if(gdt->is_buffer_type()){
2054         //                              NOTE : if the SE defining the gb is anything
2055         //                              other than a ref to a variable, this will generate
2056         //                              illegal code.  To be resolved with Spatch.
2057                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2058                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2059                                 temporal_flush += tmpstr;
2060                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2061                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2062                           }else{
2063                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2064                           }
2065                           temporal_flush += tmpstr;
2066 //                                      END computing the value of the temporal GB attr.
2067
2068
2069                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2070                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2071                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2072                           change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2073                   }
2074         }
2075         if(!first_one){         // will be false iff. there is a temporal GB attribute
2076                   temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2077                   temporal_flush += "\tif( !( (";
2078                   temporal_flush += change_test;
2079                   temporal_flush += ") ) ){\n";
2080
2081 //                temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2082                   temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2083                   temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2084                   temporal_flush+="\t\t}\n";
2085                   temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2086                   temporal_flush+="\t\tt->generation++;\n";
2087                   temporal_flush+="\t\tt->flush_pos = 0;\n";
2088
2089
2090 //                              Now set the saved temporal value of the gb to the
2091 //                              current value of the gb.  Only for simple types,
2092 //                              not for buffer types -- but the strings are not
2093 //                              temporal in any case.
2094
2095                         for(g=0;g<gb_tbl->size();g++){
2096                           data_type *gdt = gb_tbl->get_data_type(g);
2097                           if(gdt->is_temporal()){
2098                                   if(gdt->is_buffer_type()){
2099
2100                                         fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2101                                   }else{
2102                                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2103                                         temporal_flush += tmpstr;
2104                                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2105                                         temporal_flush += tmpstr;
2106                                   }
2107                                 }
2108                         }
2109                   temporal_flush += "\t}\n\n";
2110         }
2111
2112 // Unpack all the temporal attributes referenced in select clause
2113 // and update the last value of the attribute
2114         col_id_set temp_cids;           //      col ids of temp attributes in select clause
2115         col_id_set::iterator csi;
2116
2117         for(s=0;s<sl_list.size();s++){
2118                 data_type *sdt = sl_list[s]->get_data_type();
2119                 if (sdt->is_temporal()) {
2120                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2121                 }
2122         }
2123
2124         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2125                 if(unpacked_cids.count((*csi)) == 0){
2126                         int tblref = (*csi).tblvar_ref;
2127                         int schref = (*csi).schema_ref;
2128                         string field = (*csi).field;
2129                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2130 /*
2131                         data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2132                         sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2133                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2134                         ret += tmpstr;
2135                         ret += "\tif(retval) return 1;\n";
2136 */
2137                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2138                         ret += tmpstr;
2139
2140                         unpacked_cids.insert( (*csi) );
2141                 }
2142         }
2143
2144
2145 //                              Do the flush here if this is a real_time query
2146         string rt_level = fs->get_val_of_def("real_time");
2147         if(rt_level != "" && temporal_flush != ""){
2148                 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2149                         if(unpacked_cids.count((*csi)) == 0){
2150                         int tblref = (*csi).tblvar_ref;
2151                         int schref = (*csi).schema_ref;
2152                         string field = (*csi).field;
2153                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2154 /*
2155                                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2156                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2157                         ret += tmpstr;
2158                         ret += "\tif(retval) return 1;\n";
2159 */
2160                                 unpacked_cids.insert( (*csi) );
2161                         }
2162                 }
2163                 ret += temporal_flush;
2164         }
2165
2166         return ret;
2167  }
2168
2169 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2170
2171 int p,s;
2172 string ret;
2173
2174 ///////////////                 Processing for filter-only query
2175
2176 //                      test passed : create the tuple, then assign to it.
2177           ret += "/*\t\tCreate and post the tuple\t*/\n";
2178
2179 //                      Unpack partial fcns ref'd by the select clause.
2180 //                      Its a kind of a WHERE clause ...
2181   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2182         if(fcn_ref_cnt[p] > 1){
2183                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2184         }
2185         if(is_partial_fcn[p]){
2186                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2187                 ret += "\tif(retval) goto end;\n";
2188         }
2189         if(fcn_ref_cnt[p] > 1){
2190                 if(!is_partial_fcn[p]){
2191                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2192                 }
2193                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2194                 ret += "\t}\n";
2195         }
2196   }
2197
2198   // increment the counter of accepted tuples
2199   ret += "\n\t#ifdef LFTA_STATS\n";
2200   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2201   ret += "\t#endif\n\n";
2202
2203 //                      First, compute the size of the tuple.
2204
2205 //                      Unpack any BUFFER type selections into temporaries
2206 //                      so that I can compute their size and not have
2207 //                      to recompute their value during tuple packing.
2208 //                      I can use regular assignment here because
2209 //                      these temporaries are non-persistent.
2210
2211           for(s=0;s<sl_list.size();s++){
2212                 data_type *sdt = sl_list[s]->get_data_type();
2213                 if(sdt->is_buffer_type()){
2214                         sprintf(tmpstr,"\tselvar_%d = ",s);
2215                         ret += tmpstr;
2216                         ret += generate_se_code(sl_list[s],schema);
2217                         ret += ";\n";
2218                 }
2219           }
2220
2221
2222 //              The size of the tuple is the size of the tuple struct plus the
2223 //              size of the buffers to be copied in.
2224
2225           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2226           for(s=0;s<sl_list.size();s++){
2227                 data_type *sdt = sl_list[s]->get_data_type();
2228                 if(sdt->is_buffer_type()){
2229                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2230                         ret += tmpstr;
2231                 }
2232           }
2233           ret += ";\n";
2234
2235
2236           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2237           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2238
2239 //                      Test passed, make assignments to the tuple.
2240
2241           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2242
2243 //                      Mark tuple as REGULAR_TUPLE
2244           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2245
2246
2247           for(s=0;s<sl_list.size();s++){
2248                 data_type *sdt = sl_list[s]->get_data_type();
2249                 if(sdt->is_buffer_type()){
2250                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2251                         ret += tmpstr;
2252                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2253                         ret += tmpstr;
2254                 }else{
2255                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2256                         ret += tmpstr;
2257 //                      if(sdt->needs_hn_translation())
2258 //                              ret += sdt->hton_translation() +"( ";
2259                         ret += generate_se_code(sl_list[s],schema);
2260 //                      if(sdt->needs_hn_translation())
2261 //                              ret += ") ";
2262                         ret += ";\n";
2263                 }
2264           }
2265
2266 //              Generate output.
2267
2268           ret += "\tpost_tuple(tuple);\n";
2269
2270 //              Increment the counter of posted tuples
2271   ret += "\n\t#ifdef LFTA_STATS\n";
2272   ret += "\tt->out_tuple_cnt++;\n";
2273   ret+="\tt->out_tuple_sz+=tuple_size;\n";
2274   ret += "\t#endif\n\n";
2275
2276
2277
2278         return ret;
2279 }
2280
2281 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
2282
2283 int p,s,w;
2284 string ret;
2285
2286 //                      Get parameters
2287         unsigned int window_len = fs->temporal_range;
2288         unsigned int n_bloom = 11;
2289         string n_bloom_str = fs->get_val_of_def("num_bloom");
2290         int tmp_n_bloom = atoi(n_bloom_str.c_str());
2291         if(tmp_n_bloom>0)
2292                 n_bloom = tmp_n_bloom+1;
2293         float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2294         sprintf(tmpstr,"%f",bloom_width);
2295         string bloom_width_str = tmpstr;
2296
2297         if(window_len < n_bloom){
2298                 n_bloom = window_len+1;
2299                 bloom_width_str = "1";
2300         }
2301
2302
2303 //              Grab the current window time
2304         scalarexp_t winvar(fs->temporal_var);
2305         ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2306
2307         int bf_exp_size = 12;  // base-2 log of number of bits
2308         string bloom_len_str = fs->get_val_of_def("bloom_size");
2309         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2310         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2311                 bf_exp_size = tmp_bf_exp_size;
2312         }
2313         int bf_bit_size = 1 << bf_exp_size;
2314         int bf_byte_size = bf_bit_size / (8*sizeof(char));
2315
2316         unsigned int ht_size = 4096;
2317         string ht_size_s = fs->get_val_of_def("aggregate_slots");
2318         int tmp_ht_size = atoi(ht_size_s.c_str());
2319         if(tmp_ht_size > 1024){
2320                 unsigned int hs = 1;            // make it power of 2
2321                 while(tmp_ht_size){
2322                         hs =hs << 1;
2323                         tmp_ht_size = tmp_ht_size >> 1;
2324                 }
2325                 ht_size = hs;
2326         }
2327
2328         int i, bf_mask = 0;
2329         if(fs->use_bloom){
2330                 for(i=0;i<bf_exp_size;i++)
2331                         bf_mask = (bf_mask << 1) | 1;
2332         }else{
2333                 for(i=ht_size;i>1;i=i>>1)
2334                         bf_mask = (bf_mask << 1) | 1;
2335         }
2336
2337 /*
2338 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2339         n_bloom,
2340         window_len,
2341         bloom_width_str.c_str(),
2342         bf_exp_size,
2343         bf_bit_size,
2344         bf_byte_size,
2345         ht_size,
2346         ht_size_s.c_str(),
2347         bf_mask);
2348 */
2349
2350
2351
2352
2353 //              If this is a bloom-filter fj, first test if the
2354 //              bloom filter needs to be advanced.
2355 //              SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2356 //              t->bf_size : number of bits in bloom filter
2357         if(fs->use_bloom){
2358                 ret +=
2359 "//                     Clean out old bloom filters if needed.\n"
2360 "       if(t->first_exec){\n"
2361 "               t->first_exec = 0;\n"
2362 "               t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2363 "               t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2364 "       }else{\n"
2365 "               curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2366 "               if(curr_bin != t->last_bin){\n"
2367 "                       for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2368 "                               t->last_bloom_pos++;\n"
2369 "                               if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2370 "                                       t->last_bloom_pos = 0;\n"
2371 "                               tmp_i = t->last_bloom_pos;\n"
2372 "                               for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2373 "                                       SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2374 "                               }\n"
2375 "                       }\n"
2376 "               }\n"
2377 "               t->last_bin = curr_bin;\n"
2378 "       }\n"
2379 ;
2380         }
2381
2382
2383 //-----------------------------------------------------------------
2384 //              First, determine whether to do S (filter stream) processing.
2385
2386         ret +=
2387 "//             S (filtering stream) predicate, should it be processed?\n"
2388 "\n"
2389 ;
2390 // Sort S preds based on cost.
2391         vector<cnf_elem *> s_filt = fs->pred_t1;
2392         col_id_set::iterator csi;
2393   if(s_filt.size() > 0){
2394
2395 //                      Unpack fields ref'd in the S pred
2396         for(w=0;w<s_filt.size();++w){
2397                 col_id_set this_pred_cids;
2398                 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2399                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2400                         if(unpacked_cids.count( (*csi) ) == 0){
2401                                 int tblref = (*csi).tblvar_ref;
2402                                 int schref = (*csi).schema_ref;
2403                                 string field = (*csi).field;
2404                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2405                                 unpacked_cids.insert( (*csi) );
2406                         }
2407                 }
2408         }
2409
2410
2411 //              Sort by evaluation cost.
2412 //              First, estimate evaluation costs
2413 //              Eliminate predicates covered by the prefilter (those in s_pids).
2414 //              I need to do it before the sort becuase the indices refer
2415 //              to the position in the unsorted list.
2416         vector<cnf_elem *> tmp_wh;
2417         for(w=0;w<s_filt.size();++w){
2418                 compute_cnf_cost(s_filt[w],Ext_fcns);
2419                 tmp_wh.push_back(s_filt[w]);
2420         }
2421         s_filt = tmp_wh;
2422
2423         sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2424
2425 //              Now generate the predicates.
2426         for(w=0;w<s_filt.size();++w){
2427                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2428                 ret += tmpstr;
2429
2430 //                      Find partial fcns ref'd in this cnf element
2431                 set<int> pfcn_refs;
2432                 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2433 //                      Since set<..> is a "Sorted Associative Container",
2434 //                      we can walk through it in sorted order by walking from
2435 //                      begin() to end().  (and the partial fcns must be
2436 //                      evaluated in this order).
2437                 set<int>::iterator si;
2438                 string pf_preds;
2439                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2440                         if(fcn_ref_cnt[(*si)] > 1){
2441                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2442                         }
2443                         if(is_partial_fcn[(*si)]){
2444                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2445                                 ret += "\t\tif(retval) goto end_s;\n";
2446                         }
2447                         if(fcn_ref_cnt[(*si)] > 1){
2448                                 if(!is_partial_fcn[(*si)]){
2449                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2450 //              Testing for S is a side branch.
2451 //              I don't want a cacheable partial function to be
2452 //              marked as evaluated.  Therefore I mark the function
2453 //              as evalauted ONLY IF it is not partial.
2454                                         ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2455                                 }
2456                                 ret += "\t}\n";
2457                         }
2458                 }
2459
2460                 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2461                                 ") ) goto end_s;\n";
2462         }
2463   }else{
2464           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2465   }
2466
2467         for(p=0;p<fs->hash_eq.size();++p)
2468                 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2469
2470         if(fs->use_bloom){
2471 //                      First, generate the S scalar expressions in the hash_eq
2472
2473 //                      Iterate over the bloom filters
2474                 for(i=0;i<3;i++){
2475                         ret += "\t\tbucket=0;\n";
2476                         for(p=0;p<fs->hash_eq.size();++p){
2477                                 ret +=
2478 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2479         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2480         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2481                         }
2482 //              SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2483                                 ret +=
2484 "               bucket &= "+int_to_string(bf_mask)+";\n"
2485 "               SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2486 "\n"
2487 ;
2488                 }
2489         }else{
2490                 ret += "\t\tbucket=0;\n";
2491                 for(p=0;p<fs->hash_eq.size();++p){
2492                         ret +=
2493 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2494         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2495         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2496                 }
2497                 ret +=
2498 "               bucket &= "+int_to_string(bf_mask)+";\n"
2499 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2500 ;
2501 //                      Try the first bucket
2502                 ret += "\t\tif(";
2503                 for(p=0;p<fs->hash_eq.size();++p){
2504                         if(p>0) ret += " && ";
2505 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2506 //                                      " == s_equijoin_"+int_to_string(p);
2507                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2508                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2509                         string rhs_op = "s_equijoin_"+int_to_string(p);
2510                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2511                 }
2512                 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2513                 ret += "\t\t}else {if(";
2514                 for(p=0;p<fs->hash_eq.size();++p){
2515                         if(p>0) ret += " && ";
2516 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2517 //                                      " == s_equijoin_"+int_to_string(p);
2518                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2519                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2520                         string rhs_op = "s_equijoin_"+int_to_string(p);
2521                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2522                 }
2523                 ret +=  "){\n\t\t\tthe_bucket = bucket1;\n";
2524                 ret += "\t\t}else{ if(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2525                 ret+="\t\t\tthe_bucket = bucket;\n\t\t\telse the_bucket=bucket1;\n";
2526                 ret += "\t\t}}\n";
2527                 for(p=0;p<fs->hash_eq.size();++p){
2528                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2529                         if(hdt->is_buffer_type()){
2530                                 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2531                                 ret += tmpstr;
2532                         }else{
2533                                 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2534                                         " = s_equijoin_"+int_to_string(p)+";\n";
2535                         }
2536                 }
2537                 ret+="\t\tt->join_table[the_bucket].ts =  curr_fj_ts;\n";
2538         }
2539   ret += "\tend_s:\n";
2540
2541 //      ------------------------------------------------------------
2542 //              Next, determine if the R record should be processed.
2543
2544
2545         ret +=
2546 "//             R (main stream) cheap predicate\n"
2547 "\n"
2548 ;
2549
2550 //              Unpack r_filt fields
2551         vector<cnf_elem *> r_filt = fs->pred_t0;
2552         for(w=0;w<r_filt.size();++w){
2553                 col_id_set this_pred_cids;
2554                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2555                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2556                         if(unpacked_cids.count( (*csi) ) == 0){
2557                                 int tblref = (*csi).tblvar_ref;
2558                                 int schref = (*csi).schema_ref;
2559                                 string field = (*csi).field;
2560                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2561                                 unpacked_cids.insert( (*csi) );
2562                         }
2563                 }
2564         }
2565
2566 // Sort S preds based on cost.
2567
2568         vector<cnf_elem *> tmp_wh;
2569         for(w=0;w<r_filt.size();++w){
2570                 compute_cnf_cost(r_filt[w],Ext_fcns);
2571                 tmp_wh.push_back(r_filt[w]);
2572         }
2573         r_filt = tmp_wh;
2574
2575         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2576
2577 //              WARNING! the constant 20 below is a wild-ass guess.
2578         int cheap_rpos;
2579         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++)
2580
2581 //              Test the cheap filters on R.
2582   if(cheap_rpos >0){
2583
2584 //              Now generate the predicates.
2585         for(w=0;w<cheap_rpos;++w){
2586                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2587                 ret += tmpstr;
2588
2589 //                      Find partial fcns ref'd in this cnf element
2590                 set<int> pfcn_refs;
2591                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2592 //                      Since set<..> is a "Sorted Associative Container",
2593 //                      we can walk through it in sorted order by walking from
2594 //                      begin() to end().  (and the partial fcns must be
2595 //                      evaluated in this order).
2596                 set<int>::iterator si;
2597                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2598                         if(fcn_ref_cnt[(*si)] > 1){
2599                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2600                         }
2601                         if(is_partial_fcn[(*si)]){
2602                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2603                                 ret += "\t\tif(retval) goto end;\n";
2604                         }
2605                         if(fcn_ref_cnt[(*si)] > 1){
2606                                 if(!is_partial_fcn[(*si)]){
2607                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2608                                 }
2609                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2610                                 ret += "\t}\n";
2611                         }
2612                 }
2613
2614                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2615                                 ") ) goto end;\n";
2616         }
2617   }else{
2618           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2619   }
2620
2621         ret += "\n//    Do the join\n\n";
2622         for(p=0;p<fs->hash_eq.size();++p)
2623                 ret += "\t\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2624
2625
2626 //                      Passed the cheap pred, now test the join with S.
2627         if(fs->use_bloom){
2628                 for(i=0;i<3;i++){
2629                         ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2630                         for(p=0;p<fs->hash_eq.size();++p){
2631                                 ret +=
2632 "       bucket"+int_to_string(i)+
2633         " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2634         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2635         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2636                         }
2637                                 ret +=
2638 "       bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2639                 }
2640                 ret += "\tfound = 0;\n";
2641                 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2642                 ret +=
2643 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2644 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2645 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
2646 "\t\t\tfound=1;\n"
2647 "\t}\n"
2648 ;
2649                 ret +=
2650 "       if(!found)\n"
2651 "               goto end;\n"
2652 ;
2653         }else{
2654                 ret += "\tfound = 0;\n";
2655                 ret += "\t\tbucket=0;\n";
2656                 for(p=0;p<fs->hash_eq.size();++p){
2657                         ret +=
2658 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2659         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2660         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2661                 }
2662                 ret +=
2663 "               bucket &= "+int_to_string(bf_mask)+";\n"
2664 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2665 ;
2666 //                      Try the first bucket
2667                 ret += "\t\tif(";
2668                 for(p=0;p<fs->hash_eq.size();++p){
2669                         if(p>0) ret += " && ";
2670 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2671 //                                      " == r_equijoin_"+int_to_string(p);
2672                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2673                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2674                         string rhs_op = "s_equijoin_"+int_to_string(p);
2675                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2676                 }
2677                 if(p>0) ret += " && ";
2678                 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2679                 ret += "){\n\t\t\tfound = 1;\n";
2680                 ret += "\t\t}else {if(";
2681                 for(p=0;p<fs->hash_eq.size();++p){
2682                         if(p>0) ret += " && ";
2683 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2684 //                                      " == r_equijoin_"+int_to_string(p);
2685                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2686                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2687                         string rhs_op = "s_equijoin_"+int_to_string(p);
2688                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2689                 }
2690                 if(p>0) ret += " && ";
2691                 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2692                 ret +=  ")\n\t\t\tfound=1;\n";
2693                 ret+="\t\t}\n";
2694                 ret +=
2695 "       if(!found)\n"
2696 "               goto end;\n"
2697 ;
2698         }
2699
2700
2701 //              Test the expensive filters on R.
2702   if(cheap_rpos < r_filt.size()){
2703
2704 //              Now generate the predicates.
2705         for(w=cheap_rpos;w<r_filt.size();++w){
2706                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2707                 ret += tmpstr;
2708
2709 //                      Find partial fcns ref'd in this cnf element
2710                 set<int> pfcn_refs;
2711                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2712 //                      Since set<..> is a "Sorted Associative Container",
2713 //                      we can walk through it in sorted order by walking from
2714 //                      begin() to end().  (and the partial fcns must be
2715 //                      evaluated in this order).
2716                 set<int>::iterator si;
2717                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2718                         if(fcn_ref_cnt[(*si)] > 1){
2719                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2720                         }
2721                         if(is_partial_fcn[(*si)]){
2722                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2723                                 ret += "\t\tif(retval) goto end;\n";
2724                         }
2725                         if(fcn_ref_cnt[(*si)] > 1){
2726                                 if(!is_partial_fcn[(*si)]){
2727                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2728                                 }
2729                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2730                                 ret += "\t}\n";
2731                         }
2732                 }
2733
2734                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2735                                 ") ) goto end;\n";
2736         }
2737   }else{
2738           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2739   }
2740
2741
2742
2743 ///////////////                 post the tuple
2744
2745 //                      test passed : create the tuple, then assign to it.
2746           ret += "/*\t\tCreate and post the tuple\t*/\n";
2747
2748 //              Unpack r_filt fields
2749         for(s=0;s<sl_list.size();++s){
2750                 col_id_set this_se_cids;
2751                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2752                 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2753                         if(unpacked_cids.count( (*csi) ) == 0){
2754                                 int tblref = (*csi).tblvar_ref;
2755                                 int schref = (*csi).schema_ref;
2756                                 string field = (*csi).field;
2757                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2758                                 unpacked_cids.insert( (*csi) );
2759                         }
2760                 }
2761         }
2762
2763
2764 //                      Unpack partial fcns ref'd by the select clause.
2765 //                      Its a kind of a WHERE clause ...
2766   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2767         if(fcn_ref_cnt[p] > 1){
2768                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2769         }
2770         if(is_partial_fcn[p]){
2771                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2772                 ret += "\tif(retval) goto end;\n";
2773         }
2774         if(fcn_ref_cnt[p] > 1){
2775                 if(!is_partial_fcn[p]){
2776                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2777                 }
2778                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2779                 ret += "\t}\n";
2780         }
2781   }
2782
2783   // increment the counter of accepted tuples
2784   ret += "\n\t#ifdef LFTA_STATS\n";
2785   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2786   ret += "\t#endif\n\n";
2787
2788 //                      First, compute the size of the tuple.
2789
2790 //                      Unpack any BUFFER type selections into temporaries
2791 //                      so that I can compute their size and not have
2792 //                      to recompute their value during tuple packing.
2793 //                      I can use regular assignment here because
2794 //                      these temporaries are non-persistent.
2795
2796           for(s=0;s<sl_list.size();s++){
2797                 data_type *sdt = sl_list[s]->get_data_type();
2798                 if(sdt->is_buffer_type()){
2799                         sprintf(tmpstr,"\tselvar_%d = ",s);
2800                         ret += tmpstr;
2801                         ret += generate_se_code(sl_list[s],schema);
2802                         ret += ";\n";
2803                 }
2804           }
2805
2806
2807 //              The size of the tuple is the size of the tuple struct plus the
2808 //              size of the buffers to be copied in.
2809
2810           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2811           for(s=0;s<sl_list.size();s++){
2812                 data_type *sdt = sl_list[s]->get_data_type();
2813                 if(sdt->is_buffer_type()){
2814                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2815                         ret += tmpstr;
2816                 }
2817           }
2818           ret += ";\n";
2819
2820
2821           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2822           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2823
2824 //                      Test passed, make assignments to the tuple.
2825
2826           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2827
2828 //                      Mark tuple as REGULAR_TUPLE
2829           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2830
2831
2832           for(s=0;s<sl_list.size();s++){
2833                 data_type *sdt = sl_list[s]->get_data_type();
2834                 if(sdt->is_buffer_type()){
2835                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2836                         ret += tmpstr;
2837                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2838                         ret += tmpstr;
2839                 }else{
2840                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2841                         ret += tmpstr;
2842 //                      if(sdt->needs_hn_translation())
2843 //                              ret += sdt->hton_translation() +"( ";
2844                         ret += generate_se_code(sl_list[s],schema);
2845 //                      if(sdt->needs_hn_translation())
2846 //                              ret += ") ";
2847                         ret += ";\n";
2848                 }
2849           }
2850
2851 //              Generate output.
2852
2853           ret += "\tpost_tuple(tuple);\n";
2854
2855 //              Increment the counter of posted tuples
2856   ret += "\n\t#ifdef LFTA_STATS\n";
2857   ret += "\n\tt->out_tuple_cnt++;\n\n";
2858   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
2859   ret += "\t#endif\n\n";
2860
2861
2862         return ret;
2863 }
2864
2865 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
2866         string ret;
2867         int a,p,g;
2868
2869 //////////////          Processing for aggregtion query
2870
2871 //              First, search for a match.  Start by unpacking the group-by attributes.
2872
2873 //                      One complication : if a real-time aggregate flush occurs,
2874 //                      the GB attr has already been calculated.  So don't compute
2875 //                      it again if 1) its temporal and 2) it will be computed in the
2876 //                      agggregate flush code.
2877
2878 //              Unpack the partial fcns ref'd by the gb's and the aggr defs.
2879   for(p=gb_fcns_start;p<gb_fcns_end;p++){
2880     if(is_partial_fcn[p]){
2881                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2882                 ret += "\tif(retval) goto end;\n";
2883         }
2884   }
2885   for(p=ag_fcns_start;p<ag_fcns_end;p++){
2886     if(is_partial_fcn[p]){
2887                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2888                 ret += "\tif(retval) goto end;\n";
2889         }
2890   }
2891
2892   // increment the counter of accepted tuples
2893   ret += "\n\t#ifdef LFTA_STATS\n";
2894   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2895   ret += "\t#endif\n\n";
2896
2897   ret += "/*\t\tTest if the group is in the hash table \t*/\n";
2898 //                      Compute the values of the group-by variables.
2899   for(g=0;g<gb_tbl->size();g++){
2900           data_type *gdt = gb_tbl->get_data_type(g);
2901           if((! gdt->is_temporal()) || temporal_flush == ""){
2902
2903                   if(gdt->is_buffer_type()){
2904         //                              NOTE : if the SE defining the gb is anything
2905         //                              other than a ref to a variable, this will generate
2906         //                              illegal code.  To be resolved with Spatch.
2907                         sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2908                                 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2909                         ret += tmpstr;
2910                         sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2911                                 gdt->get_buffer_assign_copy().c_str(), g, g);
2912                   }else{
2913                         sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2914                   }
2915                   ret += tmpstr;
2916           }
2917   }
2918   ret += "\n";
2919
2920 //                      A quick aside : if any of the GB attrs are temporal,
2921 //                      test for change and flush if any change occurred.
2922 //                      We've already computed the flush code,
2923 //                      Put it here if this is not a real time query.
2924 //                      We've already unpacked all column refs, so no need to
2925 //                      do it again here.
2926
2927         string rt_level = fs->get_val_of_def("real_time");
2928         if(rt_level == "" && temporal_flush != ""){
2929                 ret += temporal_flush;
2930         }
2931
2932 //                      Compute the hash bucket
2933         if(gb_tbl->size() > 0){
2934                 ret += "\thashval = ";\
2935                 for(g=0;g<gb_tbl->size();g++){
2936                   if(g>0) ret += " ^ ";
2937                   data_type *gdt = gb_tbl->get_data_type(g);
2938                   if(gdt->is_buffer_type()){
2939                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2940                                 gdt->get_type_str().c_str(), g);
2941                   }else{
2942                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2943                                 gdt->get_type_str().c_str(), g);
2944                   }
2945                   ret += tmpstr;
2946                 }
2947                 ret += ";\n";
2948                 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
2949         ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
2950         }else{
2951                 ret+="\tprobe = 0;\n";
2952                 ret+="\thash2 = 0;\n\n";
2953         }
2954
2955 //              Does the lfta reference a udaf?
2956           bool has_udaf = false;
2957           for(a=0;a<aggr_tbl->size();a++){
2958                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
2959           }
2960
2961 //              Scan for a match, or alternatively the best slot.
2962 //              Currently, hardcode 5 tests.
2963         ret +=
2964 "       gen_val = t->generation & SLOT_GEN_BITS;\n"
2965 "       match_found = 0;\n"
2966 "       best_slot = probe;\n"
2967 "       for(i=0;i<5 && match_found == 0;i++){\n"
2968 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
2969 ;
2970         if(gb_tbl->size()>0){
2971                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
2972                 ret+="\t\tif(";
2973                 string rhs_op, lhs_op;
2974                 for(g=0;g<gb_tbl->size();g++){
2975                   if(g>0) ret += " && ";
2976                   ret += "(";
2977                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
2978                   sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
2979                   ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
2980                   ret += ")";
2981                 }
2982          }
2983          ret += "){\n"
2984 "                       match_found = 1;\n"
2985 "                       best_slot = probe;\n"
2986 "               }\n"
2987 "       }\n"
2988 "//             Rate slots in case no match found: prefer empty, then full but old slots\n"
2989 "       if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
2990 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
2991 "                       best_slot = probe;\n"
2992 "               }else{\n"
2993 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
2994 "                               best_slot = probe;\n"
2995 "                       }\n"
2996 "               }\n"
2997 "               probe++;\n"
2998 "               if(probe >= t->max_aggrs)\n"
2999 "                       probe=0;\n"
3000 "       }\n"
3001 "       if(match_found){\n"
3002 ;
3003         ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3004         ret +=
3005 "       }else{\n"
3006 "               if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3007 ;
3008 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3009         if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3010                 ret +=
3011 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3012 "                               if((";
3013                 bool first_g = true;
3014                 for(int g=0;g<gb_tbl->size();g++){
3015                         data_type *gdt = gb_tbl->get_data_type(g);
3016                         if(gdt->is_temporal()){
3017                                 if(first_g) first_g = false; else ret+=" + ";
3018                                 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3019                         }
3020                 }
3021                 ret += ") == 0 ){\n";
3022
3023                 ret +=
3024 "                                       fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3025 "                               }\n"
3026 "                       }\n"
3027 ;
3028         }
3029
3030         ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3031         ret +=
3032 "\t\t\t#ifdef LFTA_STATS\n"
3033 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3034 "\t\t\t\tt->collision_cnt++;\n\n"
3035 "\t\t\t#endif\n\n"
3036 "\t\t}\n"
3037 ;
3038         ret += generate_init_group(schema,"best_slot");
3039
3040
3041           ret += "\t}\n";
3042
3043         return ret;
3044 }
3045
3046
3047
//	Generate the C source of the LFTA packet-accept callback
//	    static gs_retval_t accept_packet_<node_name>(struct FTA *f,
//	        FTAID * ftaid, void *pkt, gs_int32_t sz)
//	and return it as a string.
//
//	The generated function:
//	  - declares an unpack variable for every column referenced by the WHERE
//	    clause (predicates listed in s_pids are skipped unless is_fj), the
//	    SELECT list and the group-by definitions, plus the working variables
//	    needed by aggregation / filter-join queries;
//	  - under LFTA_STATS, records the start cycle and counts the input tuple;
//	  - if packed_return is set, casts pkt to the packed input struct and
//	    jumps to "end" unless its __lfta_id_fm_nic__ equals global_id;
//	  - evaluates the filter predicates not in s_pids in increasing estimated
//	    cost, unpacking fields on first use and jumping to "end" on the first
//	    failing predicate;
//	  - unpacks the remaining fields and appends the query-type-specific body
//	    (selection, aggregation or filter join);
//	  - at label "end", accumulates the cycle count and returns 1.
//
//	fs            : the query node being compiled.
//	node_name     : name used to form generated identifiers.
//	schema        : table list used to resolve field types and generate code.
//	Ext_fcns      : external function list (predicate cost estimation and
//	                UDAF return types).
//	is_aggr_query : generate the aggregation variant.
//	is_fj         : generate the filter-join variant.
//	s_pids        : indices of predicates covered by the prefilter; they are
//	                not re-evaluated by the generated code.
//
//	NOTE(review): relies on names not declared here (where, sl_list, gb_tbl,
//	aggr_tbl, partial_fcns, is_partial_fcn, fcn_ref_cnt, packed_return,
//	global_id, tmpstr) -- presumably file-scope state set up by the caller.
string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set<unsigned int> &s_pids){

        string ret="static gs_retval_t accept_packet_"+node_name+
                "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
    ret += "\tstruct packet *p = (struct packet *)pkt;\n";

  int a;

//                      Define all of the variables needed by this
//                      procedure.


//                      Gather all column references, need to define unpacking variables.
  int w,s;
  col_id_set cid_set;
  col_id_set::iterator csi;

//              If its a filter join, rebind all colrefs
//              to the first range var, to avoid double unpacking.

  if(is_fj){
    for(w=0;w<where.size();++w)
                reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
    for(s=0;s<sl_list.size();s++)
                reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
  }

//              Columns used only by prefilter predicates (s_pids) need no
//              unpacking here, except for filter joins.
  for(w=0;w<where.size();++w){
        if(is_fj || s_pids.count(w) == 0)
                gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
        }
  for(s=0;s<sl_list.size();s++){
        gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
  }

  int g;
  if(gb_tbl != NULL){
        for(g=0;g<gb_tbl->size();g++)
          gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
  }

  //                    Variables for unpacking attributes.
  //                    One "unpack_var_<field>_<tblref>" per gathered column.
  ret += "/*\t\tVariables for unpacking attributes\t*/\n";
  for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
    int schref = (*csi).schema_ref;
        int tblref = (*csi).tblvar_ref;
    string field = (*csi).field;
    data_type dt(schema->get_type_name(schref,field));
    sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
        field.c_str(), tblref);
    ret += tmpstr;
  }

  ret += "\n\n";

//                      Variables that are always needed
  ret += "/*\t\tVariables which are always needed\t*/\n";
  ret += "\tgs_retval_t retval;\n";
  ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
  ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";

  ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";


//                      Variables needed for aggregation queries.
  if(is_aggr_query){
          ret += "\n/*\t\tVariables for aggregation\t*/\n";
          ret+="\tunsigned int i, probe;\n";
          ret+="\tunsigned int gen_val, match_found, best_slot;\n";
          ret+="\tgs_uint64_t hashval, hash2;\n";
//                      Variables for storing group-by attribute values.
//                      BUFFER-typed gb attrs also get a gb_attr_tmp<g> temporary.
          if(gb_tbl->size() > 0)
                ret += "/*\t\tGroup-by attributes\t*/\n";
          for(g=0;g<gb_tbl->size();g++){
                sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
                ret += tmpstr;
                data_type *gdt = gb_tbl->get_data_type(g);
                if(gdt->is_buffer_type()){
                  sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
                  ret += tmpstr;
                }
          }
          ret += "\n";
//                      Temporaries for min/max
          string aggr_tmp_str = "";
          for(a=0;a<aggr_tbl->size();a++){
                string aggr_op = aggr_tbl->get_op(a);
                if(aggr_op == "MIN" || aggr_op == "MAX"){
                        sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
                        aggr_tmp_str.append(tmpstr);
                }
          }
          if(aggr_tmp_str != ""){
                ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
                ret += aggr_tmp_str;
                ret += "\n";
          }
//              Variables for udaf output temporaries
//              (typed by the UDAF's return type from Ext_fcns).
        bool no_udaf = true;
        for(a=0;a<aggr_tbl->size();a++){
                if(! aggr_tbl->is_builtin(a)){
                        if(no_udaf){
                                ret+="/*\t\tUDAF output vars.\t*/\n";
                                no_udaf = false;
                        }
                        int afcn_id = aggr_tbl->get_fcn_id(a);
                        data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
                        sprintf(tmpstr,"udaf_ret%d", a);
                        ret+="\t"+adt->make_cvar(tmpstr)+";\n";
                }
        }
  }

//                      Variables needed for a filter join query
//                      NOTE(review): tests node_type() rather than is_fj;
//                      presumably equivalent -- confirm.
  if(fs->node_type() == "filter_join"){
        filter_join_qpn *fjq = (filter_join_qpn *)fs;
        bool uses_bloom = fjq->use_bloom;
        ret += "/*\t\tJoin fields\t*/\n";
        for(g=0;g<fjq->hash_eq.size();g++){
                sprintf(tmpstr,"\t%s s_equijoin_%d, r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g,g);
                ret += tmpstr;
          }
        if(uses_bloom){
                ret +=
"  /*           Variables for fj bloom filter   */ \n"
"\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
"\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
"\tlong long int curr_fj_ts;\n"
"\tunsigned int curr_bin, the_bin;\n"
"\n"
;
        }else{
                ret +=
"  /*           Variables for fj join table     */ \n"
"\tunsigned int i, bucket, found; \n"
"\tunsigned int bucket1, the_bucket;\n"
"       long long int curr_fj_ts;\n"
"\n"
;
        }
  }


//              Variables needed to store selected attributes of BUFFER type
//              temporarily, in order to compute their size for storage
//              in an output tuple.

  string select_var_defs = "";
  for(s=0;s<sl_list.size();s++){
        data_type *sdt = sl_list[s]->get_data_type();
        if(sdt->is_buffer_type()){
          sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
          select_var_defs.append(tmpstr);
        }
  }
  if(select_var_defs != ""){
        ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
    ret += select_var_defs;
  }

//              Variables to store results of partial functions.
//              A result variable is declared for true partial fcns, and (for
//              non-aggregation queries) for fcns referenced more than once,
//              which also get an fcn_ref_cnt_<p> "already computed" flag.
  int p;
  if(partial_fcns.size()>0){
          ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
          for(p=0;p<partial_fcns.size();++p){
                if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
                  sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
                    partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
                  ret += tmpstr;
                  if(!is_aggr_query && fcn_ref_cnt[p] >1){
                        ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
                  }
                }
          }

          if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
          ret += "\n";
  }

//              variable to hold packet struct  //
        if(packed_return){
                ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
        }


  ret += "\t#ifdef LFTA_STATS\n";
// variable to store counter of cpu cycles spend in accept_tuple
        ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
// increment counter of received tuples
        ret += "\tt->in_tuple_cnt++;\n";
  ret += "\t#endif\n";


//      -------------------------------------------------
//              If the packet is "packet", test if its for this lfta,
//              and if so load it into its struct

        if(packed_return){
                ret+="\n/*  packed tuple : test and load. \t*/\n";
                ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
                ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
                ret+="\t\tgoto end;\n\n";
        }



  col_id_set unpacked_cids;     //      Keep track of the cols that have been unpacked.

//              temporal_flush is filled in by the aggregation prelim and
//              later passed on to generate_aggr_accept_body.
  string temporal_flush;
  if(is_aggr_query)
        ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
  else {        // non-aggregation operators

// Unpack all the temporal attributes referenced in select clause
// and update the last value of the attribute
        col_id_set temp_cids;           //      col ids of temp attributes in select clause

        for(s=0;s<sl_list.size();s++){
                data_type *sdt = sl_list[s]->get_data_type();
                if (sdt->is_temporal()) {
                        gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
                }
        }
//                      If this is a filter join,
//                      ensure that the temporal range field is unpacked.
        if(is_fj){
                col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
                if(temp_cids.count(window_var_cid)==0)
                        temp_cids.insert(window_var_cid);
        }

//                      Unpack each temporal column once and store it in
//                      t->last_<field>_<tblref>.
        for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
                if(unpacked_cids.count((*csi)) == 0){
                        int tblref = (*csi).tblvar_ref;
                        int schref = (*csi).schema_ref;
                        string field = (*csi).field;
                        ret += generate_unpack_code(tblref,schref,field,schema,node_name);
                        sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
                        ret += tmpstr;

                        unpacked_cids.insert( (*csi) );
                }
        }

  }

  vector<cnf_elem *> filter = fs->get_filter_clause();
//              Test the filter predicate (some query types have additional preds).
//              NOTE(review): the column gathering above indexes `where` with
//              s_pids, while this code indexes the filter clause with s_pids;
//              this assumes both list the same predicates in the same order
//              -- confirm.
  if(filter.size() > 0){

//              Sort by evaluation cost.
//              First, estimate evaluation costs
//              Eliminate predicates covered by the prefilter (those in s_pids).
//              I need to do it before the sort becuase the indices refer
//              to the position in the unsorted list./
        vector<cnf_elem *> tmp_wh;
        for(w=0;w<filter.size();++w){
                if(s_pids.count(w) == 0){
                        compute_cnf_cost(filter[w],Ext_fcns);
                        tmp_wh.push_back(filter[w]);
                }
        }
        filter = tmp_wh;

        sort(filter.begin(), filter.end(), compare_cnf_cost());

//              Now generate the predicates.
        for(w=0;w<filter.size();++w){
                sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
                ret += tmpstr;
//                      Find the set of variables accessed in this CNF elem,
//                      but in no previous element.
                col_id_set this_pred_cids;
                gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
                for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
                        if(unpacked_cids.count( (*csi) ) == 0){
                        int tblref = (*csi).tblvar_ref;
                        int schref = (*csi).schema_ref;
                        string field = (*csi).field;
                                ret += generate_unpack_code(tblref,schref,field,schema,node_name);
                                unpacked_cids.insert( (*csi) );
                        }
                }
//                      Find partial fcns ref'd in this cnf element
                set<int> pfcn_refs;
                collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
//                      Since set<..> is a "Sorted Associative Container",
//                      we can walk through it in sorted order by walking from
//                      begin() to end().  (and the partial fcns must be
//                      evaluated in this order).
//                      Fcns referenced more than once are wrapped in an
//                      fcn_ref_cnt_<p>==0 test so they are computed only once.
                set<int>::iterator si;
                for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
                        if(fcn_ref_cnt[(*si)] > 1){
                                ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
                        }
                        if(is_partial_fcn[(*si)]){
                                ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
                                ret += "\t\tif(retval) goto end;\n";
                        }
                        if(fcn_ref_cnt[(*si)] > 1){
                                if(!is_partial_fcn[(*si)]){
                                        ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
                                }
                                ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
                                ret += "\t}\n";
                        }
                }

                ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
                                ") ) goto end;\n";
        }
  }else{
          ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
  }


//                      We've passed the WHERE clause,
//                      unpack the remainder of the accessed fields.
//                      For a filter join only the hash-equality fields are
//                      unpacked here; otherwise every gathered column is.
  if(is_fj){
        ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
        vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
                for(w=0;w<h_eq.size();++w){
                col_id_set this_pred_cids;
                gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
                for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
                        if(unpacked_cids.count( (*csi) ) == 0){
                                int tblref = (*csi).tblvar_ref;
                                int schref = (*csi).schema_ref;
                                string field = (*csi).field;
                                ret += generate_unpack_code(tblref,schref,field,schema,node_name);
                                unpacked_cids.insert( (*csi) );
                        }
                }
        }
  }else{
          ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";

          for(csi=cid_set.begin();csi!=cid_set.end();++csi){
                if(unpacked_cids.count( (*csi) ) == 0){
                        int schref = (*csi).schema_ref;
                        int tblref = (*csi).tblvar_ref;
                        string field = (*csi).field;
                        ret += generate_unpack_code(tblref,schref,field,schema,node_name);
                        unpacked_cids.insert( (*csi) );
                }
          }
  }


//////////////////
//////////////////      After this, the query types
//////////////////      are processed differently.

  if(!is_aggr_query && !is_fj)
        ret += generate_sel_accept_body(fs, node_name, schema);
  else if(is_aggr_query)
        ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
  else
        ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);


//              Finish up.
//              Every "goto end" in the generated code lands here; the
//              generated function always returns 1.

   ret += "\n\tend:\n";
  ret += "\t#ifdef LFTA_STATS\n";
        ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
  ret += "\t#endif\n";
   ret += "\n\treturn 1;\n}\n\n";

        return(ret);
}
3419
3420
3421 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
3422         int g, cl;
3423
3424         string ret = "struct FTA * "+generate_alloc_name(node_name) +
3425            "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command,  gs_int32_t sz, void *value){\n";
3426
3427         ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
3428         ret+="\tint i;\n";
3429         ret += "\n";
3430         ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
3431
3432 //                              assign a streamid to fta instance
3433         ret+="\t/* assign a streamid */\n";
3434         ret+="\tf->f.ftaid = ftaid;\n";
3435         ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
3436         ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
3437
3438         if(is_aggr_query){
3439                 ret += "\tf->n_aggrs = 0;\n";
3440
3441                 ret += "\tf->max_aggrs = ";
3442
3443 //                              Computing the number of aggregate blocks is a little
3444 //                              tricky.  If there are no GB attrs, or if all GB attrs
3445 //                              are temporal, then use a single aggregate block, else
3446 //                              use a default value (10).  A user specification overrides
3447 //                              this logic.
3448                 bool single_group = true;
3449                 for(g=0;g<gb_tbl->size();g++){
3450                         data_type *gdt = gb_tbl->get_data_type(g);
3451                         if(! gdt->is_temporal() ){
3452                                 single_group = false;
3453                         }
3454                 }
3455                 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
3456                 int max_aggr_i = atoi(max_aggr_str.c_str());
3457                 if(max_aggr_i <= 0){
3458                         if(single_group)
3459                                 ret += "2";
3460                         else
3461                                 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
3462                 }else{
3463                         unsigned int naggrs = 1;                // make it power of 2
3464                         unsigned int nones = 0;
3465                         while(max_aggr_i){
3466                                 if(max_aggr_i&1)
3467                                         nones++;
3468                                 naggrs = naggrs << 1;
3469                                 max_aggr_i = max_aggr_i >> 1;
3470                         }
3471                         if(nones==1)            // in case it was already a power of 2.
3472                                 naggrs/=2;
3473                         ret += int_to_string(naggrs);
3474                 }
3475                 ret += ";\n";
3476
3477                 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
3478                 ret+="\t\treturn(0);\n";
3479                 ret+="\t}\n\n";
3480 //              ret+="/* compute how many integers we need to store the hashmap */\n";
3481 //              ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
3482                 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
3483                 ret+="\t\treturn(0);\n";
3484                 ret+="\t}\n";
3485                 ret+="/*\t\tfill bitmap with zero \t*/\n";
3486                 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
3487                 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
3488                 ret+="\tf->generation=0;\n";
3489                 ret+="\tf->flush_pos = f->max_aggrs;\n";
3490
3491                 ret += "\tf->flush_ctr = 0;\n";
3492
3493         }
3494
3495         if(is_fj){
3496                 if(uses_bloom){
3497                         ret+="\tf->first_exec = 1;\n";
3498                         unsigned int n_bloom = 11;
3499                         string n_bloom_str = fs->get_val_of_def("num_bloom");
3500                         int tmp_n_bloom = atoi(n_bloom_str.c_str());
3501                         if(tmp_n_bloom>0)
3502                                 n_bloom = tmp_n_bloom+1;
3503
3504                         unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
3505                         if(window_len < n_bloom){
3506                                 n_bloom = window_len+1;
3507                         }
3508
3509                         int bf_exp_size = 12;  // base-2 log of number of bits
3510                         string bloom_len_str = fs->get_val_of_def("bloom_size");
3511                         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
3512                         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
3513                                 bf_exp_size = tmp_bf_exp_size;
3514                         }
3515                         int bf_bit_size = 1 << 12;
3516                         int bf_byte_size = bf_bit_size / (8*sizeof(char));
3517
3518                         int bf_tot = n_bloom*bf_byte_size;
3519                         ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
3520                         ret+="\t\treturn(0);\n";
3521                         ret+="\t}\n";
3522                         ret +=
3523 "       for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
3524 "               f->bf_table[i] = 0;\n"
3525 ;
3526                 }else{
3527                         unsigned int ht_size = 4096;
3528                         string ht_size_s = fs->get_val_of_def("aggregate_slots");
3529                         int tmp_ht_size = atoi(ht_size_s.c_str());
3530                         if(tmp_ht_size > 1024){
3531                                 unsigned int hs = 1;            // make it power of 2
3532                                 while(tmp_ht_size){
3533                                         hs =hs << 1;
3534                                         tmp_ht_size = tmp_ht_size >> 1;
3535                                 }
3536                                 ht_size = hs;
3537                         }
3538                         ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
3539                         ret+="\t\treturn(0);\n";
3540                         ret+="\t}\n\n";
3541                         ret +=
3542 "       for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
3543 "               f->join_table[i].ts = 0;\n"
3544 ;
3545                 }
3546         }
3547
3548 //                      Initialize the complex literals (which might be handles).
3549
3550         for(cl=0;cl<complex_literals->size();cl++){
3551                 literal_t *l = complex_literals->get_literal(cl);
3552 //              sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
3553 //              ret += tmpstr + l->to_C_code() + ";\n";
3554                 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
3555                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
3556         }
3557
3558         ret += "\n";
3559
3560 //                      Initialize the last seen values of temporal attributes to min(max) value of
3561 //                      their respective type
3562 //                      Create places to hold the last values of temporal attributes referenced in select clause
3563
3564
3565         col_id_set temp_cids;           //      col ids of temp attributes in select clause
3566
3567         int s;
3568         col_id_set::iterator csi;
3569
3570         for(s=0;s<sl_list.size();s++){
3571                 data_type *sdt = sl_list[s]->get_data_type();
3572                 if (sdt->is_temporal()) {
3573                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3574                 }
3575         }
3576
3577         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3578                 int tblref = (*csi).tblvar_ref;
3579                 int schref = (*csi).schema_ref;
3580                 string field = (*csi).field;
3581                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
3582                 if (dt.is_increasing()) {
3583                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
3584                         ret += tmpstr;
3585                 } else if (dt.is_decreasing()) {
3586                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
3587                         ret += tmpstr;
3588                 }
3589         }
3590
3591 //      initialize last seen values of temporal groubpy variables
3592         if(is_aggr_query){
3593                 for(g=0;g<gb_tbl->size();g++){
3594                         data_type *dt = gb_tbl->get_data_type(g);
3595                         if(dt->is_temporal()){
3596 /*
3597                                 fprintf(stderr,"group by attribute %s is temporal, ",
3598                                                 gb_tbl->get_name(g).c_str());
3599 */
3600                                 if(dt->is_increasing()){
3601                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
3602                                 }else{
3603                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
3604                                 }
3605                                 ret += tmpstr;
3606                         }
3607                 }
3608         }
3609
3610         ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
3611         ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
3612         ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
3613         ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
3614         ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
3615
3616 //                      Initialize runtime stats
3617         ret+="\tf->in_tuple_cnt = 0;\n";
3618         ret+="\tf->out_tuple_cnt = 0;\n";
3619         ret+="\tf->out_tuple_sz = 0;\n";
3620         ret+="\tf->accepted_tuple_cnt = 0;\n";
3621         ret+="\tf->cycle_cnt = 0;\n";
3622         ret+="\tf->collision_cnt = 0;\n";
3623         ret+="\tf->eviction_cnt = 0;\n";
3624         ret+="\tf->sampling_rate = 1.0;\n";
3625
3626         ret+="\tf->trace_id = 0;\n\n";
3627     if(param_tbl->size() > 0){
3628         ret+=
3629 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
3630 "#ifndef LFTA_IN_NIC\n"
3631 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
3632 "#else\n"
3633 "\t\t}\n"
3634 "#endif\n"
3635 "\t\t\treturn 0;\n"
3636 "\t\t}\n";
3637         }
3638
3639 //                      Register the pass-by-handle parameters
3640     int ph;
3641     for(ph=0;ph<param_handle_table.size();++ph){
3642                 data_type pdt(param_handle_table[ph]->type_name);
3643                 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
3644                 switch(param_handle_table[ph]->val_type){
3645                 case cplx_lit_e:
3646                         ret += tmpstr;
3647                         if(pdt.is_buffer_type()) ret += "&(";
3648                         sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
3649                         ret += tmpstr ;
3650                         if(pdt.is_buffer_type()) ret += ")";
3651                         ret +=  ");\n";
3652                         break;
3653                 case litval_e:
3654 //                              not complex, no constructor
3655                         ret += tmpstr;
3656                         ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
3657                         break;
3658                 case param_e:
3659 //                              query parameter handles are regstered/deregistered in the
3660 //                              load_params function.
3661 //                      ret += "t->param_"+param_handle_table[ph]->param_name;
3662                         break;
3663                 default:
3664                         fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
3665                         exit(1);
3666                 }
3667         }
3668
3669         ret += "\treturn (struct FTA *) f;\n";
3670         ret += "}\n\n";
3671
3672         return(ret);
3673 }
3674
3675
3676
3677
3678 //////////////////////////////////////////////////////////////////
3679
3680 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
3681 //              map<string,string> &int_fcn_defs,
3682                 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
3683         bool is_aggr_query;
3684         int s,p,g;
3685         string retval;
3686
3687 /////////////////////////////////////////////////////////////
3688 ///             Do operator-generic processing, such as
3689 ///             gathering the set of referenced columns,
3690 ///             generating structures, etc.
3691
3692 //              Initialize globals to empty.
3693         gb_tbl = NULL; aggr_tbl = NULL;
3694         global_id = -1; nicprop = NULL;
3695         param_tbl = fs->get_param_tbl();
3696         sl_list.clear(); where.clear();
3697         partial_fcns.clear();
3698         fcn_ref_cnt.clear(); is_partial_fcn.clear();
3699         pred_class.clear(); pred_pos.clear();
3700         sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
3701         gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
3702
3703
3704 //              Does the lfta read packed results from the NIC?
3705         nicprop = nicp;                 // load into global
3706         global_id = gid;
3707     packed_return = false;
3708         if(nicp && nicp->option_exists("Return")){
3709                 if(nicp->option_value("Return") == "Packed"){
3710                         packed_return = true;
3711                 }else{
3712                         fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
3713                 }
3714         }
3715
3716
3717 //                      Extract data which defines the query.
3718 //                              complex literals gathered now.
3719         complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
3720         param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
3721         string node_name = fs->get_node_name();
3722     bool is_fj = false, uses_bloom = false;
3723
3724
3725         if(fs->node_type() == "spx_qpn"){
3726                 is_aggr_query = false;
3727                 spx_qpn *spx_node = (spx_qpn *)fs;
3728                 sl_list = spx_node->get_select_se_list();
3729                 where = spx_node->get_where_clause();
3730                 gb_tbl = NULL;
3731                 aggr_tbl = NULL;
3732         } else
3733         if(fs->node_type() == "sgah_qpn"){
3734                 is_aggr_query = true;
3735                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3736                 sl_list = sgah_node->get_select_se_list();
3737                 where = sgah_node->get_where_clause();
3738                 gb_tbl = sgah_node->get_gb_tbl();
3739                 aggr_tbl = sgah_node->get_aggr_tbl();
3740
3741                 if((sgah_node->get_having_clause()).size() > 0){
3742                         fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
3743                 }
3744         } else
3745         if(fs->node_type() == "filter_join"){
3746                 is_aggr_query = false;
3747         is_fj = true;
3748                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3749                 sl_list = fj_node->get_select_se_list();
3750                 where = fj_node->get_where_clause();
3751                 uses_bloom = fj_node->use_bloom;
3752                 gb_tbl = NULL;
3753                 aggr_tbl = NULL;
3754         } else {
3755                 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
3756                 exit(1);
3757         }
3758
3759 //                      Build list of "partial functions", by clause.
3760 //                      NOTE : partial fcns are not handles well.
3761 //                      The act of searching for them associates the fcn call
3762 //                      in the SE with an index to an array.  Refs to the
3763 //                      fcn value are replaced with refs to the variable they are
3764 //                      unpacked into.  A more general tagging mechanism would be better.
3765
3766         int i;
3767         vector<bool> *pfunc_ptr = NULL;
3768         vector<int> *ref_cnt_ptr = NULL;
3769         if(!is_aggr_query){             // don't collect cacheable fcns on aggr query.
3770                 ref_cnt_ptr = &fcn_ref_cnt;
3771                 pfunc_ptr = &is_partial_fcn;
3772         }
3773
3774         sl_fcns_start = 0;
3775         for(i=0;i<sl_list.size();i++){
3776                 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3777         }
3778         wh_fcns_start = sl_fcns_end = partial_fcns.size();
3779         for(i=0;i<where.size();i++){
3780                 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3781         }
3782         gb_fcns_start = wh_fcns_end = partial_fcns.size();
3783         if(gb_tbl != NULL){
3784                 for(i=0;i<gb_tbl->size();i++){
3785                         find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
3786                 }
3787         }
3788         ag_fcns_start = gb_fcns_end = partial_fcns.size();
3789         if(aggr_tbl != NULL){
3790                 for(i=0;i<aggr_tbl->size();i++){
3791                         find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
3792                 }
3793         }
3794         ag_fcns_end = partial_fcns.size();
3795
3796 //              Fill up the is_partial_fcn and fcn_ref_cnt arrays.
3797         if(is_aggr_query){
3798                 for(i=0; i<partial_fcns.size();i++){
3799                         fcn_ref_cnt.push_back(1);
3800                         is_partial_fcn.push_back(true);
3801                 }
3802         }
3803
3804 //              Unmark non-partial expensive functions referenced only once.
3805         for(i=0; i<partial_fcns.size();i++){
3806                 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
3807                         partial_fcns[i]->set_partial_ref(-1);
3808                 }
3809         }
3810
3811         node_name = normalize_name(node_name);
3812
3813         retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
3814
3815         if(packed_return){              // generate unpack struct
3816                 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
3817                 int schref = input_tbls[0]->get_schema_ref();
3818                 vector<string> refd_cols;
3819                 for(s=0;s<sl_list.size();++s){
3820                         gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
3821                 }
3822                 for(p=0;p<where.size();++p){
3823 //                              I'm not disabling these preds ...
3824                         gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
3825                 }
3826                 if(gb_tbl){
3827                         for(g=0;g<gb_tbl->size();++g){
3828                           gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
3829                         }
3830                 }
3831                 sort(refd_cols.begin(), refd_cols.end());
3832                 retval += "struct "+node_name+"_input_struct{\n";
3833                 retval += "\tint __lfta_id_fm_nic__;\n";
3834                 int vsi;
3835                 for(vsi=0;vsi<refd_cols.size();++vsi){
3836                 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
3837                         retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
3838                 }
3839                 retval+="};\n\n";
3840         }
3841
3842
3843 /////////////////////////////////////////////////////
3844 //                      Common stuff unpacked, do some generation
3845
3846         if(is_aggr_query)
3847           retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
3848         if(is_fj)
3849                 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
3850
3851         retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema);
3852         retval += generate_tuple_struct(node_name, sl_list) ;
3853
3854         if(is_aggr_query)
3855                 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
3856         if(param_tbl->size() > 0)
3857                 retval += generate_fta_load_params(node_name) ;
3858         retval += generate_fta_free(node_name, is_aggr_query) ;
3859         retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;
3860         retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ;
3861
3862
3863         /* extract the value of Time_Correlation from interface definition */
3864         int e,v;
3865         string es;
3866         unsigned time_corr;
3867         vector<tablevar_t *> tvec =  fs->get_input_tbls();
3868         vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
3869         if (time_corr_vec.empty())
3870                 time_corr = DEFAULT_TIME_CORR;
3871         else
3872                 time_corr = atoi(time_corr_vec[0].c_str());
3873
3874         retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) );
3875         retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
3876
3877   return(retval);
3878 }
3879
3880
3881
3882 int compute_snap_len(qp_node *fs, table_list *schema){
3883
3884 //              Initialize global vars
3885         gb_tbl = NULL;
3886         sl_list.clear(); where.clear();
3887
3888         if(fs->node_type() == "spx_qpn"){
3889                 spx_qpn *spx_node = (spx_qpn *)fs;
3890                 sl_list = spx_node->get_select_se_list();
3891                 where = spx_node->get_where_clause();
3892         }
3893         else if(fs->node_type() == "sgah_qpn"){
3894                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3895                 sl_list = sgah_node->get_select_se_list();
3896                 where = sgah_node->get_where_clause();
3897                 gb_tbl = sgah_node->get_gb_tbl();
3898         }
3899         else if(fs->node_type() == "filter_join"){
3900                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3901                 sl_list = fj_node->get_select_se_list();
3902                 where = fj_node->get_where_clause();
3903         } else{
3904                 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
3905                 exit(1);
3906         }
3907
3908 //                      Gather all column references, need to define unpacking variables.
3909   int w,s;
3910   col_id_set cid_set;
3911   col_id_set::iterator csi;
3912
3913   for(w=0;w<where.size();++w)
3914         gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3915   for(s=0;s<sl_list.size();s++){
3916         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3917   }
3918
3919   int g;
3920   if(gb_tbl != NULL){
3921         for(g=0;g<gb_tbl->size();g++)
3922           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3923   }
3924
3925   //                    compute snap length
3926   int snap_len = -1;
3927   int n_snap=0;
3928   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3929     int schref = (*csi).schema_ref;
3930         int tblref = (*csi).tblvar_ref;
3931     string field = (*csi).field;
3932
3933         param_list *field_params = schema->get_modifier_list(schref, field);
3934         if(field_params->contains_key("snap_len")){
3935                 string fld_snap_str = field_params->val_of("snap_len");
3936                 int fld_snap;
3937                 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
3938                         if(fld_snap > snap_len) snap_len = fld_snap;
3939                         n_snap++;
3940                 }else{
3941                         fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
3942                 }
3943         }
3944   }
3945
3946   if(n_snap == cid_set.size()){
3947         return (snap_len);
3948   }else{
3949         return -1;
3950   }
3951
3952
3953 }
3954
3955 //              Function which computes an optimal
3956 //              set of unpacking functions.
3957
3958 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
3959         map<string, int> pfcn_count;
3960         map<string, int>::iterator msii;
3961         col_id_set::iterator cisi;
3962         set<string>::iterator ssi;
3963         string best_fcn;
3964
3965         while(ucol_fcn_map.size() < upref_cids.size()){
3966
3967 //                      Gather unpack functions referenced by unaccounted-for
3968 //                      columns, and increment their reference count.
3969                 pfcn_count.clear();
3970                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
3971                         if(ucol_fcn_map.count((*cisi)) == 0){
3972                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
3973                                 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
3974                                         pfcn_count[(*ssi)]++;
3975                         }
3976                 }
3977
3978 //              Get the lowest cost per field function.
3979                 float min_cost = 0.0;
3980                 string best_fcn = "";
3981                 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
3982                         int fcost = Schema->get_ufcn_cost((*msii).first);
3983                         if(fcost < 0){
3984                                 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
3985                                 exit(1);
3986                         }
3987                         float this_cost = (1.0*fcost)/(*msii).second;
3988                         if(msii == pfcn_count.begin() || this_cost < min_cost){
3989                                 min_cost = this_cost;
3990                                 best_fcn = (*msii).first;
3991                         }
3992                 }
3993                 if(best_fcn == ""){
3994                         fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
3995                         exit(1);
3996                 }
3997
3998 //              Assign this function to the unassigned fcns which use it.
3999                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4000                         if(ucol_fcn_map.count((*cisi)) == 0){
4001                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4002                                 if(ufcns.count(best_fcn)>0)
4003                                         ucol_fcn_map[(*cisi)] = best_fcn;
4004                         }
4005                 }
4006         }
4007 }
4008
4009
4010
4011 //              Generate an initial test test for the lfta
4012 //              Assume that the predicate references no external functions,
4013 //              and especially no partial functions,
4014 //              aggregates, internal functions.
4015 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4016                 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4017                 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4018                 vector<int> &lfta_snap_lens, string iface){
4019   col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4020   col_id_set::iterator csi;
4021         int o,p,q;
4022         string ret;
4023
4024 //              Gather complex literals in the prefilter.
4025         cplx_lit_table *complex_literals = new cplx_lit_table();
4026         for(p=0;p<pred_list.size();++p){
4027                 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4028         }
4029
4030
4031 //              Find the combinable predicates
4032         vector<predicate_t *> pr_list;
4033         for(p=0;p<pred_list.size();++p){
4034         find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4035         }
4036
4037 //              Analyze the combinable predicates to find the predicate classes.
4038         pred_class.clear();             // idx to equiv pred in equiv_list
4039         pred_pos.clear();               // idx to returned bitmask.
4040         vector<predicate_t *> equiv_list;
4041         vector<int> num_equiv;
4042
4043
4044         for(p=0;p<pr_list.size();++p){
4045                 for(q=0;q<equiv_list.size();++q){
4046                         if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4047                                 break;
4048                 }
4049                 if(q == equiv_list.size()){             // no equiv : create new
4050                         pred_class.push_back(equiv_list.size());
4051                         equiv_list.push_back(pr_list[p]);
4052                         pred_pos.push_back(0);
4053                         num_equiv.push_back(1);
4054
4055                 }else{                  // pr_list[p] is equivalent to pred q
4056                         pred_class.push_back(q);
4057                         pred_pos.push_back(num_equiv[q]);
4058                         num_equiv[q]++;
4059                 }
4060         }
4061
4062 //              Generate the variables which hold the common pred handles
4063         ret += "/*\t\tprefilter global vars.\t*/\n";
4064         for(q=0;q<equiv_list.size();++q){
4065                 for(p=0;p<=(num_equiv[q]/32);++p){
4066                         ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4067                 }
4068         }
4069
4070 //              Struct to hold prefilter complex literals
4071         ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4072         if(complex_literals->size() == 0)
4073                 ret += "\tint no_variable;\n";
4074         int cl;
4075         for(cl=0;cl<complex_literals->size();cl++){
4076                 literal_t *l = complex_literals->get_literal(cl);
4077                 data_type *dtl = new data_type( l->get_type() );
4078                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4079                 ret += tmpstr;
4080         }
4081         ret += "} prefilter_complex_lits_"+iface+";\n\n";
4082
4083
4084 //              Generate the prefilter initialziation code
4085         ret += "void init_lfta_prefilter_"+iface+"(){\n";
4086
4087 //              First initialize complex literals, if any.
4088         ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4089         for(cl=0;cl<complex_literals->size();cl++){
4090                 literal_t *l = complex_literals->get_literal(cl);
4091                 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4092                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4093         }
4094
4095
4096         set<int> epred_seen;
4097         for(p=0;p<pr_list.size();++p){
4098                 int q = pred_class[p];
4099 //printf("\tq=%d\n",q);
4100                 if(epred_seen.count(q)>0){
4101                         ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4102                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4103                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4104                         for(o=0;o<op_list.size();++o){
4105                                 if(! cl_op[o]){
4106                                         ret += generate_se_code(op_list[o],Schema)+", ";
4107                                 }
4108                         }
4109                         ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4110                         epred_seen.insert(q);
4111                 }else{
4112                         ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4113                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4114                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4115                         for(o=0;o<op_list.size();++o){
4116                                 if(! cl_op[o]){
4117                                         ret += generate_se_code(op_list[o],Schema)+", ";
4118                                 }
4119                         }
4120                         ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4121                         epred_seen.insert(q);
4122                 }
4123         }
4124         ret += "}\n\n";
4125
4126
4127
4128 //              Start on main body code generation
4129   ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4130
4131
4132 ///--------------------------------------------------------------
4133 ///             Generate and store the prefilter body,
4134 ///             reuse it for the snap length calculator
4135 ///-------------------------------------------------------------
4136         string body;
4137
4138     body += "\tstruct packet *p = (struct packet *)pkt;\n";
4139
4140
4141
4142 //              Gather the colids to store unpacked variables.
4143         for(p=0;p<pred_list.size();++p){
4144         gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4145         }
4146
4147 //              make the col_ids refer to the base tables, and
4148 //              grab the col_ids with at least one unpacking function.
4149         for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4150                 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4151                 col_id tmp_col_id;
4152                 tmp_col_id.field = (*csi).field;
4153                 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4154                 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4155                 cid_set.insert(tmp_col_id);
4156                 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4157                 if(fe->get_unpack_fcns().size()>0)
4158                         upref_cids.insert(tmp_col_id);
4159
4160
4161         }
4162
4163 //              Find the set of unpacking programs needed for the
4164 //              prefilter fields.
4165         map<col_id, string,lt_col_id>  ucol_fcn_map;
4166         find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4167         set<string> pref_ufcns;
4168         map<col_id, string,lt_col_id>::iterator mcis;
4169         for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4170                 pref_ufcns.insert((*mcis).second);
4171         }
4172
4173
4174
4175 //                      Variables for unpacking attributes.
4176     body += "/*\t\tVariables for unpacking attributes\t*/\n";
4177     for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4178       int schref = (*csi).schema_ref;
4179           int tblref = (*csi).tblvar_ref;
4180       string field = (*csi).field;
4181       data_type dt(Schema->get_type_name(schref,field));
4182       sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4183         field.c_str(), tblref);
4184       body += tmpstr;
4185       sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4186       body += tmpstr;
4187     }
4188 //                      Variables for unpacking temporal attributes.
4189     body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4190     for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4191           if (cid_set.count(*csi) == 0) {
4192         int schref = (*csi).schema_ref;
4193                 int tblref = (*csi).tblvar_ref;
4194         string field = (*csi).field;
4195         data_type dt(Schema->get_type_name(schref,field));
4196         sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4197           field.c_str(), tblref);
4198         body += tmpstr;
4199
4200           }
4201     }
4202     body += "\n\n";
4203
4204 //              Variables for combinable predicate evaluation
4205         body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4206         for(q=0;q<equiv_list.size();++q){
4207                 for(p=0;p<=(num_equiv[q]/32);++p){
4208                         body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4209                 }
4210         }
4211
4212
4213 //                      Variables that are always needed
4214     body += "/*\t\tVariables which are always needed\t*/\n";
4215         body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4216         body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4217
4218 //              Call the unpacking functions for the prefilter fields
4219         if(pref_ufcns.size() > 0)
4220                 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4221         set<string>::iterator ssi;
4222         for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4223                 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4224         }
4225
4226
4227 //              Unpack the accessed attributes
4228         body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4229     for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4230           int tblref = (*csi).tblvar_ref;
4231       int schref = (*csi).schema_ref;
4232           string field = (*csi).field;
4233           sprintf(tmpstr,"\tret_%s_%d =  (%s(p, &unpack_var_%s_%d) == 0);\n",
4234                 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4235           body += tmpstr;
4236     }
4237
4238 //              next unpack the temporal attributes and ignore the errors
4239 //              We are assuming here that failed unpack of temporal attributes
4240 //              is not going to overwrite the last stored value
4241 //              Failed upacks are ignored
4242     for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
4243           int tblref = (*csi).tblvar_ref;
4244       int schref = (*csi).schema_ref;
4245           string field = (*csi).field;
4246           sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
4247                  Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4248           body += tmpstr;
4249     }
4250
4251 //              Evaluate the combinable predicates
4252         if(equiv_list.size()>0)
4253                 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
4254         for(q=0;q<equiv_list.size();++q){
4255                 for(p=0;p<=(num_equiv[q]/32);++p){
4256
4257 //              Only call the common eval fcn if all ref'd fields present.
4258                         col_id_set pred_cids;
4259                         col_id_set::iterator cpi;
4260                         gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
4261                         if(pred_cids.size()>0){
4262                         body += "\tif(";
4263                                 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4264                                         if(cpi != pred_cids.begin())
4265                                                 body += " && ";
4266                                 string field = (*cpi).field;
4267                                         int tblref = (*cpi).tblvar_ref;
4268                                         body += "ret_"+field+"_"+int_to_string(tblref);
4269                                 }
4270                                 body+=")\n";
4271                         }
4272
4273                         body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
4274                         vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
4275                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4276                         for(o=0;o<op_list.size();++o){
4277                                 if(cl_op[o]){
4278                                         body += ","+generate_se_code(op_list[o],Schema);
4279                                 }
4280                         }
4281                         body += ");\n";
4282                 }
4283         }
4284
4285
4286         for(p=0;p<pred_list.size();++p){
4287                 col_id_set pred_cids;
4288                 col_id_set::iterator cpi;
4289                 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
4290                 if(pred_cids.size()>0){
4291                         body += "\tif(";
4292                         for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4293                                 if(cpi != pred_cids.begin())
4294                                         body += " && ";
4295                         string field = (*cpi).field;
4296                                 int tblref = (*cpi).tblvar_ref;
4297                                 body += "ret_"+field+"_"+int_to_string(tblref);
4298                         }
4299                         body+=")\n";
4300                 }
4301         body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
4302                 body+="\tbitpos = bitpos << 1;\n";
4303         }
4304
4305 // ---------------------------------------------------------------
4306 //              Finished with the body of the prefilter
4307 // --------------------------------------------------------------
4308
4309         ret += body;
4310
4311 //                      Collect fields referenced by an lfta but not
4312 //                      already unpacked for the prefilter.
4313
4314 //printf("upref_cids is:\n");
4315 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
4316 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4317 //printf("pref_ufcns is:\n");
4318 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
4319 //printf("\t%s\n",(*ssi).c_str());
4320
4321         int l;
4322         for(l=0;l<lfta_cols.size();++l){
4323                 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
4324                         string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4325                         col_id tmp_col_id;
4326                         tmp_col_id.field = (*csi).field;
4327                         tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4328                         tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4329                         field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4330                         set<string> fld_ufcns = fe->get_unpack_fcns();
4331 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(),  upref_cids.count(tmp_col_id));
4332                         if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
4333 //              Ensure that this field not already unpacked.
4334                                 bool found = false;
4335                                 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
4336 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
4337                                         if(pref_ufcns.count((*ssi))){
4338 //printf("Field already unpacked.\n");
4339                                                 found = true;;
4340                                         }
4341                                 }
4342                                 if(! found){
4343 //printf("\tadding to unpack list\n");
4344                                         upall_cids.insert(tmp_col_id);
4345                                 }
4346                         }
4347                 }
4348         }
4349
4350 //printf("upall_cids is:\n");
4351 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
4352 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4353
4354 //              Get the set of unpacking programs for these.
4355         map<col_id, string,lt_col_id>  uall_fcn_map;
4356         find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
4357         set<string> pall_ufcns;
4358         for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
4359 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
4360                 pall_ufcns.insert((*mcis).second);
4361         }
4362
4363 //              Iterate through the remaining set of unpacking function
4364         if(pall_ufcns.size() > 0)
4365                 ret += "//\t\tcall all remaining field unpacking functions.\n";
4366         for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
4367 //              gather the set of columns unpacked by this ufcn
4368                 col_id_set fcol_set;
4369                 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
4370                         if(uall_fcn_map[(*csi)] == (*ssi))
4371                                 fcol_set.insert((*csi));
4372                 }
4373
4374 //              gather the set of lftas which access a field unpacked by the fcn
4375                 set<long long int> clfta;
4376                 for(l=0;l<lfta_cols.size();l++){
4377                         for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
4378                                 if(lfta_cols[l].count((*csi)) > 0)
4379                                         break;
4380                         }
4381                         if(csi != fcol_set.end())
4382                                 clfta.insert(lfta_sigs[l]);
4383                 }
4384
4385 //              generate the unpacking code
4386                 ret += "\tif(";
4387                 set<long long int>::iterator sii;
4388                 for(sii=clfta.begin();sii!=clfta.end();++sii){
4389                         if(sii!=clfta.begin())
4390                                 ret += " || ";
4391                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
4392                         ret += tmpstr;
4393                 }
4394                 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4395         }
4396
4397
4398     ret += "\treturn(retval);\n\n";
4399   ret += "}\n\n";
4400
4401
4402 // --------------------------------------------------------
4403 //              reuse prefilter body for snaplen calculator
4404 //
4405 //      This is dummy code, so I'm commenting it out.
4406
4407 /*
4408   ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
4409
4410         ret += body;
4411
4412         int i;
4413         vector<int> s_snaps = lfta_snap_lens;
4414         sort(s_snaps.begin(), s_snaps.end());
4415
4416         if(s_snaps[0] == -1){
4417                 set<unsigned long long int> sigset;
4418                 for(i=0;i<lfta_snap_lens.size();++i){
4419                         if(lfta_snap_lens[i] == -1){
4420                                 sigset.insert(lfta_sigs[i]);
4421                         }
4422                 }
4423                 ret += "\tif( ";
4424                 set<unsigned long long int>::iterator sulli;
4425                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4426                         if(sulli!=sigset.begin())
4427                                 ret += " || ";
4428                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4429                         ret += tmpstr;
4430                 }
4431                 ret += ") return -1;\n";
4432         }
4433
4434         int nextpos = lfta_snap_lens.size()-1;
4435         int nextval = lfta_snap_lens[nextpos];
4436         while(nextval >= 0){
4437                 set<unsigned long long int> sigset;
4438                 for(i=0;i<lfta_snap_lens.size();++i){
4439                         if(lfta_snap_lens[i] == nextval){
4440                                 sigset.insert(lfta_sigs[i]);
4441                         }
4442                 }
4443                 ret += "\tif( ";
4444                 set<unsigned long long int>::iterator sulli;
4445                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4446                         if(sulli!=sigset.begin())
4447                                 ret += " || ";
4448                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4449                         ret += tmpstr;
4450                 }
4451                 ret += ") return "+int_to_string(nextval)+";\n";
4452
4453                 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
4454                 if(nextpos>0)
4455                         nextval = lfta_snap_lens[nextpos];
4456                 else
4457                         nextval = -1;
4458         }
4459         ret += "\treturn 0;\n";
4460         ret += "}\n\n";
4461 */
4462
4463
4464   return(ret);
4465 }
4466
4467
4468
4469
4470 //              Generate the struct which will store the the values of
4471 //              temporal attributesunpacked by prefilter
4472 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
4473
4474   col_id_set::iterator csi;
4475
4476 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
4477
4478   string ret="struct prefilter_unpacked_temp_vars {\n";
4479   ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
4480
4481   string init_code;
4482
4483   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4484     int schref = (*csi).schema_ref;
4485     int tblref = (*csi).tblvar_ref;
4486     string field = (*csi).field;
4487         data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
4488     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4489         field.c_str(), tblref);
4490     ret += tmpstr;
4491
4492         if (init_code != "")
4493                 init_code += ", ";
4494         if (dt.is_increasing())
4495                 init_code += dt.get_min_literal();
4496         else
4497                 init_code += dt.get_max_literal();
4498
4499   }
4500   ret += "};\n\n";
4501
4502   ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";
4503
4504   return(ret);
4505 }