aea03a8dd3b536e9d42d163b1a74c5d82442ead2
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include <string>
17 #include <stdio.h>
18 #include <stdlib.h>
19 //#include <algo.h>
20 #include<algorithm>
21
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
29
30 using namespace std;
31
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
33
34 // default value for correlation between the interface card and
35 // the system clock
36 #define DEFAULT_TIME_CORR 16
37
38
39 //      For fast hashing
40 //#define NRANDS 100
41 extern string hash_nums[NRANDS];
42 /*
43 = {
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
94 };
95 */
96
97
98 // ----------------------------------------------
99 //              Data extracted from the query plan node
100 //              for use by code generation.
101
102 static cplx_lit_table *complex_literals;  //Table of literals with constructors.
103 static vector<handle_param_tbl_entry *> param_handle_table;
104 static param_table *param_tbl;          // Table of all referenced parameters.
105
106 static vector<scalarexp_t *> sl_list;
107 static vector<cnf_elem *> where;
108
109 static gb_table *gb_tbl;                        // Table of all group-by attributes.
110 static aggregate_table *aggr_tbl;       // Table of all referenced aggregates.
111
112 static bool packed_return;              // unpack using structyure, not fcns
113 static nic_property *nicprop;   // nic properties for this interface.
114 static int global_id;
115
116
117 //              The partial_fcns vector can now refer to
118 //              partial functions, or expensive functions
119 //              which can be cached (if there are multiple refs).  A couple
120 //              of int vectors distinguish the cases.
121 static vector<scalarexp_t *> partial_fcns;
122 static vector<int> fcn_ref_cnt;
123 static vector<bool> is_partial_fcn;
124 int sl_fcns_start = 0, sl_fcns_end = 0;
125 int wh_fcns_start = 0, wh_fcns_end = 0;
126 int gb_fcns_start = 0, gb_fcns_end = 0;
127 int ag_fcns_start = 0, ag_fcns_end = 0;
128
129
130 //              These vectors are for combinable predicates.
131 static vector<int> pred_class;  //      identifies the group
132 static vector<int> pred_pos;    //  position in the group.
133
134
135
136 static char tmpstr[1000];
137
138 //////////////////////////////////////////////////////////////////////
139 ///                     Various utilities
140
141 string generate_fta_name(string node_name){
142         string ret = normalize_name(node_name);
143         if(ret == ""){
144                 ret = "default";
145         }
146         ret += "_fta";
147
148         return(ret);
149 }
150
151
152 string generate_aggr_struct_name(string node_name){
153         string ret = normalize_name(node_name);
154         if(ret == ""){
155                 ret = "default";
156         }
157         ret += "_aggr_struct";
158
159         return(ret);
160 }
161
162 string generate_fj_struct_name(string node_name){
163         string ret = normalize_name(node_name);
164         if(ret == ""){
165                 ret = "default";
166         }
167         ret += "_fj_struct";
168
169         return(ret);
170 }
171
172 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
173         string ret;
174         if(! packed_return){
175                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
176                         schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
177         ret += tmpstr;
178         if(!schema->get_modifier_list(schref,field)->contains_key("required"))
179                 ret += "\tif(retval) goto "+end_goto+";\n";
180
181         }else{
182 //                      TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
183                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
184                 if(dt.is_buffer_type()){
185                         if(dt.get_type() != v_str_t){
186                           ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
187                           ret += "\t\tgoto "+end_goto+";\n";
188                           ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
189                           ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
190                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
191                         }else{
192                                 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
193                                 exit(1);
194                         }
195                 }else{
196                         ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
197                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
198                 }
199         }
200         return ret;
201 }
202
203 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
204   string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
205
206   int g;
207   for(g=0;g<gb_tbl->size();g++){
208           sprintf(tmpstr,"gb_var%d",g);
209           ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
210   }
211
212   int a;
213   for(a=0;a<aggr_tbl->size();a++){
214           ret += "\t";
215           sprintf(tmpstr,"aggr_var%d",a);
216           if(aggr_tbl->is_builtin(a))
217                 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
218           else
219                 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
220   }
221
222 /*
223   ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
224 */
225
226   ret += "};\n\n";
227
228   return(ret);
229 }
230
231
232 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
233   string ret;
234
235   if(fs->use_bloom == false){   // uses hash table instead
236         ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
237         int k;
238         for(k=0;k<fs->hash_eq.size();++k){
239                 sprintf(tmpstr,"key_var%d",k);
240                 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
241         }
242         ret += "\tlong long int ts;\n";
243     ret += "};\n\n";
244   }
245
246   return(ret);
247 }
248
249
250
251
252 string generate_fta_struct(string node_name, gb_table *gb_tbl,
253                 aggregate_table *aggr_tbl, param_table *param_tbl,
254                 cplx_lit_table *complex_literals,
255                 vector<handle_param_tbl_entry *> &param_handle_table,
256                 bool is_aggr_query, bool is_fj, bool uses_bloom,
257                 table_list *schema){
258
259         string ret = "struct " + generate_fta_name(node_name) + "{\n";
260         ret += "\tstruct FTA f;\n";
261
262 //-------------------------------------------------------------
263 //              Aggregate-specific fields
264
265         if(is_aggr_query){
266 /*
267                 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
268 */
269                 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
270                 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
271 //              ret+="\tint bitmap_size;\n";
272                 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
273                 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
274                 ret += "\tint max_windows; // max number of open windows.\n";
275                 ret += "\tunsigned int generation; // initially zero, increment on\n";
276                 ret += "\t     // every hash table flush - whether regular or induced.\n";
277                 ret += "\t     // Old groups are identified by a generation mismatch.\n";
278                 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
279                 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
280
281
282
283                 int g;
284                 bool uses_temporal_flush = false;
285                 for(g=0;g<gb_tbl->size();g++){
286                         data_type *dt = gb_tbl->get_data_type(g);
287                         if(dt->is_temporal()){
288 /*
289                                 fprintf(stderr,"group by attribute %s is temporal, ",
290                                                 gb_tbl->get_name(g).c_str());
291                                 if(dt->is_increasing()){
292                                         fprintf(stderr,"increasing.\n");
293                                 }else{
294                                         fprintf(stderr,"decreasing.\n");
295                                 }
296 */
297                                 data_type *gdt = gb_tbl->get_data_type(g);
298                                 if(gdt->is_buffer_type()){
299                                         fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
300                                 }else{
301                                         sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
302                                         ret += tmpstr;
303                                         sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
304                                         ret += tmpstr;
305                                         sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
306                                         ret += tmpstr;
307                                         uses_temporal_flush = true;
308                                 }
309
310                         }
311
312                 }
313                 if(! uses_temporal_flush){
314                         fprintf(stderr,"Warning: no temporal flush.\n");
315                 }
316         }
317
318 // ---------------------------------------------------------
319 //                      Filter-join specific fields
320
321         if(is_fj){
322                 if(uses_bloom){
323                         ret +=
324 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
325 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
326 "\tint first_exec;\n"
327 "\tlong long int last_bin;\n"
328 "\tint last_bloom_pos;\n"
329 "\n"
330 ;
331                 }else{          // limited hash table
332                         ret +=
333 "  struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
334 "\n"
335 ;
336                 }
337
338         }
339
340 //--------------------------------------------------------
341 //                      Common fields
342
343 //                      Create places to hold the parameters.
344         int p;
345         vector<string> param_vec = param_tbl->get_param_names();
346         for(p=0;p<param_vec.size();p++){
347                 data_type *dt = param_tbl->get_data_type(param_vec[p]);
348                 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
349                                 param_vec[p].c_str());
350                 ret += tmpstr;
351                 if(param_tbl->handle_access(param_vec[p])){
352                         ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
353                 }
354         }
355
356 //                      Create places to hold complex literals.
357         int cl;
358         for(cl=0;cl<complex_literals->size();cl++){
359                 literal_t *l = complex_literals->get_literal(cl);
360                 data_type *dtl = new data_type( l->get_type() );
361                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
362                 ret += tmpstr;
363         }
364
365 //                      Create places to hold the pass-by-handle parameters.
366         for(p=0;p<param_handle_table.size();++p){
367                 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
368                 ret += tmpstr;
369         }
370
371 //                      Create places to hold the last values of temporal
372 //                      attributes referenced in select clause
373 //                      we also need to store values of the temoral attributed
374 //                      of last flushed tuple in aggr queries
375 //                      to make sure we generate the cirrect temporal tuple
376 //                      in the presense of slow flushes
377
378
379         col_id_set temp_cids;           //      col ids of temp attributes in select clause
380
381         int s;
382         col_id_set::iterator csi;
383
384         for(s=0;s<sl_list.size();s++){
385                 data_type *sdt = sl_list[s]->get_data_type();
386                 if (sdt->is_temporal()) {
387                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
388                 }
389         }
390
391         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
392                 int tblref = (*csi).tblvar_ref;
393                 int schref = (*csi).schema_ref;
394                 string field = (*csi).field;
395                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
396                 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
397                 ret += tmpstr;
398         }
399
400         ret += "\tgs_uint64_t trace_id;\n\n";
401
402 //      Fields to store the runtime stats
403
404         ret += "\tgs_uint32_t in_tuple_cnt;\n";
405         ret += "\tgs_uint32_t out_tuple_cnt;\n";
406         ret += "\tgs_uint32_t out_tuple_sz;\n";
407         ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
408         ret += "\tgs_uint64_t cycle_cnt;\n";
409         ret += "\tgs_uint32_t collision_cnt;\n";
410         ret += "\tgs_uint32_t eviction_cnt;\n";
411         ret += "\tgs_float_t sampling_rate;\n";
412
413
414
415         ret += "};\n\n";
416
417         return(ret);
418 }
419
420 //------------------------------------------------------------
421 //              Set colref tblvars to 0..
422 //              (special processing for join-like operators in an lfta).
423
424 void reset_se_col_ids_tblvars(scalarexp_t *se,  gb_table *gtbl){
425         vector<scalarexp_t *> operands;
426         int o;
427
428         if(! se)
429                 return;
430
431         switch(se->get_operator_type()){
432         case SE_LITERAL:
433         case SE_PARAM:
434         case SE_IFACE_PARAM:
435                 return;
436         case SE_UNARY_OP:
437                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
438                 return;
439         case SE_BINARY_OP:
440                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
441                 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
442                 return;
443         case SE_COLREF:
444                 if(! se->is_gb() ){
445                         se->get_colref()->set_tablevar_ref(0);
446                 }else{
447                         if(gtbl==NULL){
448                                 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
449                                 exit(1);
450                         }
451                         reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
452                 }
453                 return;
454         case SE_AGGR_STAR:
455                 return;
456         case SE_AGGR_SE:
457                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
458                 return;
459         case SE_FUNC:
460                 operands = se->get_operands();
461                 for(o=0;o<operands.size();o++){
462                         reset_se_col_ids_tblvars(operands[o], gtbl);
463                 }
464                 return;
465         default:
466                 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
467                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
468                 exit(1);
469         }
470 }
471
472
473 //              reset  column tblvars accessed in this pr.
474
475 void reset_pr_col_ids_tblvars(predicate_t *pr,  gb_table *gtbl){
476         vector<scalarexp_t *> op_list;
477         int o;
478
479         switch(pr->get_operator_type()){
480         case PRED_IN:
481                 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
482                 return;
483         case PRED_COMPARE:
484                 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
485                 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
486                 return;
487         case PRED_UNARY_OP:
488                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
489                 return;
490         case PRED_BINARY_OP:
491                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
492                 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
493                 return;
494         case PRED_FUNC:
495                 op_list = pr->get_op_list();
496                 for(o=0;o<op_list.size();++o){
497                         reset_se_col_ids_tblvars(op_list[o],gtbl) ;
498                 }
499                 return;
500         default:
501                 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
502                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
503         }
504 }
505
506
507
508
509 //                      Generate code that makes reference
510 //                      to the tuple, and not to any aggregates.
511 static string generate_se_code(scalarexp_t *se,table_list *schema){
512         string ret;
513     data_type *ldt, *rdt;
514         int o;
515         vector<scalarexp_t *> operands;
516
517
518         switch(se->get_operator_type()){
519         case SE_LITERAL:
520                 if(se->is_handle_ref()){
521                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
522                         ret = tmpstr;
523                         return(ret);
524                 }
525                 if(se->get_literal()->is_cpx_lit()){
526                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
527                         ret = tmpstr;
528                         return(ret);
529                 }
530                 return(se->get_literal()->to_C_code("")); // not complex, no constructor
531         case SE_PARAM:
532                 if(se->is_handle_ref()){
533                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
534                         ret = tmpstr;
535                         return(ret);
536                 }
537                 ret += "t->param_";
538                 ret += se->get_param_name();
539                 return(ret);
540         case SE_UNARY_OP:
541         ldt = se->get_left_se()->get_data_type();
542         if(ldt->complex_operator(se->get_op()) ){
543                         ret +=  ldt->get_complex_operator(se->get_op());
544                         ret += "(";
545                         ret += generate_se_code(se->get_left_se(),schema);
546             ret += ")";
547                 }else{
548                         ret += "(";
549                         ret += se->get_op();
550                         ret += generate_se_code(se->get_left_se(),schema);
551                         ret += ")";
552                 }
553                 return(ret);
554         case SE_BINARY_OP:
555         ldt = se->get_left_se()->get_data_type();
556         rdt = se->get_right_se()->get_data_type();
557
558         if(ldt->complex_operator(rdt, se->get_op()) ){
559                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
560                         ret += "(";
561                         ret += generate_se_code(se->get_left_se(),schema);
562                         ret += ", ";
563                         ret += generate_se_code(se->get_right_se(),schema);
564                         ret += ")";
565                 }else{
566                         ret += "(";
567                         ret += generate_se_code(se->get_left_se(),schema);
568                         ret += se->get_op();
569                         ret += generate_se_code(se->get_right_se(),schema);
570                         ret += ")";
571                 }
572                 return(ret);
573         case SE_COLREF:
574                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet unpacked ...
575                                                         // so return the defining code.
576                         ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
577
578                 }else{
579                 sprintf(tmpstr,"unpack_var_%s_%d",
580                   se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
581                 ret = tmpstr;
582                 }
583                 return(ret);
584         case SE_FUNC:
585 //                              Should not be ref'ing any aggr here.
586                 if(se->get_aggr_ref() >= 0){
587                         fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
588                         return("ERROR in generate_se_code");
589                 }
590
591                 if(se->is_partial()){
592                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
593                         ret = tmpstr;
594                 }else{
595                         ret += se->op + "(";
596                         operands = se->get_operands();
597                         for(o=0;o<operands.size();o++){
598                                 if(o>0) ret += ", ";
599                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
600                                         ret += "&";
601                                 ret += generate_se_code(operands[o], schema);
602                         }
603                         ret += ")";
604                 }
605                 return(ret);
606         default:
607                 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
608                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
609                 return("ERROR in generate_se_code");
610         }
611 }
612
613 //              generate code that refers only to aggregate data and constants.
614 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
615
616         string ret;
617     data_type *ldt, *rdt;
618         int o;
619         vector<scalarexp_t *> operands;
620
621
622         switch(se->get_operator_type()){
623         case SE_LITERAL:
624                 if(se->is_handle_ref()){
625                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
626                         ret = tmpstr;
627                         return(ret);
628                 }
629                 if(se->get_literal()->is_cpx_lit()){
630                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
631                         ret = tmpstr;
632                         return(ret);
633                 }
634                 return(se->get_literal()->to_C_code("")); // not complex no constructor
635         case SE_PARAM:
636                 if(se->is_handle_ref()){
637                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
638                         ret = tmpstr;
639                         return(ret);
640                 }
641                 ret += "t->param_";
642                 ret += se->get_param_name();
643                 return(ret);
644         case SE_UNARY_OP:
645         ldt = se->get_left_se()->get_data_type();
646         if(ldt->complex_operator(se->get_op()) ){
647                         ret +=  ldt->get_complex_operator(se->get_op());
648                         ret += "(";
649                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
650             ret += ")";
651                 }else{
652                         ret += "(";
653                         ret += se->get_op();
654                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
655                         ret += ")";
656                 }
657                 return(ret);
658         case SE_BINARY_OP:
659         ldt = se->get_left_se()->get_data_type();
660         rdt = se->get_right_se()->get_data_type();
661
662         if(ldt->complex_operator(rdt, se->get_op()) ){
663                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
664                         ret += "(";
665                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
666                         ret += ", ";
667                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
668                         ret += ")";
669                 }else{
670                         ret += "(";
671                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
672                         ret += se->get_op();
673                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
674                         ret += ")";
675                 }
676                 return(ret);
677         case SE_COLREF:
678                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet
679                                                         // unpacked ... so return the defining code.
680                         sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
681                         ret = tmpstr;
682
683                 }else{
684                 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
685                                 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
686                                 se->get_lineno(), se->get_charno());
687                 ret = tmpstr;
688                 }
689                 return(ret);
690         case SE_AGGR_STAR:
691         case SE_AGGR_SE:
692                 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
693                 ret = tmpstr;
694                 return(ret);
695         case SE_FUNC:
696 //                              Is it a UDAF?
697                 if(se->get_aggr_ref() >= 0){
698                         sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
699                         ret = tmpstr;
700                         return(ret);
701                 }
702
703                 if(se->is_partial()){
704                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
705                         ret = tmpstr;
706                 }else{
707                         ret += se->op + "(";
708                         operands = se->get_operands();
709                         for(o=0;o<operands.size();o++){
710                                 if(o>0) ret += ", ";
711                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
712                                         ret += "&";
713                                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
714                         }
715                         ret += ")";
716                 }
717                 return(ret);
718         default:
719                 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
720                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
721                 return("ERROR in generate_se_code");
722         }
723
724 }
725
726
727 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
728         string ret;
729         int o;
730         vector<scalarexp_t *> operands;
731
732
733         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
734                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
735                                 se->get_lineno(), se->get_charno());
736                 return("ERROR in generate_se_code");
737         }
738
739         ret = "\tretval = " + se->get_op() + "( ";
740         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
741         ret += tmpstr;
742
743         operands = se->get_operands();
744         for(o=0;o<operands.size();o++){
745                 ret += ", ";
746                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
747                         ret += "&";
748                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
749         }
750         ret += ");\n";
751
752         return(ret);
753 }
754
755 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
756         string ret;
757         int o;
758         vector<scalarexp_t *> operands;
759
760         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
761                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
762                                 se->get_lineno(), se->get_charno());
763                 return("ERROR in generate_se_code");
764         }
765
766         ret = se->get_op() + "( ";
767
768         operands = se->get_operands();
769         for(o=0;o<operands.size();o++){
770                 if(o) ret += ", ";
771                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
772                         ret += "&";
773                 ret += generate_se_code(operands[o], schema);
774         }
775         ret += ");\n";
776
777         return(ret);
778 }
779
780
781
782 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
783         string ret;
784         int o;
785         vector<scalarexp_t *> operands;
786
787
788         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
789                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
790                                 se->get_lineno(), se->get_charno());
791                 return("ERROR in generate_se_code");
792         }
793
794         ret = "\tretval = " + se->get_op() + "( ",
795         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
796         ret += tmpstr;
797
798         operands = se->get_operands();
799         for(o=0;o<operands.size();o++){
800                 ret += ", ";
801                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
802                         ret += "&";
803                 ret += generate_se_code(operands[o], schema);
804         }
805         ret += ");\n";
806
807         return(ret);
808 }
809
810
811
812
813
//	Map a query-language comparison operator to its C spelling: "=" becomes
//	"==" and "<>" becomes "!=".  Every other operator is returned unchanged.
static string generate_C_comparison_op(string op){
	return (op == "=")  ? string("==")
	     : (op == "<>") ? string("!=")
	     : op;
}
819
//	Map a query-language boolean keyword to the C operator.  Only the
//	all-upper, capitalized and all-lower spellings of AND / OR / NOT are
//	recognized.  Anything else is reported on stderr, and an error marker
//	string is returned in place of the operator.
static string generate_C_boolean_op(string op){
	static const char *const keyword_map[][4] = {
		//	accepted spellings            C operator
		{ "AND", "And", "and",            "&&" },
		{ "OR",  "Or",  "or",             "||" },
		{ "NOT", "Not", "not",            "!"  },
	};
	const size_t n_rows = sizeof(keyword_map) / sizeof(keyword_map[0]);

	for(size_t r = 0; r < n_rows; ++r){
		for(size_t c = 0; c < 3; ++c){
			if(op == keyword_map[r][c])
				return keyword_map[r][3];
		}
	}

	fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
	return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
}
834
835
836 static string generate_predicate_code(predicate_t *pr,table_list *schema){
837         string ret;
838         vector<literal_t *>  litv;
839         int i;
840     data_type *ldt, *rdt;
841         vector<scalarexp_t *> op_list;
842         int o,cref,ppos;
843         unsigned int bitmask;
844
845         switch(pr->get_operator_type()){
846         case PRED_IN:
847         ldt = pr->get_left_se()->get_data_type();
848
849                 ret += "( ";
850                 litv = pr->get_lit_vec();
851                 for(i=0;i<litv.size();i++){
852                         if(i>0) ret += " || ";
853                         ret += "( ";
854
855                 if(ldt->complex_comparison(ldt) ){
856                                 ret +=  ldt->get_comparison_fcn(ldt) ;
857                                 ret += "( ";
858                                 if(ldt->is_buffer_type() ) ret += "&";
859                                 ret += generate_se_code(pr->get_left_se(), schema);
860                                 ret += ", ";
861                                 if(ldt->is_buffer_type() ) ret += "&";
862                                 if(litv[i]->is_cpx_lit()){
863                                         sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
864                                         ret += tmpstr;
865                                 }else{
866                                         ret += litv[i]->to_C_code("");
867                                 }
868                                 ret += ") == 0";
869                         }else{
870                                 ret += generate_se_code(pr->get_left_se(), schema);
871                                 ret += " == ";
872                                 ret += litv[i]->to_C_code("");
873                         }
874
875                         ret += " )";
876                 }
877                 ret += " )";
878                 return(ret);
879
880         case PRED_COMPARE:
881         ldt = pr->get_left_se()->get_data_type();
882         rdt = pr->get_right_se()->get_data_type();
883
884                 ret += "( ";
885         if(ldt->complex_comparison(rdt) ){
886                         ret += ldt->get_comparison_fcn(rdt);
887                         ret += "(";
888                         if(ldt->is_buffer_type() ) ret += "&";
889                         ret += generate_se_code(pr->get_left_se(),schema);
890                         ret += ", ";
891                         if(rdt->is_buffer_type() ) ret += "&";
892                         ret += generate_se_code(pr->get_right_se(),schema);
893                         ret += ") ";
894                         ret +=  generate_C_comparison_op(pr->get_op());
895                         ret += "0";
896                 }else{
897                         ret += generate_se_code(pr->get_left_se(),schema);
898                         ret +=  generate_C_comparison_op(pr->get_op());
899                         ret += generate_se_code(pr->get_right_se(),schema);
900                 }
901                 ret += " )";
902                 return(ret);
903         case PRED_UNARY_OP:
904                 ret += "( ";
905                 ret +=  generate_C_boolean_op(pr->get_op());
906                 ret += generate_predicate_code(pr->get_left_pr(),schema);
907                 ret += " )";
908                 return(ret);
909         case PRED_BINARY_OP:
910                 ret += "( ";
911                 ret += generate_predicate_code(pr->get_left_pr(),schema);
912                 ret +=  generate_C_boolean_op(pr->get_op());
913                 ret += generate_predicate_code(pr->get_right_pr(),schema);
914                 ret += " )";
915                 return(ret);
916         case PRED_FUNC:
917                 op_list = pr->get_op_list();
918                 cref = pr->get_combinable_ref();
919                 if(cref >= 0){  // predicate is a combinable pred reference
920                         //              Trust, but verify
921                         if(pred_class.size() >= cref && pred_class[cref] >= 0){
922                                 ppos = pred_pos[cref];
923                                 bitmask = 1 << ppos % 32;
924                                 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
925                                 ret = tmpstr;
926                                 return ret;
927                         }
928                 }
929
930                 ret =  pr->get_op() + "(";
931                 if (pr->is_sampling_fcn) {
932                         ret += "t->sampling_rate";
933                         if (!op_list.empty())
934                                 ret += ", ";
935                 }
936                 for(o=0;o<op_list.size();++o){
937                         if(o>0) ret += ", ";
938                         if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
939                                         ret += "&";
940                         ret += generate_se_code(op_list[o],schema);
941                 }
942                 ret += " )";
943                 return(ret);
944         default:
945                 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
946                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
947                 return("ERROR in generate_predicate_code");
948         }
949 }
950
951
952 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
953         string ret;
954
955     if(dt->complex_comparison(dt) ){
956                 ret += dt->get_comparison_fcn(dt);
957                 ret += "(";
958                         if(dt->is_buffer_type() ) ret += "&";
959                 ret += lhs_op;
960                 ret += ", ";
961                         if(dt->is_buffer_type() ) ret += "&";
962                 ret += rhs_op;
963                 ret += ") == 0";
964         }else{
965                 ret += lhs_op;
966                 ret += " == ";
967                 ret += rhs_op;
968         }
969
970         return(ret);
971 }
972
973 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
974         string ret;
975
976     if(dt->complex_comparison(dt) ){
977                 ret += dt->get_comparison_fcn(dt);
978                 ret += "(";
979                         if(dt->is_buffer_type() ) ret += "&";
980                 ret += lhs_op;
981                 ret += ", ";
982                         if(dt->is_buffer_type() ) ret += "&";
983                 ret += rhs_op;
984                 ret += ") == 0";
985         }else{
986                 ret += lhs_op;
987                 ret += " == ";
988                 ret += rhs_op;
989         }
990
991         return(ret);
992 }
993
994 //              Here I assume that only MIN and MAX aggregates can be computed
995 //              over BUFFER data types.
996
997 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
998         string retval = "\t\t";
999         string op = atbl->get_op(aidx);
1000
1001 //              Is it a UDAF
1002         if(! atbl->is_builtin(aidx)) {
1003                 int o;
1004                 retval += op+"_LFTA_AGGR_UPDATE_(";
1005                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1006                 retval+="("+var+")";
1007                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1008                 for(o=0;o<opl.size();++o){
1009                         retval += ",";
1010                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1011                                         retval.append("&");
1012                         retval += generate_se_code(opl[o], schema);
1013                 }
1014                 retval += ");\n";
1015
1016                 return retval;
1017         }
1018
1019 //              Built-in aggregate processing.
1020
1021         data_type *dt = atbl->get_data_type(aidx);
1022
1023         if(op == "COUNT"){
1024                 retval.append(var);
1025                 retval.append("++;\n");
1026                 return(retval);
1027         }
1028         if(op == "SUM"){
1029                 retval.append(var);
1030                 retval.append(" += ");
1031                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1032                 retval.append(";\n");
1033                 return(retval);
1034         }
1035         if(op == "MIN"){
1036                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1037                 retval.append(tmpstr);
1038                 if(dt->complex_comparison(dt)){
1039                         if(dt->is_buffer_type())
1040                           sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1041                         else
1042                           sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1043                 }else{
1044                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1045                 }
1046                 retval.append(tmpstr);
1047                 if(dt->is_buffer_type()){
1048                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1049                 }else{
1050                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1051                 }
1052                 retval.append(tmpstr);
1053
1054                 return(retval);
1055         }
1056         if(op == "MAX"){
1057                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1058                 retval.append(tmpstr);
1059                 if(dt->complex_comparison(dt)){
1060                         if(dt->is_buffer_type())
1061                          sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1062                         else
1063                          sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1064                 }else{
1065                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1066                 }
1067                 retval.append(tmpstr);
1068                 if(dt->is_buffer_type()){
1069                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1070                 }else{
1071                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1072                 }
1073                 retval.append(tmpstr);
1074
1075                 return(retval);
1076
1077         }
1078         if(op == "AND_AGGR"){
1079                 retval.append(var);
1080                 retval.append(" &= ");
1081                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1082                 retval.append(";\n");
1083                 return(retval);
1084         }
1085         if(op == "OR_AGGR"){
1086                 retval.append(var);
1087                 retval.append(" |= ");
1088                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1089                 retval.append(";\n");
1090                 return(retval);
1091         }
1092         if(op == "XOR_AGGR"){
1093                 retval.append(var);
1094                 retval.append(" ^= ");
1095                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1096                 retval.append(";\n");
1097                 return(retval);
1098         }
1099         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1100         return("ERROR: aggregate not recognized: "+op);
1101
1102 }
1103
1104
1105
1106 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1107         string retval;
1108         string op = atbl->get_op(aidx);
1109
1110 //              Is it a UDAF
1111         if(! atbl->is_builtin(aidx)) {
1112                 int o;
1113                 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1114                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1115                 retval+="("+var+"));\n";
1116 //                      Add 1st tupl
1117                 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1118                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1119                 retval+="("+var+")";
1120                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1121                 for(o=0;o<opl.size();++o){
1122                         retval += ",";
1123                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1124                                         retval.append("&");
1125                         retval += generate_se_code(opl[o],schema);
1126                 }
1127                 retval += ");\n";
1128                 return(retval);
1129         }
1130
1131 //              Built-in aggregate processing.
1132
1133
1134         data_type *dt = atbl->get_data_type(aidx);
1135
1136         if(op == "COUNT"){
1137                 retval = "\t\t"+var;
1138                 retval.append(" = 1;\n");
1139                 return(retval);
1140         }
1141
1142         if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1143                                                                         op == "OR_AGGR" || op == "XOR_AGGR"){
1144                 if(dt->is_buffer_type()){
1145                         sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1146                         retval.append(tmpstr);
1147                         sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1148                         retval.append(tmpstr);
1149                 }else{
1150                         retval = "\t\t"+var;
1151                         retval += " = ";
1152                         retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1153                         retval.append(";\n");
1154                 }
1155                 return(retval);
1156         }
1157
1158         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1159         return("ERROR: aggregate not recognized: "+op);
1160 }
1161
1162
1163 ////////////////////////////////////////////////////////////
1164
1165
1166 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1167         std::string &node_name, std::string &schema_embed_str){
1168 //                      Include these only once, not once per lfta
1169 //      string ret = "#include \"rts.h\"\n";
1170 //      ret +=  "#include \"fta.h\"\n\n");
1171
1172         string ret = "#ifndef LFTA_IN_NIC\n";
1173         ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1174         ret += "#include<stdio.h>\n";
1175         ret += "#include <limits.h>\n";
1176         ret += "#include <float.h>\n";
1177         ret += "#include \"rdtsc.h\"\n";
1178         ret += "#endif\n";
1179
1180
1181
1182         return(ret);
1183 }
1184
1185
1186 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1187         int a,p,s;
1188 //                       need to create and output the tuple.
1189         string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1190 //                      Check for any UDAFs with LFTA_BAILOUT
1191         ret += "\tlfta_bailout = 0;\n";
1192         for(a=0;a<aggr_tbl->size();a++){
1193                 if(aggr_tbl->has_bailout(a)){
1194                         ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
1195                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1196                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1197                 }
1198         }
1199         ret += "\tif(! lfta_bailout){\n";
1200
1201 //                      First, compute the size of the tuple.
1202
1203 //                      Unpack UDAF return values
1204         for(a=0;a<aggr_tbl->size();a++){
1205                 if(! aggr_tbl->is_builtin(a)){
1206                         ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1207                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1208                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1209
1210                 }
1211         }
1212
1213
1214 //                      Unpack partial fcns ref'd by the select clause.
1215   if(sl_fcns_start != sl_fcns_end){
1216             ret += "\t\tunpack_failed = 0;\n";
1217     for(p=sl_fcns_start;p<sl_fcns_end;p++){
1218           if(is_partial_fcn[p]){
1219                 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1220                          "t->aggr_table["+idx+"].",schema);
1221                 ret += "\t\tif(retval) unpack_failed = 1;\n";
1222           }
1223     }
1224                                                                 // BEGIN don't allocate tuple if
1225         ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1226   }
1227
1228 //                      Unpack any BUFFER type selections into temporaries
1229 //                      so that I can compute their size and not have
1230 //                      to recompute their value during tuple packing.
1231 //                      I can use regular assignment here because
1232 //                      these temporaries are non-persistent.
1233
1234           for(s=0;s<sl_list.size();s++){
1235                 data_type *sdt = sl_list[s]->get_data_type();
1236                 if(sdt->is_buffer_type()){
1237                         sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1238                         ret += tmpstr;
1239                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1240                         ret += ";\n";
1241                 }
1242           }
1243
1244
1245 //              The size of the tuple is the size of the tuple struct plus the
1246 //              size of the buffers to be copied in.
1247
1248           ret += "\t\t\ttuple_size = sizeof( struct ";
1249           ret +=  generate_tuple_name(node_name);
1250           ret += ")";
1251           for(s=0;s<sl_list.size();s++){
1252                 data_type *sdt = sl_list[s]->get_data_type();
1253                 if(sdt->is_buffer_type()){
1254                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1255                         ret += tmpstr;
1256                 }
1257           }
1258           ret += ";\n";
1259
1260
1261           ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1262           ret += "\t\t\tif( tuple != NULL){\n";
1263
1264
1265 //                      Test passed, make assignments to the tuple.
1266
1267           ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1268           ret +=  generate_tuple_name(node_name) ;
1269           ret += ");\n";
1270
1271 //                      Mark tuple as REGULAR_TUPLE
1272           ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1273
1274           for(s=0;s<sl_list.size();s++){
1275                 data_type *sdt = sl_list[s]->get_data_type();
1276                 if(sdt->is_buffer_type()){
1277                         sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1278                         ret += tmpstr;
1279                         sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1280                         ret += tmpstr;
1281                 }else{
1282                         sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1283                         ret += tmpstr;
1284 //                      if(sdt->needs_hn_translation())
1285 //                              ret += sdt->hton_translation() +"( ";
1286                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1287 //                      if(sdt->needs_hn_translation())
1288 //                              ret += ") ";
1289                         ret += ";\n";
1290                 }
1291           }
1292
1293 //              Generate output.
1294           ret += "\t\t\t\tpost_tuple(tuple);\n";
1295           ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1296           ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1297           ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1298           ret += "\t\t\t\t#endif\n\n";
1299           ret += "\t\t\t}\n";
1300
1301           if(sl_fcns_start != sl_fcns_end)      // END don't allocate tuple if
1302                 ret += "\t\t}\n";                               // unpack failed.
1303           ret += "\t}\n";
1304
1305 //                      Need to release memory held by BUFFER types.
1306                 int g;
1307
1308           for(g=0;g<gb_tbl->size();g++){
1309                 data_type *gdt = gb_tbl->get_data_type(g);
1310                 if(gdt->is_buffer_type()){
1311                         sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1312                         ret += tmpstr;
1313                 }
1314           }
1315           for(a=0;a<aggr_tbl->size();a++){
1316                 if(aggr_tbl->is_builtin(a)){
1317                         data_type *adt = aggr_tbl->get_data_type(a);
1318                         if(adt->is_buffer_type()){
1319                                 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
1320                                 ret += tmpstr;
1321                         }
1322                 }else{
1323                         ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1324                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1325                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1326                 }
1327           }
1328
1329         ret += "\t\tt->n_aggrs--;\n";
1330
1331         return(ret);
1332
1333 }
1334
1335 string generate_gb_match_test(string idx){
1336         int g;
1337         string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") &&  IS_NEW(t->aggr_table_bitmap,"+idx+")";
1338         if(gb_tbl->size()>0){
1339                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1340                 ret+="\t\t";
1341
1342 //                      Next, scan list for a match on the group-by attributes.
1343           string rhs_op, lhs_op;
1344           for(g=0;g<gb_tbl->size();g++){
1345                   ret += " && ";
1346                   ret += "(";
1347                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1348                   sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1349                   ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
1350                   ret += ")";
1351           }
1352          }
1353
1354           ret += "){\n";
1355
1356         return ret;
1357 }
1358
1359 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1360         int g;
1361         string ret;
1362
1363           ret += "/*\t\tMatch found : update in place.\t*/\n";
1364           int a;
1365           has_udaf = false;
1366           for(a=0;a<aggr_tbl->size();a++){
1367                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1368                   ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1369                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1370           }
1371
1372 //                      garbage collect copied buffer type gb attrs.
1373   for(g=0;g<gb_tbl->size();g++){
1374           data_type *gdt = gb_tbl->get_data_type(g);
1375           if(gdt->is_buffer_type()){
1376                 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
1377                 ret+=tmpstr;
1378           }
1379         }
1380
1381
1382
1383           bool first_udaf = true;
1384           if(has_udaf){
1385                 ret += "\t\tif(";
1386                 for(a=0;a<aggr_tbl->size();a++){
1387                         if(! aggr_tbl->is_builtin(a)){
1388                                 if(! first_udaf)ret += " || ";
1389                                 else first_udaf = false;
1390                                 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1391                                 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1392                                 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1393                         }
1394                 }
1395                 ret+="){\n";
1396                 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1397                 ret += generate_tuple_from_aggr(node_name,schema,idx);
1398                 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
1399                 ret+="\t\t}\n";
1400         }
1401         return ret;
1402 }
1403
1404
1405 string generate_init_group( table_list *schema, string idx){
1406           int g,a;
1407           string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1408 //                      Fill up the aggregate block.
1409           for(g=0;g<gb_tbl->size();g++){
1410                   sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1411                   ret += tmpstr;
1412           }
1413           for(a=0;a<aggr_tbl->size();a++){
1414                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1415                   ret += generate_aggr_init(tmpstr, aggr_tbl,a,  schema);
1416           }
1417           ret+="\t\tt->n_aggrs++;\n";
1418         return ret;
1419 }
1420
1421
1422 string generate_fta_flush(string node_name, table_list *schema,
1423                 ext_fcn_list *Ext_fcns){
1424
1425    string ret;
1426    string select_var_defs ;
1427         int s, p;
1428
1429 //              Flush from previous epoch
1430
1431         ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1432
1433         ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1434     ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1435         ret += "\tint i, lfta_bailout;\n";
1436         ret += "\tunsigned int gen_val;\n";
1437
1438         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1439         ret += generate_fta_name(node_name)+" *) f;\n";
1440
1441         ret += "\n";
1442
1443
1444 //              Variables needed to store selected attributes of BUFFER type
1445 //              temporarily, in order to compute their size for storage
1446 //              in an output tuple.
1447
1448   select_var_defs = "";
1449   for(s=0;s<sl_list.size();s++){
1450         data_type *sdt = sl_list[s]->get_data_type();
1451         if(sdt->is_buffer_type()){
1452           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1453           select_var_defs.append(tmpstr);
1454         }
1455   }
1456   if(select_var_defs != ""){
1457         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1458     ret += select_var_defs;
1459   }
1460
1461
1462 //              Variables to store results of partial functions.
1463   if(sl_fcns_start != sl_fcns_end){
1464         ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1465         for(p=sl_fcns_start;p<sl_fcns_end;p++){
1466                 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1467                         partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
1468                 ret += tmpstr;
1469         }
1470         ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1471   }
1472
1473 //              Variables for udaf output temporaries
1474         bool no_udaf = true;
1475         int a;
1476         for(a=0;a<aggr_tbl->size();a++){
1477                 if(! aggr_tbl->is_builtin(a)){
1478                         if(no_udaf){
1479                                 ret+="/*\t\tUDAF output vars.\t*/\n";
1480                                 no_udaf = false;
1481                         }
1482                         int afcn_id = aggr_tbl->get_fcn_id(a);
1483                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1484                         sprintf(tmpstr,"udaf_ret%d", a);
1485                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1486                 }
1487         }
1488
1489
1490 //  ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1491   ret+="\n";
1492   ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1493   ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1494   ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1495                 bool first_g=true;
1496                 int g;
1497                 for(g=0;g<gb_tbl->size();g++){
1498                   data_type *gdt = gb_tbl->get_data_type(g);
1499                   if(gdt->is_temporal()){
1500                         if(first_g) first_g=false; else ret+=" || ";
1501                         ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1502                   }
1503                 }
1504   ret += "))) {\n";
1505   ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1506   ret+=
1507 "#ifdef LFTA_STATS\n"
1508 "\t\t\tt->eviction_cnt++;\n"
1509 "#endif\n"
1510 ;
1511
1512
1513   ret+=generate_tuple_from_aggr(node_name,schema,"i");
1514
1515 //  ret+="\t\t\tt->n_aggrs--;\n";  // done in generate_tuple_from_aggr
1516   ret+="\t\t\tnflush--;\n";
1517   ret+="\t\t}\n";
1518   ret+="\t}\n";
1519   ret+="\tt->flush_pos=i;\n";
1520   ret+="\tif(t->n_aggrs == 0) {\n";
1521   ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1522   ret += "\t}\n\n";
1523
1524   ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1525
1526   for(int g=0;g<gb_tbl->size();g++){
1527         data_type *dt = gb_tbl->get_data_type(g);
1528         if(dt->is_temporal()){
1529                 data_type *gdt = gb_tbl->get_data_type(g);
1530                 if(!gdt->is_buffer_type()){
1531                         sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1532                         ret += tmpstr;
1533                 }
1534         }
1535   }
1536   ret += "\t}\n}\n\n";
1537
1538   return(ret);
1539 }
1540
1541 // TODO Remove sprintf to perform string catenation
1542 string generate_fta_load_params(string node_name){
1543         int p;
1544         vector<string> param_names = param_tbl->get_param_names();
1545
1546         string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1547                 ret += " *t, int sz, void *value, int initial_call){\n";
1548     ret += "\tint pos=0;\n";
1549     ret += "\tint data_pos;\n";
1550
1551         for(p=0;p<param_names.size();p++){
1552                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1553                 if(dt->is_buffer_type()){
1554                         sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1555                         ret += tmpstr;
1556                         sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
1557                         ret += tmpstr;
1558                 }
1559         }
1560
1561
1562
1563         ret += "\n\tdata_pos = ";
1564         for(p=0;p<param_names.size();p++){
1565                 if(p>0) ret += " + ";
1566                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1567                 ret += "sizeof( ";
1568                 ret +=  dt->get_tuple_cvar_type();
1569                 ret += " )";
1570         }
1571         ret += ";\n";
1572         ret += "\tif(data_pos > sz) return 1;\n\n";
1573
1574
1575         for(p=0;p<param_names.size();p++){
1576                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1577                 if(dt->is_buffer_type()){
1578                         sprintf(tmpstr,"\taccess_var_%s =  *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1579                         ret += tmpstr;
1580                         switch( dt->get_type() ){
1581                         case v_str_t:
1582 //                              ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n";               // ntoh conversion
1583 //                              ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n";   // ntoh conversion
1584                                 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1585                                 ret += tmpstr;
1586                                 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1587                                 ret += tmpstr;
1588                                 sprintf(tmpstr,"\ttmp_var_%s.length =  access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1589                                 ret += tmpstr;
1590                         break;
1591                         default:
1592                                 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1593                                 exit(1);
1594                         break;
1595                         }
1596 //                                      First, destroy the old
1597                         ret += "\tif(! initial_call)\n";
1598                         sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1599                         ret += tmpstr;
1600 //                                      Next, create the new.
1601                         sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1602                         ret += tmpstr;
1603                 }else{
1604 //                      if(dt->needs_hn_translation()){
1605 //                              sprintf(tmpstr,"\tt->param_%s =  %s( *( (%s *)( (char *)value+pos) ) );\n",
1606 //                                param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1607 //                      }else{
1608                                 sprintf(tmpstr,"\tt->param_%s =  *( (%s *)( (char *)value+pos) );\n",
1609                                   param_names[p].c_str(), dt->get_cvar_type().c_str() );
1610 //                      }
1611                         ret += tmpstr;
1612                 }
1613                 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1614                 ret += tmpstr;
1615         }
1616
1617 //                      Register the pass-by-handle parameters
1618
1619         ret += "/* register and de-register the pass-by-handle parameters */\n";
1620
1621     int ph;
1622     for(ph=0;ph<param_handle_table.size();++ph){
1623                 data_type pdt(param_handle_table[ph]->type_name);
1624                 switch(param_handle_table[ph]->val_type){
1625                 case cplx_lit_e:
1626                         break;
1627                 case litval_e:
1628                         break;
1629                 case param_e:
1630                         ret += "\tif(! initial_call)\n";
1631                         sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1632                                 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1633                         ret += tmpstr;
1634                         sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
1635                         ret += tmpstr;
1636
1637                         if(pdt.is_buffer_type()) ret += "&(";
1638                         ret += "t->param_"+param_handle_table[ph]->param_name;
1639                         if(pdt.is_buffer_type()) ret += ")";
1640                         ret += ");\n";
1641                         break;
1642                 default:
1643                         sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1644                         fprintf(stderr,"%s\n",tmpstr);
1645                         ret+=tmpstr;
1646                 }
1647         }
1648
1649         ret+="\treturn 0;\n";
1650         ret += "}\n\n";
1651
1652         return(ret);
1653 }
1654
1655
1656
1657
1658 string generate_fta_free(string node_name, bool is_aggr_query){
1659
1660         string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1661         ret+= "\tstruct "+generate_fta_name(node_name)+
1662                 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1663         ret += "\tint i;\n";
1664
1665         if(is_aggr_query){
1666                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1667                 ret+="\t/* \t\tmark all groups as old */\n";
1668                 ret+="\tt->generation++;\n";
1669                 ret+="\tt->flush_pos = 0;\n";
1670                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1671         }
1672
1673 //                      Deregister the pass-by-handle parameters
1674         ret += "/* de-register the pass-by-handle parameters */\n";
1675     int ph;
1676     for(ph=0;ph<param_handle_table.size();++ph){
1677                 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1678                         param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1679                 ret += tmpstr;
1680         }
1681
1682
1683         ret += "\treturn 0;\n}\n\n";
1684         return(ret);
1685 }
1686
1687
1688 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1689         string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f,  gs_int32_t command, gs_int32_t sz, void *value){\n";
1690         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1691                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1692         ret+="\tint i;\n";
1693
1694
1695         ret += "\t/* temp status tuple */\n";
1696         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1697         ret += "\tgs_int32_t tuple_size;\n";
1698
1699
1700         if(is_aggr_query){
1701                 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1702
1703                 ret+="\t\tif (!t->n_aggrs) {\n";
1704                 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1705                 ret+="\t\t\tif( tuple != NULL)\n";
1706                 ret+="\t\t\t\tpost_tuple(tuple);\n";
1707
1708                 ret+="\t\t}else{\n";
1709
1710                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1711                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1712                 ret +="\t\tt->generation++;\n";
1713                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1714                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1715                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1716                 ret+="\t\t\tt->flush_pos = 0;\n";
1717                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1718                 ret+="\t\t}\n";
1719
1720                 ret+="\t}\n";
1721         }
1722         if(param_tbl->size() > 0){
1723                 ret+=
1724 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1725 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1726 "#ifndef LFTA_IN_NIC\n"
1727 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1728 "#else\n"
1729 "\t\t{}\n"
1730 "#endif\n"
1731 "\t}\n";
1732         }
1733         ret+=
1734 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1735 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1736 "\t}\n\n";
1737
1738
1739         ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1740
1741         if(is_aggr_query){
1742                 ret+="\t\tif (t->n_aggrs) {\n";
1743                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1744                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1745                 ret +="\t\tt->generation++;\n";
1746                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1747                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1748                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1749                 ret+="\t\t\tt->flush_pos = 0;\n";
1750                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1751                 ret+="\t\t}\n";
1752         }
1753
1754         ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1755         ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1756         ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1757
1758         /* mark tuple as EOF_TUPLE */
1759         ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1760         ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1761         ret += "\t\tpost_tuple(tuple);\n";
1762         ret += "\t}\n";
1763
1764         ret += "\treturn 0;\n}\n\n";
1765
1766         return(ret);
1767 }
1768
1769 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){
1770         string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1771         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1772                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1773
1774         ret += "\t/* Create a temp status tuple */\n";
1775         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1776         ret += "\tgs_int32_t tuple_size;\n";
1777         ret += "\tunsigned int i;\n";
1778         ret += "\ttime_t cur_time;\n";
1779         ret += "\tint time_advanced;\n";
1780         ret += "\tstruct fta_stat stats;\n";
1781
1782
1783
1784         /* copy the last seen values of temporal attributes */
1785         col_id_set temp_cids;           //      col ids of temp attributes in select clause
1786
1787
1788         /* HACK: in order to reuse the SE generation code, we need to copy
1789          * the last values of the temp attributes into new variables
1790          * which have names unpack_var_XXX_XXX
1791          */
1792
1793         int s, g;
1794         col_id_set::iterator csi;
1795
1796         for(s=0;s<sl_list.size();s++){
1797                 data_type *sdt = sl_list[s]->get_data_type();
1798                 if (sdt->is_temporal()) {
1799                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
1800                 }
1801         }
1802
1803         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1804                 int tblref = (*csi).tblvar_ref;
1805                 int schref = (*csi).schema_ref;
1806                 string field = (*csi).field;
1807                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1808                 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
1809                 ret += tmpstr;
1810         }
1811
1812         if (is_aggr_query) {
1813                 for(g=0;g<gb_tbl->size();g++){
1814                         data_type *gdt = gb_tbl->get_data_type(g);
1815                         if(gdt->is_temporal()){
1816                                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1817                                 ret += tmpstr;
1818                                 data_type *gdt = gb_tbl->get_data_type(g);
1819                                 if(gdt->is_buffer_type()){
1820                                 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1821                                 ret += tmpstr;
1822                                 }
1823                         }
1824                 }
1825         }
1826         ret += "\n";
1827
1828         ret += "\ttime_advanced = 0;\n";
1829
1830         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1831                 int tblref = (*csi).tblvar_ref;
1832                 int schref = (*csi).schema_ref;
1833                 string field = (*csi).field;
1834                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1835
1836                 // update last seen value with the value seen
1837                 ret += "\t#ifdef PREFILTER_DEFINED\n";
1838                 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
1839                         field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
1840                 ret += tmpstr;
1841                 ret += "\t\ttime_advanced = 1;\n\t}\n";
1842                 ret += "\t#endif\n";
1843
1844                 // we need to pay special attention to time fields
1845                 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
1846                         ret += "\tcur_time = time(&cur_time);\n";
1847
1848                         if (field == "time") {
1849                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
1850                                         tblref, time_corr);
1851                                 ret += tmpstr;
1852                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
1853                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1854                         } else if (field == "timestamp_ms") {
1855                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
1856                                         tblref, time_corr);
1857                                 ret += tmpstr;
1858                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
1859                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1860                         }else{
1861                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
1862                                         field.c_str(), tblref, time_corr);
1863                                 ret += tmpstr;
1864                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
1865                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);
1866                         }
1867                         ret += tmpstr;
1868
1869                         ret += "\t\ttime_advanced = 1;\n";
1870                         ret += "\t}\n";
1871
1872                         sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
1873                         field.c_str(), tblref, field.c_str(), tblref);
1874                         ret += tmpstr;
1875                 } else {
1876                         sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
1877                                 field.c_str(), tblref, field.c_str(), tblref);
1878                         ret += tmpstr;
1879                 }
1880         }
1881
1882         // for aggregation lftas we need to check if the time was advanced beyond the current epoch
1883         if (is_aggr_query) {
1884
1885                 string change_test;
1886                 bool first_one = true;
1887                 for(g=0;g<gb_tbl->size();g++){
1888                   data_type *gdt = gb_tbl->get_data_type(g);
1889                   if(gdt->is_temporal()){
1890 //                                      To perform the test, first need to compute the value
1891 //                                      of the temporal gb attrs.
1892                           if(gdt->is_buffer_type()){
1893         //                              NOTE : if the SE defining the gb is anything
1894         //                              other than a ref to a variable, this will generate
1895         //                              illegal code.  To be resolved with Spatch.
1896                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
1897                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
1898                                 ret+=tmpstr;
1899                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
1900                                         gdt->get_buffer_assign_copy().c_str(), g, g);
1901                           }else{
1902                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
1903                           }
1904                           ret += tmpstr;
1905
1906                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
1907                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
1908                           if(first_one){first_one = false;} else {change_test.append(") && (");}
1909                           change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
1910                   }
1911                 }
1912
1913                 ret += "\n\tif( time_advanced && !( (";
1914                 ret += change_test;
1915                 ret += ") ) ){\n";
1916
1917                 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
1918                 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
1919                 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1920
1921                 ret += "\t\t/* \t\tmark all groups as old */\n";
1922                 ret +="\t\tt->generation++;\n";
1923                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1924                 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1925                 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1926                 ret += "\t\tt->flush_pos = 0;\n";
1927
1928                 for(g=0;g<gb_tbl->size();g++){
1929                      data_type *gdt = gb_tbl->get_data_type(g);
1930                      if(gdt->is_temporal()){
1931                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);                 ret += tmpstr;
1932                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
1933                      }
1934                 }
1935                 ret += "\t}\n\n";
1936
1937         }
1938
1939         ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
1940         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
1941         ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
1942
1943
1944         for(s=0;s<sl_list.size();s++){
1945                 data_type *sdt = sl_list[s]->get_data_type();
1946                 if(sdt->is_temporal()){
1947
1948                         if (sl_list[s]->is_gb()) {
1949                                 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
1950                                 ret += tmpstr;
1951                         }
1952
1953                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
1954                         ret += tmpstr;
1955 //                      if(sdt->needs_hn_translation())
1956 //                              ret += sdt->hton_translation() +"( ";
1957                         if (sl_list[s]->is_gb()) {
1958                                 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
1959                                 ret += tmpstr;
1960                         } else{
1961                                 ret += generate_se_code(sl_list[s],schema);
1962                         }
1963 //                      if(sdt->needs_hn_translation())
1964 //                              ret += " )";
1965                         ret += ";\n";
1966                 }
1967         }
1968
1969         /* mark tuple as temporal */
1970         ret += "\n\t/* Mark tuple as temporal */\n";
1971         ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
1972
1973         ret += "\n\t/* Copy trace id */\n";
1974         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
1975
1976         ret += "\n\t/* Populate runtime stats */\n";
1977         ret += "\tstats.ftaid = f->ftaid;\n";
1978         ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
1979         ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
1980         ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
1981         ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
1982         ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
1983         ret += "\tstats.collision_cnt = t->collision_cnt;\n";
1984         ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
1985         ret += "\tstats.sampling_rate    = t->sampling_rate;\n";
1986
1987         ret += "\n#ifdef LFTA_PROFILE\n";
1988         ret += "\n\t/* Print stats */\n";
1989         ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
1990         ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
1991         ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
1992         ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
1993         ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
1994         ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
1995         ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
1996         ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
1997         ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
1998         ret += "\n#endif\n";
1999
2000
2001         ret += "\n\t/* Copy stats */\n";
2002         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2003         ret+="\tpost_tuple(tuple);\n";
2004
2005         ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2006         ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
2007
2008         ret += "\n\t/* Reset runtime stats */\n";
2009         ret += "\tt->in_tuple_cnt = 0;\n";
2010         ret += "\tt->out_tuple_cnt = 0;\n";
2011         ret += "\tt->out_tuple_sz = 0;\n";
2012         ret += "\tt->accepted_tuple_cnt = 0;\n";
2013         ret += "\tt->cycle_cnt = 0;\n";
2014         ret += "\tt->collision_cnt = 0;\n";
2015         ret += "\tt->eviction_cnt = 0;\n";
2016
2017         ret += "\treturn 0;\n}\n\n";
2018
2019         return(ret);
2020 }
2021
2022
2023 //              accept processing before the where clause,
2024 //              do flush processwing.
2025 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema,   col_id_set &unpacked_cids, string &temporal_flush){
2026         int s;
2027
2028 //              Slow flush
2029   string ret="\n/*\tslow flush\t*/\n";
2030   string slow_flush_str = fs->get_val_of_def("slow_flush");
2031   int n_slow_flush = atoi(slow_flush_str.c_str());
2032   if(n_slow_flush <= 0) n_slow_flush = 2;
2033   if(n_slow_flush > 1){
2034         ret += "\tt->flush_ctr++;\n";
2035         ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2036         ret += "\t\tt->flush_ctr = 0;\n";
2037     ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
2038         ret += "\t}\n\n";
2039   }else{
2040     ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2041   }
2042
2043
2044         string change_test;
2045         bool first_one = true;
2046         int g;
2047     col_id_set flush_cids;              //      col ids accessed when computing flush variables.
2048                                                         //  unpack them at temporal flush test time.
2049     temporal_flush = "";
2050
2051
2052         for(g=0;g<gb_tbl->size();g++){
2053                   data_type *gdt = gb_tbl->get_data_type(g);
2054                   if(gdt->is_temporal()){
2055                           gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2056
2057 //                                      To perform the test, first need to compute the value
2058 //                                      of the temporal gb attrs.
2059                           if(gdt->is_buffer_type()){
2060         //                              NOTE : if the SE defining the gb is anything
2061         //                              other than a ref to a variable, this will generate
2062         //                              illegal code.  To be resolved with Spatch.
2063                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2064                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2065                                 temporal_flush += tmpstr;
2066                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2067                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2068                           }else{
2069                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2070                           }
2071                           temporal_flush += tmpstr;
2072 //                                      END computing the value of the temporal GB attr.
2073
2074
2075                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2076                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2077                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2078                           change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2079                   }
2080         }
2081         if(!first_one){         // will be false iff. there is a temporal GB attribute
2082                   temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2083                   temporal_flush += "\tif( !( (";
2084                   temporal_flush += change_test;
2085                   temporal_flush += ") ) ){\n";
2086
2087 //                temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2088                   temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2089                   temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2090                   temporal_flush+="\t\t}\n";
2091                   temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2092                   temporal_flush+="\t\tt->generation++;\n";
2093                   temporal_flush+="\t\tt->flush_pos = 0;\n";
2094
2095
2096 //                              Now set the saved temporal value of the gb to the
2097 //                              current value of the gb.  Only for simple types,
2098 //                              not for buffer types -- but the strings are not
2099 //                              temporal in any case.
2100
2101                         for(g=0;g<gb_tbl->size();g++){
2102                           data_type *gdt = gb_tbl->get_data_type(g);
2103                           if(gdt->is_temporal()){
2104                                   if(gdt->is_buffer_type()){
2105
2106                                         fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2107                                   }else{
2108                                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2109                                         temporal_flush += tmpstr;
2110                                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2111                                         temporal_flush += tmpstr;
2112                                   }
2113                                 }
2114                         }
2115                   temporal_flush += "\t}\n\n";
2116         }
2117
2118 // Unpack all the temporal attributes referenced in select clause
2119 // and update the last value of the attribute
2120         col_id_set temp_cids;           //      col ids of temp attributes in select clause
2121         col_id_set::iterator csi;
2122
2123         for(s=0;s<sl_list.size();s++){
2124                 data_type *sdt = sl_list[s]->get_data_type();
2125                 if (sdt->is_temporal()) {
2126                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2127                 }
2128         }
2129
2130         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2131                 if(unpacked_cids.count((*csi)) == 0){
2132                         int tblref = (*csi).tblvar_ref;
2133                         int schref = (*csi).schema_ref;
2134                         string field = (*csi).field;
2135                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2136 /*
2137                         data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2138                         sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2139                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2140                         ret += tmpstr;
2141                         ret += "\tif(retval) return 1;\n";
2142 */
2143                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2144                         ret += tmpstr;
2145
2146                         unpacked_cids.insert( (*csi) );
2147                 }
2148         }
2149
2150
2151 //                              Do the flush here if this is a real_time query
2152         string rt_level = fs->get_val_of_def("real_time");
2153         if(rt_level != "" && temporal_flush != ""){
2154                 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2155                         if(unpacked_cids.count((*csi)) == 0){
2156                         int tblref = (*csi).tblvar_ref;
2157                         int schref = (*csi).schema_ref;
2158                         string field = (*csi).field;
2159                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2160 /*
2161                                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2162                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2163                         ret += tmpstr;
2164                         ret += "\tif(retval) return 1;\n";
2165 */
2166                                 unpacked_cids.insert( (*csi) );
2167                         }
2168                 }
2169                 ret += temporal_flush;
2170         }
2171
2172         return ret;
2173  }
2174
2175 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2176
2177 int p,s;
2178 string ret;
2179
2180 ///////////////                 Processing for filter-only query
2181
2182 //                      test passed : create the tuple, then assign to it.
2183           ret += "/*\t\tCreate and post the tuple\t*/\n";
2184
2185 //                      Unpack partial fcns ref'd by the select clause.
2186 //                      Its a kind of a WHERE clause ...
2187   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2188         if(fcn_ref_cnt[p] > 1){
2189                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2190         }
2191         if(is_partial_fcn[p]){
2192                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2193                 ret += "\tif(retval) goto end;\n";
2194         }
2195         if(fcn_ref_cnt[p] > 1){
2196                 if(!is_partial_fcn[p]){
2197                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2198                 }
2199                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2200                 ret += "\t}\n";
2201         }
2202   }
2203
2204   // increment the counter of accepted tuples
2205   ret += "\n\t#ifdef LFTA_STATS\n";
2206   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2207   ret += "\t#endif\n\n";
2208
2209 //                      First, compute the size of the tuple.
2210
2211 //                      Unpack any BUFFER type selections into temporaries
2212 //                      so that I can compute their size and not have
2213 //                      to recompute their value during tuple packing.
2214 //                      I can use regular assignment here because
2215 //                      these temporaries are non-persistent.
2216
2217           for(s=0;s<sl_list.size();s++){
2218                 data_type *sdt = sl_list[s]->get_data_type();
2219                 if(sdt->is_buffer_type()){
2220                         sprintf(tmpstr,"\tselvar_%d = ",s);
2221                         ret += tmpstr;
2222                         ret += generate_se_code(sl_list[s],schema);
2223                         ret += ";\n";
2224                 }
2225           }
2226
2227
2228 //              The size of the tuple is the size of the tuple struct plus the
2229 //              size of the buffers to be copied in.
2230
2231           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2232           for(s=0;s<sl_list.size();s++){
2233                 data_type *sdt = sl_list[s]->get_data_type();
2234                 if(sdt->is_buffer_type()){
2235                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2236                         ret += tmpstr;
2237                 }
2238           }
2239           ret += ";\n";
2240
2241
2242           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2243           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2244
2245 //                      Test passed, make assignments to the tuple.
2246
2247           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2248
2249 //                      Mark tuple as REGULAR_TUPLE
2250           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2251
2252
2253           for(s=0;s<sl_list.size();s++){
2254                 data_type *sdt = sl_list[s]->get_data_type();
2255                 if(sdt->is_buffer_type()){
2256                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2257                         ret += tmpstr;
2258                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2259                         ret += tmpstr;
2260                 }else{
2261                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2262                         ret += tmpstr;
2263 //                      if(sdt->needs_hn_translation())
2264 //                              ret += sdt->hton_translation() +"( ";
2265                         ret += generate_se_code(sl_list[s],schema);
2266 //                      if(sdt->needs_hn_translation())
2267 //                              ret += ") ";
2268                         ret += ";\n";
2269                 }
2270           }
2271
2272 //              Generate output.
2273
2274           ret += "\tpost_tuple(tuple);\n";
2275
2276 //              Increment the counter of posted tuples
2277   ret += "\n\t#ifdef LFTA_STATS\n";
2278   ret += "\tt->out_tuple_cnt++;\n";
2279   ret+="\tt->out_tuple_sz+=tuple_size;\n";
2280   ret += "\t#endif\n\n";
2281
2282
2283
2284         return ret;
2285 }
2286
2287 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
2288
2289 int p,s,w;
2290 string ret;
2291
2292 //                      Get parameters
2293         unsigned int window_len = fs->temporal_range;
2294         unsigned int n_bloom = 11;
2295         string n_bloom_str = fs->get_val_of_def("num_bloom");
2296         int tmp_n_bloom = atoi(n_bloom_str.c_str());
2297         if(tmp_n_bloom>0)
2298                 n_bloom = tmp_n_bloom+1;
2299         float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2300         sprintf(tmpstr,"%f",bloom_width);
2301         string bloom_width_str = tmpstr;
2302
2303         if(window_len < n_bloom){
2304                 n_bloom = window_len+1;
2305                 bloom_width_str = "1";
2306         }
2307
2308
2309 //              Grab the current window time
2310         scalarexp_t winvar(fs->temporal_var);
2311         ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2312
2313         int bf_exp_size = 12;  // base-2 log of number of bits
2314         string bloom_len_str = fs->get_val_of_def("bloom_size");
2315         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2316         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2317                 bf_exp_size = tmp_bf_exp_size;
2318         }
2319         int bf_bit_size = 1 << bf_exp_size;
2320         int bf_byte_size = bf_bit_size / (8*sizeof(char));
2321
2322         unsigned int ht_size = 4096;
2323         string ht_size_s = fs->get_val_of_def("aggregate_slots");
2324         int tmp_ht_size = atoi(ht_size_s.c_str());
2325         if(tmp_ht_size > 1024){
2326                 unsigned int hs = 1;            // make it power of 2
2327                 while(tmp_ht_size){
2328                         hs =hs << 1;
2329                         tmp_ht_size = tmp_ht_size >> 1;
2330                 }
2331                 ht_size = hs;
2332         }
2333
2334         int i, bf_mask = 0;
2335         if(fs->use_bloom){
2336                 for(i=0;i<bf_exp_size;i++)
2337                         bf_mask = (bf_mask << 1) | 1;
2338         }else{
2339                 for(i=ht_size;i>1;i=i>>1)
2340                         bf_mask = (bf_mask << 1) | 1;
2341         }
2342
2343 /*
2344 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2345         n_bloom,
2346         window_len,
2347         bloom_width_str.c_str(),
2348         bf_exp_size,
2349         bf_bit_size,
2350         bf_byte_size,
2351         ht_size,
2352         ht_size_s.c_str(),
2353         bf_mask);
2354 */
2355
2356
2357
2358
2359 //              If this is a bloom-filter fj, first test if the
2360 //              bloom filter needs to be advanced.
2361 //              SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2362 //              t->bf_size : number of bits in bloom filter
2363 //              TODO: vectorize?
2364 //              TODO: Don't iterate more than n_bloom times!
2365 //                      As written, its possible to wrap around many times.
2366         if(fs->use_bloom){
2367                 ret +=
2368 "//                     Clean out old bloom filters if needed.\n"
2369 "//                     TODO vectorize this ? \n"
2370 "       if(t->first_exec){\n"
2371 "               t->first_exec = 0;\n"
2372 "               t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2373 "               t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2374 "       }else{\n"
2375 "               curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2376 "               if(curr_bin != t->last_bin){\n"
2377 "                       for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2378 "                               t->last_bloom_pos++;\n"
2379 "                               if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2380 "                                       t->last_bloom_pos = 0;\n"
2381 "                               tmp_i = t->last_bloom_pos;\n"
2382 "                               for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2383 "                                       SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2384 "                               }\n"
2385 "                       }\n"
2386 "               }\n"
2387 "               t->last_bin = curr_bin;\n"
2388 "       }\n"
2389 ;
2390         }
2391
2392
2393 //-----------------------------------------------------------------
2394 //              First, determine whether to do S (filter stream) processing.
2395
2396         ret +=
2397 "//             S (filtering stream) predicate, should it be processed?\n"
2398 "\n"
2399 ;
2400 // Sort S preds based on cost.
2401         vector<cnf_elem *> s_filt = fs->pred_t1;
2402         col_id_set::iterator csi;
2403   if(s_filt.size() > 0){
2404
2405 //                      Unpack fields ref'd in the S pred
2406         for(w=0;w<s_filt.size();++w){
2407                 col_id_set this_pred_cids;
2408                 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2409                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2410                         if(unpacked_cids.count( (*csi) ) == 0){
2411                                 int tblref = (*csi).tblvar_ref;
2412                                 int schref = (*csi).schema_ref;
2413                                 string field = (*csi).field;
2414                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2415                                 unpacked_cids.insert( (*csi) );
2416                         }
2417                 }
2418         }
2419
2420
2421 //              Sort by evaluation cost.
2422 //              First, estimate evaluation costs
2423 //              Eliminate predicates covered by the prefilter (those in s_pids).
2424 //              I need to do it before the sort becuase the indices refer
2425 //              to the position in the unsorted list.
2426         vector<cnf_elem *> tmp_wh;
2427         for(w=0;w<s_filt.size();++w){
2428                 compute_cnf_cost(s_filt[w],Ext_fcns);
2429                 tmp_wh.push_back(s_filt[w]);
2430         }
2431         s_filt = tmp_wh;
2432
2433         sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2434
2435 //              Now generate the predicates.
2436         for(w=0;w<s_filt.size();++w){
2437                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2438                 ret += tmpstr;
2439
2440 //                      Find partial fcns ref'd in this cnf element
2441                 set<int> pfcn_refs;
2442                 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2443 //                      Since set<..> is a "Sorted Associative Container",
2444 //                      we can walk through it in sorted order by walking from
2445 //                      begin() to end().  (and the partial fcns must be
2446 //                      evaluated in this order).
2447                 set<int>::iterator si;
2448                 string pf_preds;
2449                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2450                         if(fcn_ref_cnt[(*si)] > 1){
2451                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2452                         }
2453                         if(is_partial_fcn[(*si)]){
2454                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2455                                 ret += "\t\tif(retval) goto end_s;\n";
2456                         }
2457                         if(fcn_ref_cnt[(*si)] > 1){
2458                                 if(!is_partial_fcn[(*si)]){
2459                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2460 //              Testing for S is a side branch.
2461 //              I don't want a cacheable partial function to be
2462 //              marked as evaluated.  Therefore I mark the function
2463 //              as evalauted ONLY IF it is not partial.
2464                                         ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2465                                 }
2466                                 ret += "\t}\n";
2467                         }
2468                 }
2469
2470                 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2471                                 ") ) goto end_s;\n";
2472         }
2473   }else{
2474           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2475   }
2476
2477         for(p=0;p<fs->hash_eq.size();++p)
2478                 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2479
2480         if(fs->use_bloom){
2481 //                      First, generate the S scalar expressions in the hash_eq
2482
2483 //                      Iterate over the bloom filters
2484                 for(i=0;i<3;i++){
2485                         ret += "\t\tbucket=0;\n";
2486                         for(p=0;p<fs->hash_eq.size();++p){
2487                                 ret +=
2488 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2489         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2490         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2491                         }
2492 //              SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2493                                 ret +=
2494 "               bucket &= "+int_to_string(bf_mask)+";\n"
2495 "               SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2496 "\n"
2497 ;
2498                 }
2499         }else{
2500                 ret += "// Add the S record to the hash table, choose a position\n";
2501                 ret += "\t\tbucket=0;\n";
2502                 for(p=0;p<fs->hash_eq.size();++p){
2503                         ret +=
2504 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2505         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2506         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2507                 }
2508                 ret +=
2509 "               bucket &= "+int_to_string(bf_mask)+";\n"
2510 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2511 ;
2512 //                      Try the first bucket
2513                 ret += "\t\tif(";
2514                 for(p=0;p<fs->hash_eq.size();++p){
2515                         if(p>0) ret += " && ";
2516 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2517 //                                      " == s_equijoin_"+int_to_string(p);
2518                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2519                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2520                         string rhs_op = "s_equijoin_"+int_to_string(p);
2521                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2522                 }
2523                 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2524                 ret += "\t\t}else{\n\t\t\tif(";
2525                 for(p=0;p<fs->hash_eq.size();++p){
2526                         if(p>0) ret += " && ";
2527 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2528 //                                      " == s_equijoin_"+int_to_string(p);
2529                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2530                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2531                         string rhs_op = "s_equijoin_"+int_to_string(p);
2532                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2533                 }
2534                 ret +=  "){\n\t\t\t\tthe_bucket = bucket1;\n";
2535                 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2536                 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2537                 ret += "\t\t\t}\n\t\t}\n";
2538                 for(p=0;p<fs->hash_eq.size();++p){
2539                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2540                         if(hdt->is_buffer_type()){
2541                                 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2542                                 ret += tmpstr;
2543                         }else{
2544                                 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2545                                         " = s_equijoin_"+int_to_string(p)+";\n";
2546                         }
2547                 }
2548                 ret+="\t\tt->join_table[the_bucket].ts =  curr_fj_ts;\n";
2549         }
2550   ret += "\tend_s:\n";
2551
2552 //      ------------------------------------------------------------
2553 //              Next, determine if the R record should be processed.
2554
2555
2556         ret +=
2557 "//             R (main stream) cheap predicate\n"
2558 "\n"
2559 ;
2560
2561 //              Unpack r_filt fields
2562         vector<cnf_elem *> r_filt = fs->pred_t0;
2563         for(w=0;w<r_filt.size();++w){
2564                 col_id_set this_pred_cids;
2565                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2566                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2567                         if(unpacked_cids.count( (*csi) ) == 0){
2568                                 int tblref = (*csi).tblvar_ref;
2569                                 int schref = (*csi).schema_ref;
2570                                 string field = (*csi).field;
2571                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2572                                 unpacked_cids.insert( (*csi) );
2573                         }
2574                 }
2575         }
2576
2577 // Sort R preds based on cost.
2578
2579         vector<cnf_elem *> tmp_wh;
2580         for(w=0;w<r_filt.size();++w){
2581                 compute_cnf_cost(r_filt[w],Ext_fcns);
2582                 tmp_wh.push_back(r_filt[w]);
2583         }
2584         r_filt = tmp_wh;
2585
2586         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2587
2588 //              WARNING! the constant 20 below is a wild-ass guess.
2589         int cheap_rpos;
2590         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++)
2591
2592 //              Test the cheap filters on R.
2593   if(cheap_rpos >0){
2594
2595 //              Now generate the predicates.
2596         for(w=0;w<cheap_rpos;++w){
2597                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2598                 ret += tmpstr;
2599
2600 //                      Find partial fcns ref'd in this cnf element
2601                 set<int> pfcn_refs;
2602                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2603 //                      Since set<..> is a "Sorted Associative Container",
2604 //                      we can walk through it in sorted order by walking from
2605 //                      begin() to end().  (and the partial fcns must be
2606 //                      evaluated in this order).
2607                 set<int>::iterator si;
2608                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2609                         if(fcn_ref_cnt[(*si)] > 1){
2610                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2611                         }
2612                         if(is_partial_fcn[(*si)]){
2613                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2614                                 ret += "\t\tif(retval) goto end;\n";
2615                         }
2616                         if(fcn_ref_cnt[(*si)] > 1){
2617                                 if(!is_partial_fcn[(*si)]){
2618                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2619                                 }
2620                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2621                                 ret += "\t}\n";
2622                         }
2623                 }
2624
2625                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2626                                 ") ) goto end;\n";
2627         }
2628   }else{
2629           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2630   }
2631
2632         ret += "\n//    Do the join\n\n";
2633         for(p=0;p<fs->hash_eq.size();++p)
2634                 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2635
2636
2637 //                      Passed the cheap pred, now test the join with S.
2638         if(fs->use_bloom){
2639                 for(i=0;i<3;i++){
2640                         ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2641                         for(p=0;p<fs->hash_eq.size();++p){
2642                                 ret +=
2643 "       bucket"+int_to_string(i)+
2644         " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2645         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2646         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2647                         }
2648                                 ret +=
2649 "       bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2650                 }
2651                 ret += "\tfound = 0;\n";
2652                 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2653                 ret +=
2654 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2655 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2656 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
2657 "\t\t\tfound=1;\n"
2658 "\t}\n"
2659 ;
2660                 ret +=
2661 "       if(!found)\n"
2662 "               goto end;\n"
2663 ;
2664         }else{
2665                 ret += "\tfound = 0;\n";
2666                 ret += "\t\tbucket=0;\n";
2667                 for(p=0;p<fs->hash_eq.size();++p){
2668                         ret +=
2669 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2670         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2671         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2672                 }
2673                 ret +=
2674 "               bucket &= "+int_to_string(bf_mask)+";\n"
2675 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2676 ;
2677 //                      Try the first bucket
2678                 ret += "\t\tif(";
2679                 for(p=0;p<fs->hash_eq.size();++p){
2680                         if(p>0) ret += " && ";
2681 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2682 //                                      " == r_equijoin_"+int_to_string(p);
2683                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2684                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2685                         string rhs_op = "s_equijoin_"+int_to_string(p);
2686                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2687                 }
2688                 if(p>0) ret += " && ";
2689                 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2690                 ret += "){\n\t\t\tfound = 1;\n";
2691                 ret += "\t\t}else {if(";
2692                 for(p=0;p<fs->hash_eq.size();++p){
2693                         if(p>0) ret += " && ";
2694 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2695 //                                      " == r_equijoin_"+int_to_string(p);
2696                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2697                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2698                         string rhs_op = "s_equijoin_"+int_to_string(p);
2699                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2700                 }
2701                 if(p>0) ret += " && ";
2702                 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2703                 ret +=  ")\n\t\t\tfound=1;\n";
2704                 ret+="\t\t}\n";
2705                 ret +=
2706 "       if(!found)\n"
2707 "               goto end;\n"
2708 ;
2709         }
2710
2711
2712 //              Test the expensive filters on R.
2713   if(cheap_rpos < r_filt.size()){
2714
2715 //              Now generate the predicates.
2716         for(w=cheap_rpos;w<r_filt.size();++w){
2717                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2718                 ret += tmpstr;
2719
2720 //                      Find partial fcns ref'd in this cnf element
2721                 set<int> pfcn_refs;
2722                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2723 //                      Since set<..> is a "Sorted Associative Container",
2724 //                      we can walk through it in sorted order by walking from
2725 //                      begin() to end().  (and the partial fcns must be
2726 //                      evaluated in this order).
2727                 set<int>::iterator si;
2728                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2729                         if(fcn_ref_cnt[(*si)] > 1){
2730                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2731                         }
2732                         if(is_partial_fcn[(*si)]){
2733                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2734                                 ret += "\t\tif(retval) goto end;\n";
2735                         }
2736                         if(fcn_ref_cnt[(*si)] > 1){
2737                                 if(!is_partial_fcn[(*si)]){
2738                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2739                                 }
2740                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2741                                 ret += "\t}\n";
2742                         }
2743                 }
2744
2745                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2746                                 ") ) goto end;\n";
2747         }
2748   }else{
2749           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2750   }
2751
2752
2753
2754 ///////////////                 post the tuple
2755
2756 //                      test passed : create the tuple, then assign to it.
2757           ret += "/*\t\tCreate and post the tuple\t*/\n";
2758
2759 //              Unpack r_filt fields
2760         for(s=0;s<sl_list.size();++s){
2761                 col_id_set this_se_cids;
2762                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2763                 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2764                         if(unpacked_cids.count( (*csi) ) == 0){
2765                                 int tblref = (*csi).tblvar_ref;
2766                                 int schref = (*csi).schema_ref;
2767                                 string field = (*csi).field;
2768                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2769                                 unpacked_cids.insert( (*csi) );
2770                         }
2771                 }
2772         }
2773
2774
2775 //                      Unpack partial fcns ref'd by the select clause.
2776 //                      Its a kind of a WHERE clause ...
2777   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2778         if(fcn_ref_cnt[p] > 1){
2779                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2780         }
2781         if(is_partial_fcn[p]){
2782                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2783                 ret += "\tif(retval) goto end;\n";
2784         }
2785         if(fcn_ref_cnt[p] > 1){
2786                 if(!is_partial_fcn[p]){
2787                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2788                 }
2789                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2790                 ret += "\t}\n";
2791         }
2792   }
2793
2794   // increment the counter of accepted tuples
2795   ret += "\n\t#ifdef LFTA_STATS\n";
2796   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2797   ret += "\t#endif\n\n";
2798
2799 //                      First, compute the size of the tuple.
2800
2801 //                      Unpack any BUFFER type selections into temporaries
2802 //                      so that I can compute their size and not have
2803 //                      to recompute their value during tuple packing.
2804 //                      I can use regular assignment here because
2805 //                      these temporaries are non-persistent.
2806
2807           for(s=0;s<sl_list.size();s++){
2808                 data_type *sdt = sl_list[s]->get_data_type();
2809                 if(sdt->is_buffer_type()){
2810                         sprintf(tmpstr,"\tselvar_%d = ",s);
2811                         ret += tmpstr;
2812                         ret += generate_se_code(sl_list[s],schema);
2813                         ret += ";\n";
2814                 }
2815           }
2816
2817
2818 //              The size of the tuple is the size of the tuple struct plus the
2819 //              size of the buffers to be copied in.
2820
2821           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2822           for(s=0;s<sl_list.size();s++){
2823                 data_type *sdt = sl_list[s]->get_data_type();
2824                 if(sdt->is_buffer_type()){
2825                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2826                         ret += tmpstr;
2827                 }
2828           }
2829           ret += ";\n";
2830
2831
2832           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2833           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2834
2835 //                      Test passed, make assignments to the tuple.
2836
2837           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2838
2839 //                      Mark tuple as REGULAR_TUPLE
2840           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2841
2842
2843           for(s=0;s<sl_list.size();s++){
2844                 data_type *sdt = sl_list[s]->get_data_type();
2845                 if(sdt->is_buffer_type()){
2846                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2847                         ret += tmpstr;
2848                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2849                         ret += tmpstr;
2850                 }else{
2851                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2852                         ret += tmpstr;
2853 //                      if(sdt->needs_hn_translation())
2854 //                              ret += sdt->hton_translation() +"( ";
2855                         ret += generate_se_code(sl_list[s],schema);
2856 //                      if(sdt->needs_hn_translation())
2857 //                              ret += ") ";
2858                         ret += ";\n";
2859                 }
2860           }
2861
2862 //              Generate output.
2863
2864           ret += "\tpost_tuple(tuple);\n";
2865
2866 //              Increment the counter of posted tuples
2867   ret += "\n\t#ifdef LFTA_STATS\n";
2868   ret += "\n\tt->out_tuple_cnt++;\n\n";
2869   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
2870   ret += "\t#endif\n\n";
2871
2872
2873         return ret;
2874 }
2875
2876 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
2877         string ret;
2878         int a,p,g;
2879
2880 //////////////          Processing for aggregtion query
2881
2882 //              First, search for a match.  Start by unpacking the group-by attributes.
2883
2884 //                      One complication : if a real-time aggregate flush occurs,
2885 //                      the GB attr has already been calculated.  So don't compute
2886 //                      it again if 1) its temporal and 2) it will be computed in the
2887 //                      agggregate flush code.
2888
2889 //              Unpack the partial fcns ref'd by the gb's and the aggr defs.
2890   for(p=gb_fcns_start;p<gb_fcns_end;p++){
2891     if(is_partial_fcn[p]){
2892                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2893                 ret += "\tif(retval) goto end;\n";
2894         }
2895   }
2896   for(p=ag_fcns_start;p<ag_fcns_end;p++){
2897     if(is_partial_fcn[p]){
2898                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2899                 ret += "\tif(retval) goto end;\n";
2900         }
2901   }
2902
2903   // increment the counter of accepted tuples
2904   ret += "\n\t#ifdef LFTA_STATS\n";
2905   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2906   ret += "\t#endif\n\n";
2907
2908   ret += "/*\t\tTest if the group is in the hash table \t*/\n";
2909 //                      Compute the values of the group-by variables.
2910   for(g=0;g<gb_tbl->size();g++){
2911           data_type *gdt = gb_tbl->get_data_type(g);
2912           if((! gdt->is_temporal()) || temporal_flush == ""){
2913
2914                   if(gdt->is_buffer_type()){
2915         //                              NOTE : if the SE defining the gb is anything
2916         //                              other than a ref to a variable, this will generate
2917         //                              illegal code.  To be resolved with Spatch.
2918                         sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2919                                 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2920                         ret += tmpstr;
2921                         sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2922                                 gdt->get_buffer_assign_copy().c_str(), g, g);
2923                   }else{
2924                         sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2925                   }
2926                   ret += tmpstr;
2927           }
2928   }
2929   ret += "\n";
2930
2931 //                      A quick aside : if any of the GB attrs are temporal,
2932 //                      test for change and flush if any change occurred.
2933 //                      We've already computed the flush code,
2934 //                      Put it here if this is not a real time query.
2935 //                      We've already unpacked all column refs, so no need to
2936 //                      do it again here.
2937
2938         string rt_level = fs->get_val_of_def("real_time");
2939         if(rt_level == "" && temporal_flush != ""){
2940                 ret += temporal_flush;
2941         }
2942
2943 //                      Compute the hash bucket
2944         if(gb_tbl->size() > 0){
2945                 ret += "\thashval = ";\
2946                 for(g=0;g<gb_tbl->size();g++){
2947                   if(g>0) ret += " ^ ";
2948                   data_type *gdt = gb_tbl->get_data_type(g);
2949                   if(gdt->is_buffer_type()){
2950                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2951                                 gdt->get_type_str().c_str(), g);
2952                   }else{
2953                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2954                                 gdt->get_type_str().c_str(), g);
2955                   }
2956                   ret += tmpstr;
2957                 }
2958                 ret += ";\n";
2959                 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
2960         ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
2961         }else{
2962                 ret+="\tprobe = 0;\n";
2963                 ret+="\thash2 = 0;\n\n";
2964         }
2965
2966 //              Does the lfta reference a udaf?
2967           bool has_udaf = false;
2968           for(a=0;a<aggr_tbl->size();a++){
2969                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
2970           }
2971
2972 //              Scan for a match, or alternatively the best slot.
2973 //              Currently, hardcode 5 tests.
2974         ret +=
2975 "       gen_val = t->generation & SLOT_GEN_BITS;\n"
2976 "       match_found = 0;\n"
2977 "       best_slot = probe;\n"
2978 "       for(i=0;i<5 && match_found == 0;i++){\n"
2979 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
2980 ;
2981         if(gb_tbl->size()>0){
2982                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
2983                 ret+="\t\tif(";
2984                 string rhs_op, lhs_op;
2985                 for(g=0;g<gb_tbl->size();g++){
2986                   if(g>0) ret += " && ";
2987                   ret += "(";
2988                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
2989                   sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
2990                   ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
2991                   ret += ")";
2992                 }
2993          }
2994          ret += "){\n"
2995 "                       match_found = 1;\n"
2996 "                       best_slot = probe;\n"
2997 "               }\n"
2998 "       }\n"
2999 "//             Rate slots in case no match found: prefer empty, then full but old slots\n"
3000 "       if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3001 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3002 "                       best_slot = probe;\n"
3003 "               }else{\n"
3004 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3005 "                               best_slot = probe;\n"
3006 "                       }\n"
3007 "               }\n"
3008 "               probe++;\n"
3009 "               if(probe >= t->max_aggrs)\n"
3010 "                       probe=0;\n"
3011 "       }\n"
3012 "       if(match_found){\n"
3013 ;
3014         ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3015         ret +=
3016 "       }else{\n"
3017 "               if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3018 ;
3019 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3020         if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3021                 ret +=
3022 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3023 "                               if((";
3024                 bool first_g = true;
3025                 for(int g=0;g<gb_tbl->size();g++){
3026                         data_type *gdt = gb_tbl->get_data_type(g);
3027                         if(gdt->is_temporal()){
3028                                 if(first_g) first_g = false; else ret+=" + ";
3029                                 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3030                         }
3031                 }
3032                 ret += ") == 0 ){\n";
3033
3034                 ret +=
3035 "                                       fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3036 "                               }\n"
3037 "                       }\n"
3038 ;
3039         }
3040
3041         ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3042         ret +=
3043 "\t\t\t#ifdef LFTA_STATS\n"
3044 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3045 "\t\t\t\tt->collision_cnt++;\n\n"
3046 "\t\t\t#endif\n\n"
3047 "\t\t}\n"
3048 ;
3049         ret += generate_init_group(schema,"best_slot");
3050
3051
3052           ret += "\t}\n";
3053
3054         return ret;
3055 }
3056
3057
3058
3059 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set<unsigned int> &s_pids){
3060
3061         string ret="static gs_retval_t accept_packet_"+node_name+
3062                 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3063     ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3064
3065   int a;
3066
3067 //                      Define all of the variables needed by this
3068 //                      procedure.
3069
3070
3071 //                      Gather all column references, need to define unpacking variables.
3072   int w,s;
3073   col_id_set cid_set;
3074   col_id_set::iterator csi;
3075
3076 //              If its a filter join, rebind all colrefs
3077 //              to the first range var, to avoid double unpacking.
3078
3079   if(is_fj){
3080     for(w=0;w<where.size();++w)
3081                 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3082     for(s=0;s<sl_list.size();s++)
3083                 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3084   }
3085
3086   for(w=0;w<where.size();++w){
3087         if(is_fj || s_pids.count(w) == 0)
3088                 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3089         }
3090   for(s=0;s<sl_list.size();s++){
3091         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3092   }
3093
3094   int g;
3095   if(gb_tbl != NULL){
3096         for(g=0;g<gb_tbl->size();g++)
3097           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3098   }
3099
3100   //                    Variables for unpacking attributes.
3101   ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3102   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3103     int schref = (*csi).schema_ref;
3104         int tblref = (*csi).tblvar_ref;
3105     string field = (*csi).field;
3106     data_type dt(schema->get_type_name(schref,field));
3107     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3108         field.c_str(), tblref);
3109     ret += tmpstr;
3110   }
3111
3112   ret += "\n\n";
3113
3114 //                      Variables that are always needed
3115   ret += "/*\t\tVariables which are always needed\t*/\n";
3116   ret += "\tgs_retval_t retval;\n";
3117   ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3118   ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3119
3120   ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3121
3122
3123 //                      Variables needed for aggregation queries.
3124   if(is_aggr_query){
3125           ret += "\n/*\t\tVariables for aggregation\t*/\n";
3126           ret+="\tunsigned int i, probe;\n";
3127           ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3128           ret+="\tgs_uint64_t hashval, hash2;\n";
3129 //                      Variables for storing group-by attribute values.
3130           if(gb_tbl->size() > 0)
3131                 ret += "/*\t\tGroup-by attributes\t*/\n";
3132           for(g=0;g<gb_tbl->size();g++){
3133                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3134                 ret += tmpstr;
3135                 data_type *gdt = gb_tbl->get_data_type(g);
3136                 if(gdt->is_buffer_type()){
3137                   sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3138                   ret += tmpstr;
3139                 }
3140           }
3141           ret += "\n";
3142 //                      Temporaries for min/max
3143           string aggr_tmp_str = "";
3144           for(a=0;a<aggr_tbl->size();a++){
3145                 string aggr_op = aggr_tbl->get_op(a);
3146                 if(aggr_op == "MIN" || aggr_op == "MAX"){
3147                         sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3148                         aggr_tmp_str.append(tmpstr);
3149                 }
3150           }
3151           if(aggr_tmp_str != ""){
3152                 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3153                 ret += aggr_tmp_str;
3154                 ret += "\n";
3155           }
3156 //              Variables for udaf output temporaries
3157         bool no_udaf = true;
3158         for(a=0;a<aggr_tbl->size();a++){
3159                 if(! aggr_tbl->is_builtin(a)){
3160                         if(no_udaf){
3161                                 ret+="/*\t\tUDAF output vars.\t*/\n";
3162                                 no_udaf = false;
3163                         }
3164                         int afcn_id = aggr_tbl->get_fcn_id(a);
3165                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3166                         sprintf(tmpstr,"udaf_ret%d", a);
3167                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3168                 }
3169         }
3170   }
3171
3172 //                      Variables needed for a filter join query
3173   if(fs->node_type() == "filter_join"){
3174         filter_join_qpn *fjq = (filter_join_qpn *)fs;
3175         bool uses_bloom = fjq->use_bloom;
3176         ret += "/*\t\tJoin fields\t*/\n";
3177         for(g=0;g<fjq->hash_eq.size();g++){
3178                 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3179                 ret += tmpstr;
3180                 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3181                 ret += tmpstr;
3182           }
3183         if(uses_bloom){
3184                 ret +=
3185 "  /*           Variables for fj bloom filter   */ \n"
3186 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3187 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3188 "\tlong long int curr_fj_ts;\n"
3189 "\tlong long int curr_bin, the_bin;\n"
3190 "\n"
3191 ;
3192         }else{
3193                 ret +=
3194 "  /*           Variables for fj join table     */ \n"
3195 "\tunsigned int i, bucket, found; \n"
3196 "\tunsigned int bucket1, the_bucket;\n"
3197 "       long long int curr_fj_ts;\n"
3198 "\n"
3199 ;
3200         }
3201   }
3202
3203
3204 //              Variables needed to store selected attributes of BUFFER type
3205 //              temporarily, in order to compute their size for storage
3206 //              in an output tuple.
3207
3208   string select_var_defs = "";
3209   for(s=0;s<sl_list.size();s++){
3210         data_type *sdt = sl_list[s]->get_data_type();
3211         if(sdt->is_buffer_type()){
3212           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3213           select_var_defs.append(tmpstr);
3214         }
3215   }
3216   if(select_var_defs != ""){
3217         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3218     ret += select_var_defs;
3219   }
3220
3221 //              Variables to store results of partial functions.
3222   int p;
3223   if(partial_fcns.size()>0){
3224           ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3225           for(p=0;p<partial_fcns.size();++p){
3226                 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3227                   sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3228                     partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3229                   ret += tmpstr;
3230                   if(!is_aggr_query && fcn_ref_cnt[p] >1){
3231                         ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3232                   }
3233                 }
3234           }
3235
3236           if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3237           ret += "\n";
3238   }
3239
3240 //              variable to hold packet struct  //
3241         if(packed_return){
3242                 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3243         }
3244
3245
3246   ret += "\t#ifdef LFTA_STATS\n";
3247 // variable to store counter of cpu cycles spend in accept_tuple
3248         ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3249 // increment counter of received tuples
3250         ret += "\tt->in_tuple_cnt++;\n";
3251   ret += "\t#endif\n";
3252
3253
3254 //      -------------------------------------------------
3255 //              If the packet is "packet", test if its for this lfta,
3256 //              and if so load it into its struct
3257
3258         if(packed_return){
3259                 ret+="\n/*  packed tuple : test and load. \t*/\n";
3260                 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3261                 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3262                 ret+="\t\tgoto end;\n\n";
3263         }
3264
3265
3266
3267   col_id_set unpacked_cids;     //      Keep track of the cols that have been unpacked.
3268
3269   string temporal_flush;
3270   if(is_aggr_query)
3271         ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3272   else {        // non-aggregation operators
3273
3274 // Unpack all the temporal attributes referenced in select clause
3275 // and update the last value of the attribute
3276         col_id_set temp_cids;           //      col ids of temp attributes in select clause
3277
3278         for(s=0;s<sl_list.size();s++){
3279                 data_type *sdt = sl_list[s]->get_data_type();
3280                 if (sdt->is_temporal()) {
3281                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3282                 }
3283         }
3284 //                      If this is a filter join,
3285 //                      ensure that the temporal range field is unpacked.
3286         if(is_fj){
3287                 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3288                 if(temp_cids.count(window_var_cid)==0)
3289                         temp_cids.insert(window_var_cid);
3290         }
3291
3292         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3293                 if(unpacked_cids.count((*csi)) == 0){
3294                         int tblref = (*csi).tblvar_ref;
3295                         int schref = (*csi).schema_ref;
3296                         string field = (*csi).field;
3297                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3298                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3299                         ret += tmpstr;
3300
3301                         unpacked_cids.insert( (*csi) );
3302                 }
3303         }
3304
3305   }
3306
3307   vector<cnf_elem *> filter = fs->get_filter_clause();
3308 //              Test the filter predicate (some query types have additional preds).
3309   if(filter.size() > 0){
3310
3311 //              Sort by evaluation cost.
3312 //              First, estimate evaluation costs
3313 //              Eliminate predicates covered by the prefilter (those in s_pids).
3314 //              I need to do it before the sort becuase the indices refer
3315 //              to the position in the unsorted list./
3316         vector<cnf_elem *> tmp_wh;
3317         for(w=0;w<filter.size();++w){
3318                 if(s_pids.count(w) == 0){
3319                         compute_cnf_cost(filter[w],Ext_fcns);
3320                         tmp_wh.push_back(filter[w]);
3321                 }
3322         }
3323         filter = tmp_wh;
3324
3325         sort(filter.begin(), filter.end(), compare_cnf_cost());
3326
3327 //              Now generate the predicates.
3328         for(w=0;w<filter.size();++w){
3329                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
3330                 ret += tmpstr;
3331 //                      Find the set of variables accessed in this CNF elem,
3332 //                      but in no previous element.
3333                 col_id_set this_pred_cids;
3334                 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
3335                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3336                         if(unpacked_cids.count( (*csi) ) == 0){
3337                         int tblref = (*csi).tblvar_ref;
3338                         int schref = (*csi).schema_ref;
3339                         string field = (*csi).field;
3340                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3341                                 unpacked_cids.insert( (*csi) );
3342                         }
3343                 }
3344 //                      Find partial fcns ref'd in this cnf element
3345                 set<int> pfcn_refs;
3346                 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
3347 //                      Since set<..> is a "Sorted Associative Container",
3348 //                      we can walk through it in sorted order by walking from
3349 //                      begin() to end().  (and the partial fcns must be
3350 //                      evaluated in this order).
3351                 set<int>::iterator si;
3352                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3353                         if(fcn_ref_cnt[(*si)] > 1){
3354                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3355                         }
3356                         if(is_partial_fcn[(*si)]){
3357                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3358                                 ret += "\t\tif(retval) goto end;\n";
3359                         }
3360                         if(fcn_ref_cnt[(*si)] > 1){
3361                                 if(!is_partial_fcn[(*si)]){
3362                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3363                                 }
3364                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3365                                 ret += "\t}\n";
3366                         }
3367                 }
3368
3369                 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
3370                                 ") ) goto end;\n";
3371         }
3372   }else{
3373           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
3374   }
3375
3376
3377 //                      We've passed the WHERE clause,
3378 //                      unpack the remainder of the accessed fields.
3379   if(is_fj){
3380         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3381         vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
3382                 for(w=0;w<h_eq.size();++w){
3383                 col_id_set this_pred_cids;
3384                 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
3385                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3386                         if(unpacked_cids.count( (*csi) ) == 0){
3387                                 int tblref = (*csi).tblvar_ref;
3388                                 int schref = (*csi).schema_ref;
3389                                 string field = (*csi).field;
3390                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3391                                 unpacked_cids.insert( (*csi) );
3392                         }
3393                 }
3394         }
3395   }else{
3396           ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
3397
3398           for(csi=cid_set.begin();csi!=cid_set.end();++csi){
3399                 if(unpacked_cids.count( (*csi) ) == 0){
3400                         int schref = (*csi).schema_ref;
3401                         int tblref = (*csi).tblvar_ref;
3402                         string field = (*csi).field;
3403                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3404                         unpacked_cids.insert( (*csi) );
3405                 }
3406           }
3407   }
3408
3409
3410 //////////////////
3411 //////////////////      After this, the query types
3412 //////////////////      are processed differently.
3413
3414   if(!is_aggr_query && !is_fj)
3415         ret += generate_sel_accept_body(fs, node_name, schema);
3416   else if(is_aggr_query)
3417         ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
3418   else
3419         ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
3420
3421
3422 //              Finish up.
3423
3424    ret += "\n\tend:\n";
3425   ret += "\t#ifdef LFTA_STATS\n";
3426         ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
3427   ret += "\t#endif\n";
3428    ret += "\n\treturn 1;\n}\n\n";
3429
3430         return(ret);
3431 }
3432
3433
3434 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
3435         int g, cl;
3436
3437         string ret = "struct FTA * "+generate_alloc_name(node_name) +
3438            "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command,  gs_int32_t sz, void *value){\n";
3439
3440         ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
3441         ret+="\tint i;\n";
3442         ret += "\n";
3443         ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
3444
3445 //                              assign a streamid to fta instance
3446         ret+="\t/* assign a streamid */\n";
3447         ret+="\tf->f.ftaid = ftaid;\n";
3448         ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
3449         ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
3450
3451         if(is_aggr_query){
3452                 ret += "\tf->n_aggrs = 0;\n";
3453
3454                 ret += "\tf->max_aggrs = ";
3455
3456 //                              Computing the number of aggregate blocks is a little
3457 //                              tricky.  If there are no GB attrs, or if all GB attrs
3458 //                              are temporal, then use a single aggregate block, else
3459 //                              use a default value (10).  A user specification overrides
3460 //                              this logic.
3461                 bool single_group = true;
3462                 for(g=0;g<gb_tbl->size();g++){
3463                         data_type *gdt = gb_tbl->get_data_type(g);
3464                         if(! gdt->is_temporal() ){
3465                                 single_group = false;
3466                         }
3467                 }
3468                 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
3469                 int max_aggr_i = atoi(max_aggr_str.c_str());
3470                 if(max_aggr_i <= 0){
3471                         if(single_group)
3472                                 ret += "2";
3473                         else
3474                                 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
3475                 }else{
3476                         unsigned int naggrs = 1;                // make it power of 2
3477                         unsigned int nones = 0;
3478                         while(max_aggr_i){
3479                                 if(max_aggr_i&1)
3480                                         nones++;
3481                                 naggrs = naggrs << 1;
3482                                 max_aggr_i = max_aggr_i >> 1;
3483                         }
3484                         if(nones==1)            // in case it was already a power of 2.
3485                                 naggrs/=2;
3486                         ret += int_to_string(naggrs);
3487                 }
3488                 ret += ";\n";
3489
3490                 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
3491                 ret+="\t\treturn(0);\n";
3492                 ret+="\t}\n\n";
3493 //              ret+="/* compute how many integers we need to store the hashmap */\n";
3494 //              ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
3495                 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
3496                 ret+="\t\treturn(0);\n";
3497                 ret+="\t}\n";
3498                 ret+="/*\t\tfill bitmap with zero \t*/\n";
3499                 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
3500                 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
3501                 ret+="\tf->generation=0;\n";
3502                 ret+="\tf->flush_pos = f->max_aggrs;\n";
3503
3504                 ret += "\tf->flush_ctr = 0;\n";
3505
3506         }
3507
3508         if(is_fj){
3509                 if(uses_bloom){
3510                         ret+="\tf->first_exec = 1;\n";
3511                         unsigned int n_bloom = 11;
3512                         string n_bloom_str = fs->get_val_of_def("num_bloom");
3513                         int tmp_n_bloom = atoi(n_bloom_str.c_str());
3514                         if(tmp_n_bloom>0)
3515                                 n_bloom = tmp_n_bloom+1;
3516
3517                         unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
3518                         if(window_len < n_bloom){
3519                                 n_bloom = window_len+1;
3520                         }
3521
3522                         int bf_exp_size = 12;  // base-2 log of number of bits
3523                         string bloom_len_str = fs->get_val_of_def("bloom_size");
3524                         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
3525                         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
3526                                 bf_exp_size = tmp_bf_exp_size;
3527                         }
3528                         int bf_bit_size = 1 << 12;
3529                         int bf_byte_size = bf_bit_size / (8*sizeof(char));
3530
3531                         int bf_tot = n_bloom*bf_byte_size;
3532                         ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
3533                         ret+="\t\treturn(0);\n";
3534                         ret+="\t}\n";
3535                         ret +=
3536 "       for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
3537 "               f->bf_table[i] = 0;\n"
3538 ;
3539                 }else{
3540                         unsigned int ht_size = 4096;
3541                         string ht_size_s = fs->get_val_of_def("aggregate_slots");
3542                         int tmp_ht_size = atoi(ht_size_s.c_str());
3543                         if(tmp_ht_size > 1024){
3544                                 unsigned int hs = 1;            // make it power of 2
3545                                 while(tmp_ht_size){
3546                                         hs =hs << 1;
3547                                         tmp_ht_size = tmp_ht_size >> 1;
3548                                 }
3549                                 ht_size = hs;
3550                         }
3551                         ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
3552                         ret+="\t\treturn(0);\n";
3553                         ret+="\t}\n\n";
3554                         ret +=
3555 "       for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
3556 "               f->join_table[i].ts = 0;\n"
3557 ;
3558                 }
3559         }
3560
3561 //                      Initialize the complex literals (which might be handles).
3562
3563         for(cl=0;cl<complex_literals->size();cl++){
3564                 literal_t *l = complex_literals->get_literal(cl);
3565 //              sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
3566 //              ret += tmpstr + l->to_C_code() + ";\n";
3567                 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
3568                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
3569         }
3570
3571         ret += "\n";
3572
3573 //                      Initialize the last seen values of temporal attributes to min(max) value of
3574 //                      their respective type
3575 //                      Create places to hold the last values of temporal attributes referenced in select clause
3576
3577
3578         col_id_set temp_cids;           //      col ids of temp attributes in select clause
3579
3580         int s;
3581         col_id_set::iterator csi;
3582
3583         for(s=0;s<sl_list.size();s++){
3584                 data_type *sdt = sl_list[s]->get_data_type();
3585                 if (sdt->is_temporal()) {
3586                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3587                 }
3588         }
3589
3590         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3591                 int tblref = (*csi).tblvar_ref;
3592                 int schref = (*csi).schema_ref;
3593                 string field = (*csi).field;
3594                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
3595                 if (dt.is_increasing()) {
3596                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
3597                         ret += tmpstr;
3598                 } else if (dt.is_decreasing()) {
3599                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
3600                         ret += tmpstr;
3601                 }
3602         }
3603
3604 //      initialize last seen values of temporal groubpy variables
3605         if(is_aggr_query){
3606                 for(g=0;g<gb_tbl->size();g++){
3607                         data_type *dt = gb_tbl->get_data_type(g);
3608                         if(dt->is_temporal()){
3609 /*
3610                                 fprintf(stderr,"group by attribute %s is temporal, ",
3611                                                 gb_tbl->get_name(g).c_str());
3612 */
3613                                 if(dt->is_increasing()){
3614                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
3615                                 }else{
3616                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
3617                                 }
3618                                 ret += tmpstr;
3619                         }
3620                 }
3621         }
3622
3623         ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
3624         ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
3625         ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
3626         ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
3627         ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
3628
3629 //                      Initialize runtime stats
3630         ret+="\tf->in_tuple_cnt = 0;\n";
3631         ret+="\tf->out_tuple_cnt = 0;\n";
3632         ret+="\tf->out_tuple_sz = 0;\n";
3633         ret+="\tf->accepted_tuple_cnt = 0;\n";
3634         ret+="\tf->cycle_cnt = 0;\n";
3635         ret+="\tf->collision_cnt = 0;\n";
3636         ret+="\tf->eviction_cnt = 0;\n";
3637         ret+="\tf->sampling_rate = 1.0;\n";
3638
3639         ret+="\tf->trace_id = 0;\n\n";
3640     if(param_tbl->size() > 0){
3641         ret+=
3642 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
3643 "#ifndef LFTA_IN_NIC\n"
3644 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
3645 "#else\n"
3646 "\t\t}\n"
3647 "#endif\n"
3648 "\t\t\treturn 0;\n"
3649 "\t\t}\n";
3650         }
3651
3652 //                      Register the pass-by-handle parameters
3653     int ph;
3654     for(ph=0;ph<param_handle_table.size();++ph){
3655                 data_type pdt(param_handle_table[ph]->type_name);
3656                 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
3657                 switch(param_handle_table[ph]->val_type){
3658                 case cplx_lit_e:
3659                         ret += tmpstr;
3660                         if(pdt.is_buffer_type()) ret += "&(";
3661                         sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
3662                         ret += tmpstr ;
3663                         if(pdt.is_buffer_type()) ret += ")";
3664                         ret +=  ");\n";
3665                         break;
3666                 case litval_e:
3667 //                              not complex, no constructor
3668                         ret += tmpstr;
3669                         ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
3670                         break;
3671                 case param_e:
3672 //                              query parameter handles are regstered/deregistered in the
3673 //                              load_params function.
3674 //                      ret += "t->param_"+param_handle_table[ph]->param_name;
3675                         break;
3676                 default:
3677                         fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
3678                         exit(1);
3679                 }
3680         }
3681
3682         ret += "\treturn (struct FTA *) f;\n";
3683         ret += "}\n\n";
3684
3685         return(ret);
3686 }
3687
3688
3689
3690
3691 //////////////////////////////////////////////////////////////////
3692
3693 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
3694 //              map<string,string> &int_fcn_defs,
3695                 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
3696         bool is_aggr_query;
3697         int s,p,g;
3698         string retval;
3699
3700 /////////////////////////////////////////////////////////////
3701 ///             Do operator-generic processing, such as
3702 ///             gathering the set of referenced columns,
3703 ///             generating structures, etc.
3704
3705 //              Initialize globals to empty.
3706         gb_tbl = NULL; aggr_tbl = NULL;
3707         global_id = -1; nicprop = NULL;
3708         param_tbl = fs->get_param_tbl();
3709         sl_list.clear(); where.clear();
3710         partial_fcns.clear();
3711         fcn_ref_cnt.clear(); is_partial_fcn.clear();
3712         pred_class.clear(); pred_pos.clear();
3713         sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
3714         gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
3715
3716
3717 //              Does the lfta read packed results from the NIC?
3718         nicprop = nicp;                 // load into global
3719         global_id = gid;
3720     packed_return = false;
3721         if(nicp && nicp->option_exists("Return")){
3722                 if(nicp->option_value("Return") == "Packed"){
3723                         packed_return = true;
3724                 }else{
3725                         fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
3726                 }
3727         }
3728
3729
3730 //                      Extract data which defines the query.
3731 //                              complex literals gathered now.
3732         complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
3733         param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
3734         string node_name = fs->get_node_name();
3735     bool is_fj = false, uses_bloom = false;
3736
3737
3738         if(fs->node_type() == "spx_qpn"){
3739                 is_aggr_query = false;
3740                 spx_qpn *spx_node = (spx_qpn *)fs;
3741                 sl_list = spx_node->get_select_se_list();
3742                 where = spx_node->get_where_clause();
3743                 gb_tbl = NULL;
3744                 aggr_tbl = NULL;
3745         } else
3746         if(fs->node_type() == "sgah_qpn"){
3747                 is_aggr_query = true;
3748                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3749                 sl_list = sgah_node->get_select_se_list();
3750                 where = sgah_node->get_where_clause();
3751                 gb_tbl = sgah_node->get_gb_tbl();
3752                 aggr_tbl = sgah_node->get_aggr_tbl();
3753
3754                 if((sgah_node->get_having_clause()).size() > 0){
3755                         fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
3756                 }
3757         } else
3758         if(fs->node_type() == "filter_join"){
3759                 is_aggr_query = false;
3760         is_fj = true;
3761                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3762                 sl_list = fj_node->get_select_se_list();
3763                 where = fj_node->get_where_clause();
3764                 uses_bloom = fj_node->use_bloom;
3765                 gb_tbl = NULL;
3766                 aggr_tbl = NULL;
3767         } else {
3768                 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
3769                 exit(1);
3770         }
3771
3772 //                      Build list of "partial functions", by clause.
3773 //                      NOTE : partial fcns are not handles well.
3774 //                      The act of searching for them associates the fcn call
3775 //                      in the SE with an index to an array.  Refs to the
3776 //                      fcn value are replaced with refs to the variable they are
3777 //                      unpacked into.  A more general tagging mechanism would be better.
3778
3779         int i;
3780         vector<bool> *pfunc_ptr = NULL;
3781         vector<int> *ref_cnt_ptr = NULL;
3782         if(!is_aggr_query){             // don't collect cacheable fcns on aggr query.
3783                 ref_cnt_ptr = &fcn_ref_cnt;
3784                 pfunc_ptr = &is_partial_fcn;
3785         }
3786
3787         sl_fcns_start = 0;
3788         for(i=0;i<sl_list.size();i++){
3789                 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3790         }
3791         wh_fcns_start = sl_fcns_end = partial_fcns.size();
3792         for(i=0;i<where.size();i++){
3793                 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3794         }
3795         gb_fcns_start = wh_fcns_end = partial_fcns.size();
3796         if(gb_tbl != NULL){
3797                 for(i=0;i<gb_tbl->size();i++){
3798                         find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
3799                 }
3800         }
3801         ag_fcns_start = gb_fcns_end = partial_fcns.size();
3802         if(aggr_tbl != NULL){
3803                 for(i=0;i<aggr_tbl->size();i++){
3804                         find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
3805                 }
3806         }
3807         ag_fcns_end = partial_fcns.size();
3808
3809 //              Fill up the is_partial_fcn and fcn_ref_cnt arrays.
3810         if(is_aggr_query){
3811                 for(i=0; i<partial_fcns.size();i++){
3812                         fcn_ref_cnt.push_back(1);
3813                         is_partial_fcn.push_back(true);
3814                 }
3815         }
3816
3817 //              Unmark non-partial expensive functions referenced only once.
3818         for(i=0; i<partial_fcns.size();i++){
3819                 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
3820                         partial_fcns[i]->set_partial_ref(-1);
3821                 }
3822         }
3823
3824         node_name = normalize_name(node_name);
3825
3826         retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
3827
3828         if(packed_return){              // generate unpack struct
3829                 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
3830                 int schref = input_tbls[0]->get_schema_ref();
3831                 vector<string> refd_cols;
3832                 for(s=0;s<sl_list.size();++s){
3833                         gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
3834                 }
3835                 for(p=0;p<where.size();++p){
3836 //                              I'm not disabling these preds ...
3837                         gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
3838                 }
3839                 if(gb_tbl){
3840                         for(g=0;g<gb_tbl->size();++g){
3841                           gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
3842                         }
3843                 }
3844                 sort(refd_cols.begin(), refd_cols.end());
3845                 retval += "struct "+node_name+"_input_struct{\n";
3846                 retval += "\tint __lfta_id_fm_nic__;\n";
3847                 int vsi;
3848                 for(vsi=0;vsi<refd_cols.size();++vsi){
3849                 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
3850                         retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
3851                 }
3852                 retval+="};\n\n";
3853         }
3854
3855
3856 /////////////////////////////////////////////////////
3857 //                      Common stuff unpacked, do some generation
3858
3859         if(is_aggr_query)
3860           retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
3861         if(is_fj)
3862                 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
3863
3864         retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema);
3865         retval += generate_tuple_struct(node_name, sl_list) ;
3866
3867         if(is_aggr_query)
3868                 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
3869         if(param_tbl->size() > 0)
3870                 retval += generate_fta_load_params(node_name) ;
3871         retval += generate_fta_free(node_name, is_aggr_query) ;
3872         retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;
3873         retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ;
3874
3875
3876         /* extract the value of Time_Correlation from interface definition */
3877         int e,v;
3878         string es;
3879         unsigned time_corr;
3880         vector<tablevar_t *> tvec =  fs->get_input_tbls();
3881         vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
3882         if (time_corr_vec.empty())
3883                 time_corr = DEFAULT_TIME_CORR;
3884         else
3885                 time_corr = atoi(time_corr_vec[0].c_str());
3886
3887         retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) );
3888         retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
3889
3890   return(retval);
3891 }
3892
3893
3894
3895 int compute_snap_len(qp_node *fs, table_list *schema){
3896
3897 //              Initialize global vars
3898         gb_tbl = NULL;
3899         sl_list.clear(); where.clear();
3900
3901         if(fs->node_type() == "spx_qpn"){
3902                 spx_qpn *spx_node = (spx_qpn *)fs;
3903                 sl_list = spx_node->get_select_se_list();
3904                 where = spx_node->get_where_clause();
3905         }
3906         else if(fs->node_type() == "sgah_qpn"){
3907                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3908                 sl_list = sgah_node->get_select_se_list();
3909                 where = sgah_node->get_where_clause();
3910                 gb_tbl = sgah_node->get_gb_tbl();
3911         }
3912         else if(fs->node_type() == "filter_join"){
3913                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3914                 sl_list = fj_node->get_select_se_list();
3915                 where = fj_node->get_where_clause();
3916         } else{
3917                 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
3918                 exit(1);
3919         }
3920
3921 //                      Gather all column references, need to define unpacking variables.
3922   int w,s;
3923   col_id_set cid_set;
3924   col_id_set::iterator csi;
3925
3926   for(w=0;w<where.size();++w)
3927         gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3928   for(s=0;s<sl_list.size();s++){
3929         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3930   }
3931
3932   int g;
3933   if(gb_tbl != NULL){
3934         for(g=0;g<gb_tbl->size();g++)
3935           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3936   }
3937
3938   //                    compute snap length
3939   int snap_len = -1;
3940   int n_snap=0;
3941   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3942     int schref = (*csi).schema_ref;
3943         int tblref = (*csi).tblvar_ref;
3944     string field = (*csi).field;
3945
3946         param_list *field_params = schema->get_modifier_list(schref, field);
3947         if(field_params->contains_key("snap_len")){
3948                 string fld_snap_str = field_params->val_of("snap_len");
3949                 int fld_snap;
3950                 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
3951                         if(fld_snap > snap_len) snap_len = fld_snap;
3952                         n_snap++;
3953                 }else{
3954                         fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
3955                 }
3956         }
3957   }
3958
3959   if(n_snap == cid_set.size()){
3960         return (snap_len);
3961   }else{
3962         return -1;
3963   }
3964
3965
3966 }
3967
3968 //              Function which computes an optimal
3969 //              set of unpacking functions.
3970
3971 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
3972         map<string, int> pfcn_count;
3973         map<string, int>::iterator msii;
3974         col_id_set::iterator cisi;
3975         set<string>::iterator ssi;
3976         string best_fcn;
3977
3978         while(ucol_fcn_map.size() < upref_cids.size()){
3979
3980 //                      Gather unpack functions referenced by unaccounted-for
3981 //                      columns, and increment their reference count.
3982                 pfcn_count.clear();
3983                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
3984                         if(ucol_fcn_map.count((*cisi)) == 0){
3985                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
3986                                 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
3987                                         pfcn_count[(*ssi)]++;
3988                         }
3989                 }
3990
3991 //              Get the lowest cost per field function.
3992                 float min_cost = 0.0;
3993                 string best_fcn = "";
3994                 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
3995                         int fcost = Schema->get_ufcn_cost((*msii).first);
3996                         if(fcost < 0){
3997                                 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
3998                                 exit(1);
3999                         }
4000                         float this_cost = (1.0*fcost)/(*msii).second;
4001                         if(msii == pfcn_count.begin() || this_cost < min_cost){
4002                                 min_cost = this_cost;
4003                                 best_fcn = (*msii).first;
4004                         }
4005                 }
4006                 if(best_fcn == ""){
4007                         fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4008                         exit(1);
4009                 }
4010
4011 //              Assign this function to the unassigned fcns which use it.
4012                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4013                         if(ucol_fcn_map.count((*cisi)) == 0){
4014                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4015                                 if(ufcns.count(best_fcn)>0)
4016                                         ucol_fcn_map[(*cisi)] = best_fcn;
4017                         }
4018                 }
4019         }
4020 }
4021
4022
4023
4024 //              Generate an initial test test for the lfta
4025 //              Assume that the predicate references no external functions,
4026 //              and especially no partial functions,
4027 //              aggregates, internal functions.
4028 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4029                 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4030                 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4031                 vector<int> &lfta_snap_lens, string iface){
4032   col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4033   col_id_set::iterator csi;
4034         int o,p,q;
4035         string ret;
4036
4037 //              Gather complex literals in the prefilter.
4038         cplx_lit_table *complex_literals = new cplx_lit_table();
4039         for(p=0;p<pred_list.size();++p){
4040                 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4041         }
4042
4043
4044 //              Find the combinable predicates
4045         vector<predicate_t *> pr_list;
4046         for(p=0;p<pred_list.size();++p){
4047         find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4048         }
4049
4050 //              Analyze the combinable predicates to find the predicate classes.
4051         pred_class.clear();             // idx to equiv pred in equiv_list
4052         pred_pos.clear();               // idx to returned bitmask.
4053         vector<predicate_t *> equiv_list;
4054         vector<int> num_equiv;
4055
4056
4057         for(p=0;p<pr_list.size();++p){
4058                 for(q=0;q<equiv_list.size();++q){
4059                         if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4060                                 break;
4061                 }
4062                 if(q == equiv_list.size()){             // no equiv : create new
4063                         pred_class.push_back(equiv_list.size());
4064                         equiv_list.push_back(pr_list[p]);
4065                         pred_pos.push_back(0);
4066                         num_equiv.push_back(1);
4067
4068                 }else{                  // pr_list[p] is equivalent to pred q
4069                         pred_class.push_back(q);
4070                         pred_pos.push_back(num_equiv[q]);
4071                         num_equiv[q]++;
4072                 }
4073         }
4074
4075 //              Generate the variables which hold the common pred handles
4076         ret += "/*\t\tprefilter global vars.\t*/\n";
4077         for(q=0;q<equiv_list.size();++q){
4078                 for(p=0;p<=(num_equiv[q]/32);++p){
4079                         ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4080                 }
4081         }
4082
4083 //              Struct to hold prefilter complex literals
4084         ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4085         if(complex_literals->size() == 0)
4086                 ret += "\tint no_variable;\n";
4087         int cl;
4088         for(cl=0;cl<complex_literals->size();cl++){
4089                 literal_t *l = complex_literals->get_literal(cl);
4090                 data_type *dtl = new data_type( l->get_type() );
4091                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4092                 ret += tmpstr;
4093         }
4094         ret += "} prefilter_complex_lits_"+iface+";\n\n";
4095
4096
4097 //              Generate the prefilter initialziation code
4098         ret += "void init_lfta_prefilter_"+iface+"(){\n";
4099
4100 //              First initialize complex literals, if any.
4101         ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4102         for(cl=0;cl<complex_literals->size();cl++){
4103                 literal_t *l = complex_literals->get_literal(cl);
4104                 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4105                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4106         }
4107
4108
4109         set<int> epred_seen;
4110         for(p=0;p<pr_list.size();++p){
4111                 int q = pred_class[p];
4112 //printf("\tq=%d\n",q);
4113                 if(epred_seen.count(q)>0){
4114                         ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4115                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4116                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4117                         for(o=0;o<op_list.size();++o){
4118                                 if(! cl_op[o]){
4119                                         ret += generate_se_code(op_list[o],Schema)+", ";
4120                                 }
4121                         }
4122                         ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4123                         epred_seen.insert(q);
4124                 }else{
4125                         ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4126                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4127                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4128                         for(o=0;o<op_list.size();++o){
4129                                 if(! cl_op[o]){
4130                                         ret += generate_se_code(op_list[o],Schema)+", ";
4131                                 }
4132                         }
4133                         ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4134                         epred_seen.insert(q);
4135                 }
4136         }
4137         ret += "}\n\n";
4138
4139
4140
4141 //              Start on main body code generation
4142   ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4143
4144
4145 ///--------------------------------------------------------------
4146 ///             Generate and store the prefilter body,
4147 ///             reuse it for the snap length calculator
4148 ///-------------------------------------------------------------
4149         string body;
4150
4151     body += "\tstruct packet *p = (struct packet *)pkt;\n";
4152
4153
4154
4155 //              Gather the colids to store unpacked variables.
4156         for(p=0;p<pred_list.size();++p){
4157         gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4158         }
4159
4160 //              make the col_ids refer to the base tables, and
4161 //              grab the col_ids with at least one unpacking function.
4162         for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4163                 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4164                 col_id tmp_col_id;
4165                 tmp_col_id.field = (*csi).field;
4166                 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4167                 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4168                 cid_set.insert(tmp_col_id);
4169                 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4170                 if(fe->get_unpack_fcns().size()>0)
4171                         upref_cids.insert(tmp_col_id);
4172
4173
4174         }
4175
4176 //              Find the set of unpacking programs needed for the
4177 //              prefilter fields.
4178         map<col_id, string,lt_col_id>  ucol_fcn_map;
4179         find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4180         set<string> pref_ufcns;
4181         map<col_id, string,lt_col_id>::iterator mcis;
4182         for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4183                 pref_ufcns.insert((*mcis).second);
4184         }
4185
4186
4187
4188 //                      Variables for unpacking attributes.
4189     body += "/*\t\tVariables for unpacking attributes\t*/\n";
4190     for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4191       int schref = (*csi).schema_ref;
4192           int tblref = (*csi).tblvar_ref;
4193       string field = (*csi).field;
4194       data_type dt(Schema->get_type_name(schref,field));
4195       sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4196         field.c_str(), tblref);
4197       body += tmpstr;
4198       sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4199       body += tmpstr;
4200     }
4201 //                      Variables for unpacking temporal attributes.
4202     body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4203     for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4204           if (cid_set.count(*csi) == 0) {
4205         int schref = (*csi).schema_ref;
4206                 int tblref = (*csi).tblvar_ref;
4207         string field = (*csi).field;
4208         data_type dt(Schema->get_type_name(schref,field));
4209         sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4210           field.c_str(), tblref);
4211         body += tmpstr;
4212
4213           }
4214     }
4215     body += "\n\n";
4216
4217 //              Variables for combinable predicate evaluation
4218         body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4219         for(q=0;q<equiv_list.size();++q){
4220                 for(p=0;p<=(num_equiv[q]/32);++p){
4221                         body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4222                 }
4223         }
4224
4225
4226 //                      Variables that are always needed
4227     body += "/*\t\tVariables which are always needed\t*/\n";
4228         body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4229         body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4230
4231 //              Call the unpacking functions for the prefilter fields
4232         if(pref_ufcns.size() > 0)
4233                 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4234         set<string>::iterator ssi;
4235         for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4236                 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4237         }
4238
4239
4240 //              Unpack the accessed attributes
4241         body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4242     for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4243           int tblref = (*csi).tblvar_ref;
4244       int schref = (*csi).schema_ref;
4245           string field = (*csi).field;
4246           sprintf(tmpstr,"\tret_%s_%d =  (%s(p, &unpack_var_%s_%d) == 0);\n",
4247                 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4248           body += tmpstr;
4249     }
4250
4251 //              next unpack the temporal attributes and ignore the errors
4252 //              We are assuming here that failed unpack of temporal attributes
4253 //              is not going to overwrite the last stored value
4254 //              Failed upacks are ignored
4255     for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
4256           int tblref = (*csi).tblvar_ref;
4257       int schref = (*csi).schema_ref;
4258           string field = (*csi).field;
4259           sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
4260                  Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4261           body += tmpstr;
4262     }
4263
4264 //              Evaluate the combinable predicates
4265         if(equiv_list.size()>0)
4266                 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
4267         for(q=0;q<equiv_list.size();++q){
4268                 for(p=0;p<=(num_equiv[q]/32);++p){
4269
4270 //              Only call the common eval fcn if all ref'd fields present.
4271                         col_id_set pred_cids;
4272                         col_id_set::iterator cpi;
4273                         gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
4274                         if(pred_cids.size()>0){
4275                         body += "\tif(";
4276                                 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4277                                         if(cpi != pred_cids.begin())
4278                                                 body += " && ";
4279                                 string field = (*cpi).field;
4280                                         int tblref = (*cpi).tblvar_ref;
4281                                         body += "ret_"+field+"_"+int_to_string(tblref);
4282                                 }
4283                                 body+=")\n";
4284                         }
4285
4286                         body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
4287                         vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
4288                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4289                         for(o=0;o<op_list.size();++o){
4290                                 if(cl_op[o]){
4291                                         body += ","+generate_se_code(op_list[o],Schema);
4292                                 }
4293                         }
4294                         body += ");\n";
4295                 }
4296         }
4297
4298
4299         for(p=0;p<pred_list.size();++p){
4300                 col_id_set pred_cids;
4301                 col_id_set::iterator cpi;
4302                 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
4303                 if(pred_cids.size()>0){
4304                         body += "\tif(";
4305                         for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4306                                 if(cpi != pred_cids.begin())
4307                                         body += " && ";
4308                         string field = (*cpi).field;
4309                                 int tblref = (*cpi).tblvar_ref;
4310                                 body += "ret_"+field+"_"+int_to_string(tblref);
4311                         }
4312                         body+=")\n";
4313                 }
4314         body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
4315                 body+="\tbitpos = bitpos << 1;\n";
4316         }
4317
4318 // ---------------------------------------------------------------
4319 //              Finished with the body of the prefilter
4320 // --------------------------------------------------------------
4321
4322         ret += body;
4323
4324 //                      Collect fields referenced by an lfta but not
4325 //                      already unpacked for the prefilter.
4326
4327 //printf("upref_cids is:\n");
4328 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
4329 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4330 //printf("pref_ufcns is:\n");
4331 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
4332 //printf("\t%s\n",(*ssi).c_str());
4333
4334         int l;
4335         for(l=0;l<lfta_cols.size();++l){
4336                 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
4337                         string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4338                         col_id tmp_col_id;
4339                         tmp_col_id.field = (*csi).field;
4340                         tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4341                         tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4342                         field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4343                         set<string> fld_ufcns = fe->get_unpack_fcns();
4344 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(),  upref_cids.count(tmp_col_id));
4345                         if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
4346 //              Ensure that this field not already unpacked.
4347                                 bool found = false;
4348                                 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
4349 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
4350                                         if(pref_ufcns.count((*ssi))){
4351 //printf("Field already unpacked.\n");
4352                                                 found = true;;
4353                                         }
4354                                 }
4355                                 if(! found){
4356 //printf("\tadding to unpack list\n");
4357                                         upall_cids.insert(tmp_col_id);
4358                                 }
4359                         }
4360                 }
4361         }
4362
4363 //printf("upall_cids is:\n");
4364 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
4365 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4366
4367 //              Get the set of unpacking programs for these.
4368         map<col_id, string,lt_col_id>  uall_fcn_map;
4369         find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
4370         set<string> pall_ufcns;
4371         for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
4372 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
4373                 pall_ufcns.insert((*mcis).second);
4374         }
4375
4376 //              Iterate through the remaining set of unpacking function
4377         if(pall_ufcns.size() > 0)
4378                 ret += "//\t\tcall all remaining field unpacking functions.\n";
4379         for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
4380 //              gather the set of columns unpacked by this ufcn
4381                 col_id_set fcol_set;
4382                 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
4383                         if(uall_fcn_map[(*csi)] == (*ssi))
4384                                 fcol_set.insert((*csi));
4385                 }
4386
4387 //              gather the set of lftas which access a field unpacked by the fcn
4388                 set<long long int> clfta;
4389                 for(l=0;l<lfta_cols.size();l++){
4390                         for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
4391                                 if(lfta_cols[l].count((*csi)) > 0)
4392                                         break;
4393                         }
4394                         if(csi != fcol_set.end())
4395                                 clfta.insert(lfta_sigs[l]);
4396                 }
4397
4398 //              generate the unpacking code
4399                 ret += "\tif(";
4400                 set<long long int>::iterator sii;
4401                 for(sii=clfta.begin();sii!=clfta.end();++sii){
4402                         if(sii!=clfta.begin())
4403                                 ret += " || ";
4404                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
4405                         ret += tmpstr;
4406                 }
4407                 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4408         }
4409
4410
4411     ret += "\treturn(retval);\n\n";
4412   ret += "}\n\n";
4413
4414
4415 // --------------------------------------------------------
4416 //              reuse prefilter body for snaplen calculator
4417 //
4418 //      This is dummy code, so I'm commenting it out.
4419
4420 /*
4421   ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
4422
4423         ret += body;
4424
4425         int i;
4426         vector<int> s_snaps = lfta_snap_lens;
4427         sort(s_snaps.begin(), s_snaps.end());
4428
4429         if(s_snaps[0] == -1){
4430                 set<unsigned long long int> sigset;
4431                 for(i=0;i<lfta_snap_lens.size();++i){
4432                         if(lfta_snap_lens[i] == -1){
4433                                 sigset.insert(lfta_sigs[i]);
4434                         }
4435                 }
4436                 ret += "\tif( ";
4437                 set<unsigned long long int>::iterator sulli;
4438                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4439                         if(sulli!=sigset.begin())
4440                                 ret += " || ";
4441                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4442                         ret += tmpstr;
4443                 }
4444                 ret += ") return -1;\n";
4445         }
4446
4447         int nextpos = lfta_snap_lens.size()-1;
4448         int nextval = lfta_snap_lens[nextpos];
4449         while(nextval >= 0){
4450                 set<unsigned long long int> sigset;
4451                 for(i=0;i<lfta_snap_lens.size();++i){
4452                         if(lfta_snap_lens[i] == nextval){
4453                                 sigset.insert(lfta_sigs[i]);
4454                         }
4455                 }
4456                 ret += "\tif( ";
4457                 set<unsigned long long int>::iterator sulli;
4458                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4459                         if(sulli!=sigset.begin())
4460                                 ret += " || ";
4461                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4462                         ret += tmpstr;
4463                 }
4464                 ret += ") return "+int_to_string(nextval)+";\n";
4465
4466                 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
4467                 if(nextpos>0)
4468                         nextval = lfta_snap_lens[nextpos];
4469                 else
4470                         nextval = -1;
4471         }
4472         ret += "\treturn 0;\n";
4473         ret += "}\n\n";
4474 */
4475
4476
4477   return(ret);
4478 }
4479
4480
4481
4482
4483 //              Generate the struct which will store the the values of
4484 //              temporal attributesunpacked by prefilter
4485 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
4486
4487   col_id_set::iterator csi;
4488
4489 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
4490
4491   string ret="struct prefilter_unpacked_temp_vars {\n";
4492   ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
4493
4494   string init_code;
4495
4496   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4497     int schref = (*csi).schema_ref;
4498     int tblref = (*csi).tblvar_ref;
4499     string field = (*csi).field;
4500         data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
4501     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4502         field.c_str(), tblref);
4503     ret += tmpstr;
4504
4505         if (init_code != "")
4506                 init_code += ", ";
4507         if (dt.is_increasing())
4508                 init_code += dt.get_min_literal();
4509         else
4510                 init_code += dt.get_max_literal();
4511
4512   }
4513   ret += "};\n\n";
4514
4515   ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";
4516
4517   return(ret);
4518 }