98d872d4b184948ade3ee02256e13d77fc3c0982
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include <string>
17 #include <stdio.h>
18 #include <stdlib.h>
19 //#include <algo.h>
20 #include<algorithm>
21
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
29
30 using namespace std;
31
// Defined in another translation unit; presumably the default size of the
// LFTA aggregation hash table -- confirm at the definition.
extern int DEFAULT_LFTA_HASH_TABLE_SIZE;

// Default value for the correlation between the interface card clock and
// the system clock.
#define DEFAULT_TIME_CORR 16


//      For fast hashing.
//      NRANDS is not defined in this file (the #define below is disabled),
//      so it has to come from one of the included headers.
//#define NRANDS 100
//      Table of NRANDS constants for hashing, defined elsewhere.  The disabled
//      initializer that follows shows the entries as strings holding
//      unsigned long long literals.
extern string hash_nums[NRANDS];
42 /*
43 = {
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
94 };
95 */
96
97
98 // ----------------------------------------------
99 //              Data extracted from the query plan node
100 //              for use by code generation.
101
static cplx_lit_table *complex_literals;  // Table of literals with constructors.
// Entries for parameters passed by handle; the generated FTA struct gets one
// handle_param_<n> member per entry.
static vector<handle_param_tbl_entry *> param_handle_table;
static param_table *param_tbl;          // Table of all referenced parameters.

static vector<scalarexp_t *> sl_list;   // Select-list expressions.
static vector<cnf_elem *> where;        // presumably the WHERE clause as CNF elements -- confirm.

static gb_table *gb_tbl;                        // Table of all group-by attributes.
static aggregate_table *aggr_tbl;       // Table of all referenced aggregates.

static bool packed_return;              // unpack using a structure, not functions
static nic_property *nicprop;   // nic properties for this interface.
static int global_id;


//              The partial_fcns vector can now refer to
//              partial functions, or expensive functions
//              which can be cached (if there are multiple refs).
//              fcn_ref_cnt and is_partial_fcn distinguish the cases
//              (presumably indexed in parallel with partial_fcns -- confirm).
static vector<scalarexp_t *> partial_fcns;
static vector<int> fcn_ref_cnt;
static vector<bool> is_partial_fcn;
//              NOTE(review): these look like [start,end) index ranges into
//              partial_fcns for the select, where, group-by and aggregate
//              clauses -- confirm where they are assigned.  Unlike the rest
//              of this state they are not static, so they have external linkage.
int sl_fcns_start = 0, sl_fcns_end = 0;
int wh_fcns_start = 0, wh_fcns_end = 0;
int gb_fcns_start = 0, gb_fcns_end = 0;
int ag_fcns_start = 0, ag_fcns_end = 0;


//              These vectors are for combinable predicates.
static vector<int> pred_class;  //      identifies the group
static vector<int> pred_pos;    //  position in the group.



//              Shared scratch buffer for sprintf-formatted code fragments.
//              Every formatter in this file overwrites it, so the contents must
//              be copied out before the next call; nothing checks the 1000-byte
//              limit.
static char tmpstr[1000];
137
138 //////////////////////////////////////////////////////////////////////
139 ///                     Various utilities
140
141 string generate_fta_name(string node_name){
142         string ret = normalize_name(node_name);
143         if(ret == ""){
144                 ret = "default";
145         }
146         ret += "_fta";
147
148         return(ret);
149 }
150
151
152 string generate_aggr_struct_name(string node_name){
153         string ret = normalize_name(node_name);
154         if(ret == ""){
155                 ret = "default";
156         }
157         ret += "_aggr_struct";
158
159         return(ret);
160 }
161
162 string generate_fj_struct_name(string node_name){
163         string ret = normalize_name(node_name);
164         if(ret == ""){
165                 ret = "default";
166         }
167         ret += "_fj_struct";
168
169         return(ret);
170 }
171
172 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
173         string ret;
174         if(! packed_return){
175                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
176                         schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
177         ret += tmpstr;
178         if(!schema->get_modifier_list(schref,field)->contains_key("required"))
179                 ret += "\tif(retval) goto "+end_goto+";\n";
180
181         }else{
182 //                      TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
183                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
184                 if(dt.is_buffer_type()){
185                         if(dt.get_type() != v_str_t){
186                           ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
187                           ret += "\t\tgoto "+end_goto+";\n";
188                           ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
189                           ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
190                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
191                         }else{
192                                 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
193                                 exit(1);
194                         }
195                 }else{
196                         ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
197                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
198                 }
199         }
200         return ret;
201 }
202
203 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
204   string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
205
206   int g;
207   for(g=0;g<gb_tbl->size();g++){
208           sprintf(tmpstr,"gb_var%d",g);
209           ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
210   }
211
212   int a;
213   for(a=0;a<aggr_tbl->size();a++){
214           ret += "\t";
215           sprintf(tmpstr,"aggr_var%d",a);
216           if(aggr_tbl->is_builtin(a))
217                 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
218           else
219                 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
220   }
221
222 /*
223   ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
224 */
225
226   ret += "};\n\n";
227
228   return(ret);
229 }
230
231
232 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
233   string ret;
234
235   if(fs->use_bloom == false){   // uses hash table instead
236         ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
237         int k;
238         for(k=0;k<fs->hash_eq.size();++k){
239                 sprintf(tmpstr,"key_var%d",k);
240                 ret += "\t"+fs->hash_eq[k]->pr->get_left_se()->get_data_type()->make_cvar(tmpstr)+";\n";
241         }
242         ret += "\tlong long int ts;\n";
243     ret += "};\n\n";
244   }
245
246   return(ret);
247 }
248
249
250
251
252 string generate_fta_struct(string node_name, gb_table *gb_tbl,
253                 aggregate_table *aggr_tbl, param_table *param_tbl,
254                 cplx_lit_table *complex_literals,
255                 vector<handle_param_tbl_entry *> &param_handle_table,
256                 bool is_aggr_query, bool is_fj, bool uses_bloom,
257                 table_list *schema){
258
259         string ret = "struct " + generate_fta_name(node_name) + "{\n";
260         ret += "\tstruct FTA f;\n";
261
262 //-------------------------------------------------------------
263 //              Aggregate-specific fields
264
265         if(is_aggr_query){
266 /*
267                 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
268 */
269                 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
270                 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
271 //              ret+="\tint bitmap_size;\n";
272                 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
273                 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
274                 ret += "\tint max_windows; // max number of open windows.\n";
275                 ret += "\tunsigned int generation; // initially zero, increment on\n";
276                 ret += "\t     // every hash table flush - whether regular or induced.\n";
277                 ret += "\t     // Old groups are identified by a generation mismatch.\n";
278                 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
279                 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
280
281
282
283                 int g;
284                 bool uses_temporal_flush = false;
285                 for(g=0;g<gb_tbl->size();g++){
286                         data_type *dt = gb_tbl->get_data_type(g);
287                         if(dt->is_temporal()){
288 /*
289                                 fprintf(stderr,"group by attribute %s is temporal, ",
290                                                 gb_tbl->get_name(g).c_str());
291                                 if(dt->is_increasing()){
292                                         fprintf(stderr,"increasing.\n");
293                                 }else{
294                                         fprintf(stderr,"decreasing.\n");
295                                 }
296 */
297                                 data_type *gdt = gb_tbl->get_data_type(g);
298                                 if(gdt->is_buffer_type()){
299                                         fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
300                                 }else{
301                                         sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
302                                         ret += tmpstr;
303                                         sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
304                                         ret += tmpstr;
305                                         sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
306                                         ret += tmpstr;
307                                         uses_temporal_flush = true;
308                                 }
309
310                         }
311
312                 }
313                 if(! uses_temporal_flush){
314                         fprintf(stderr,"Warning: no temporal flush.\n");
315                 }
316         }
317
318 // ---------------------------------------------------------
319 //                      Filter-join specific fields
320
321         if(is_fj){
322                 if(uses_bloom){
323                         ret +=
324 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
325 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
326 "\tint first_exec;\n"
327 "\tlong long int last_bin;\n"
328 "\tint last_bloom_pos;\n"
329 "\n"
330 ;
331                 }else{          // limited hash table
332                         ret +=
333 "  struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
334 "\n"
335 ;
336                 }
337
338         }
339
340 //--------------------------------------------------------
341 //                      Common fields
342
343 //                      Create places to hold the parameters.
344         int p;
345         vector<string> param_vec = param_tbl->get_param_names();
346         for(p=0;p<param_vec.size();p++){
347                 data_type *dt = param_tbl->get_data_type(param_vec[p]);
348                 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
349                                 param_vec[p].c_str());
350                 ret += tmpstr;
351                 if(param_tbl->handle_access(param_vec[p])){
352                         ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
353                 }
354         }
355
356 //                      Create places to hold complex literals.
357         int cl;
358         for(cl=0;cl<complex_literals->size();cl++){
359                 literal_t *l = complex_literals->get_literal(cl);
360                 data_type *dtl = new data_type( l->get_type() );
361                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
362                 ret += tmpstr;
363         }
364
365 //                      Create places to hold the pass-by-handle parameters.
366         for(p=0;p<param_handle_table.size();++p){
367                 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
368                 ret += tmpstr;
369         }
370
371 //                      Create places to hold the last values of temporal
372 //                      attributes referenced in select clause
373 //                      we also need to store values of the temoral attributed
374 //                      of last flushed tuple in aggr queries
375 //                      to make sure we generate the cirrect temporal tuple
376 //                      in the presense of slow flushes
377
378
379         col_id_set temp_cids;           //      col ids of temp attributes in select clause
380
381         int s;
382         col_id_set::iterator csi;
383
384         for(s=0;s<sl_list.size();s++){
385                 data_type *sdt = sl_list[s]->get_data_type();
386                 if (sdt->is_temporal()) {
387                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
388                 }
389         }
390
391         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
392                 int tblref = (*csi).tblvar_ref;
393                 int schref = (*csi).schema_ref;
394                 string field = (*csi).field;
395                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
396                 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
397                 ret += tmpstr;
398         }
399
400         ret += "\tgs_uint64_t trace_id;\n\n";
401
402 //      Fields to store the runtime stats
403
404         ret += "\tgs_uint32_t in_tuple_cnt;\n";
405         ret += "\tgs_uint32_t out_tuple_cnt;\n";
406         ret += "\tgs_uint32_t out_tuple_sz;\n";
407         ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
408         ret += "\tgs_uint64_t cycle_cnt;\n";
409         ret += "\tgs_uint32_t collision_cnt;\n";
410         ret += "\tgs_uint32_t eviction_cnt;\n";
411         ret += "\tgs_float_t sampling_rate;\n";
412
413
414
415         ret += "};\n\n";
416
417         return(ret);
418 }
419
420 //------------------------------------------------------------
421 //              Set colref tblvars to 0..
422 //              (special processing for join-like operators in an lfta).
423
424 void reset_se_col_ids_tblvars(scalarexp_t *se,  gb_table *gtbl){
425         vector<scalarexp_t *> operands;
426         int o;
427
428         if(! se)
429                 return;
430
431         switch(se->get_operator_type()){
432         case SE_LITERAL:
433         case SE_PARAM:
434         case SE_IFACE_PARAM:
435                 return;
436         case SE_UNARY_OP:
437                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
438                 return;
439         case SE_BINARY_OP:
440                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
441                 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
442                 return;
443         case SE_COLREF:
444                 if(! se->is_gb() ){
445                         se->get_colref()->set_tablevar_ref(0);
446                 }else{
447                         if(gtbl==NULL){
448                                 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
449                                 exit(1);
450                         }
451                         reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
452                 }
453                 return;
454         case SE_AGGR_STAR:
455                 return;
456         case SE_AGGR_SE:
457                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
458                 return;
459         case SE_FUNC:
460                 operands = se->get_operands();
461                 for(o=0;o<operands.size();o++){
462                         reset_se_col_ids_tblvars(operands[o], gtbl);
463                 }
464                 return;
465         default:
466                 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
467                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
468                 exit(1);
469         }
470 }
471
472
473 //              reset  column tblvars accessed in this pr.
474
475 void reset_pr_col_ids_tblvars(predicate_t *pr,  gb_table *gtbl){
476         vector<scalarexp_t *> op_list;
477         int o;
478
479         switch(pr->get_operator_type()){
480         case PRED_IN:
481                 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
482                 return;
483         case PRED_COMPARE:
484                 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
485                 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
486                 return;
487         case PRED_UNARY_OP:
488                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
489                 return;
490         case PRED_BINARY_OP:
491                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
492                 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
493                 return;
494         case PRED_FUNC:
495                 op_list = pr->get_op_list();
496                 for(o=0;o<op_list.size();++o){
497                         reset_se_col_ids_tblvars(op_list[o],gtbl) ;
498                 }
499                 return;
500         default:
501                 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
502                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
503         }
504 }
505
506
507
508
509 //                      Generate code that makes reference
510 //                      to the tuple, and not to any aggregates.
511 static string generate_se_code(scalarexp_t *se,table_list *schema){
512         string ret;
513     data_type *ldt, *rdt;
514         int o;
515         vector<scalarexp_t *> operands;
516
517
518         switch(se->get_operator_type()){
519         case SE_LITERAL:
520                 if(se->is_handle_ref()){
521                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
522                         ret = tmpstr;
523                         return(ret);
524                 }
525                 if(se->get_literal()->is_cpx_lit()){
526                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
527                         ret = tmpstr;
528                         return(ret);
529                 }
530                 return(se->get_literal()->to_C_code("")); // not complex, no constructor
531         case SE_PARAM:
532                 if(se->is_handle_ref()){
533                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
534                         ret = tmpstr;
535                         return(ret);
536                 }
537                 ret += "t->param_";
538                 ret += se->get_param_name();
539                 return(ret);
540         case SE_UNARY_OP:
541         ldt = se->get_left_se()->get_data_type();
542         if(ldt->complex_operator(se->get_op()) ){
543                         ret +=  ldt->get_complex_operator(se->get_op());
544                         ret += "(";
545                         ret += generate_se_code(se->get_left_se(),schema);
546             ret += ")";
547                 }else{
548                         ret += "(";
549                         ret += se->get_op();
550                         ret += generate_se_code(se->get_left_se(),schema);
551                         ret += ")";
552                 }
553                 return(ret);
554         case SE_BINARY_OP:
555         ldt = se->get_left_se()->get_data_type();
556         rdt = se->get_right_se()->get_data_type();
557
558         if(ldt->complex_operator(rdt, se->get_op()) ){
559                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
560                         ret += "(";
561                         ret += generate_se_code(se->get_left_se(),schema);
562                         ret += ", ";
563                         ret += generate_se_code(se->get_right_se(),schema);
564                         ret += ")";
565                 }else{
566                         ret += "(";
567                         ret += generate_se_code(se->get_left_se(),schema);
568                         ret += se->get_op();
569                         ret += generate_se_code(se->get_right_se(),schema);
570                         ret += ")";
571                 }
572                 return(ret);
573         case SE_COLREF:
574                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet unpacked ...
575                                                         // so return the defining code.
576                         ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
577
578                 }else{
579                 sprintf(tmpstr,"unpack_var_%s_%d",
580                   se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
581                 ret = tmpstr;
582                 }
583                 return(ret);
584         case SE_FUNC:
585 //                              Should not be ref'ing any aggr here.
586                 if(se->get_aggr_ref() >= 0){
587                         fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
588                         return("ERROR in generate_se_code");
589                 }
590
591                 if(se->is_partial()){
592                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
593                         ret = tmpstr;
594                 }else{
595                         ret += se->op + "(";
596                         operands = se->get_operands();
597                         for(o=0;o<operands.size();o++){
598                                 if(o>0) ret += ", ";
599                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
600                                         ret += "&";
601                                 ret += generate_se_code(operands[o], schema);
602                         }
603                         ret += ")";
604                 }
605                 return(ret);
606         default:
607                 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
608                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
609                 return("ERROR in generate_se_code");
610         }
611 }
612
613 //              generate code that refers only to aggregate data and constants.
614 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
615
616         string ret;
617     data_type *ldt, *rdt;
618         int o;
619         vector<scalarexp_t *> operands;
620
621
622         switch(se->get_operator_type()){
623         case SE_LITERAL:
624                 if(se->is_handle_ref()){
625                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
626                         ret = tmpstr;
627                         return(ret);
628                 }
629                 if(se->get_literal()->is_cpx_lit()){
630                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
631                         ret = tmpstr;
632                         return(ret);
633                 }
634                 return(se->get_literal()->to_C_code("")); // not complex no constructor
635         case SE_PARAM:
636                 if(se->is_handle_ref()){
637                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
638                         ret = tmpstr;
639                         return(ret);
640                 }
641                 ret += "t->param_";
642                 ret += se->get_param_name();
643                 return(ret);
644         case SE_UNARY_OP:
645         ldt = se->get_left_se()->get_data_type();
646         if(ldt->complex_operator(se->get_op()) ){
647                         ret +=  ldt->get_complex_operator(se->get_op());
648                         ret += "(";
649                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
650             ret += ")";
651                 }else{
652                         ret += "(";
653                         ret += se->get_op();
654                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
655                         ret += ")";
656                 }
657                 return(ret);
658         case SE_BINARY_OP:
659         ldt = se->get_left_se()->get_data_type();
660         rdt = se->get_right_se()->get_data_type();
661
662         if(ldt->complex_operator(rdt, se->get_op()) ){
663                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
664                         ret += "(";
665                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
666                         ret += ", ";
667                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
668                         ret += ")";
669                 }else{
670                         ret += "(";
671                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
672                         ret += se->get_op();
673                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
674                         ret += ")";
675                 }
676                 return(ret);
677         case SE_COLREF:
678                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet
679                                                         // unpacked ... so return the defining code.
680                         sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
681                         ret = tmpstr;
682
683                 }else{
684                 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
685                                 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
686                                 se->get_lineno(), se->get_charno());
687                 ret = tmpstr;
688                 }
689                 return(ret);
690         case SE_AGGR_STAR:
691         case SE_AGGR_SE:
692                 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
693                 ret = tmpstr;
694                 return(ret);
695         case SE_FUNC:
696 //                              Is it a UDAF?
697                 if(se->get_aggr_ref() >= 0){
698                         sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
699                         ret = tmpstr;
700                         return(ret);
701                 }
702
703                 if(se->is_partial()){
704                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
705                         ret = tmpstr;
706                 }else{
707                         ret += se->op + "(";
708                         operands = se->get_operands();
709                         for(o=0;o<operands.size();o++){
710                                 if(o>0) ret += ", ";
711                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
712                                         ret += "&";
713                                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
714                         }
715                         ret += ")";
716                 }
717                 return(ret);
718         default:
719                 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
720                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
721                 return("ERROR in generate_se_code");
722         }
723
724 }
725
726
727 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
728         string ret;
729         int o;
730         vector<scalarexp_t *> operands;
731
732
733         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
734                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
735                                 se->get_lineno(), se->get_charno());
736                 return("ERROR in generate_se_code");
737         }
738
739         ret = "\tretval = " + se->get_op() + "( ";
740         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
741         ret += tmpstr;
742
743         operands = se->get_operands();
744         for(o=0;o<operands.size();o++){
745                 ret += ", ";
746                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
747                         ret += "&";
748                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
749         }
750         ret += ");\n";
751
752         return(ret);
753 }
754
755 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
756         string ret;
757         int o;
758         vector<scalarexp_t *> operands;
759
760         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
761                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
762                                 se->get_lineno(), se->get_charno());
763                 return("ERROR in generate_se_code");
764         }
765
766         ret = se->get_op() + "( ";
767
768         operands = se->get_operands();
769         for(o=0;o<operands.size();o++){
770                 if(o) ret += ", ";
771                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
772                         ret += "&";
773                 ret += generate_se_code(operands[o], schema);
774         }
775         ret += ");\n";
776
777         return(ret);
778 }
779
780
781
782 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
783         string ret;
784         int o;
785         vector<scalarexp_t *> operands;
786
787
788         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
789                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
790                                 se->get_lineno(), se->get_charno());
791                 return("ERROR in generate_se_code");
792         }
793
794         ret = "\tretval = " + se->get_op() + "( ",
795         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
796         ret += tmpstr;
797
798         operands = se->get_operands();
799         for(o=0;o<operands.size();o++){
800                 ret += ", ";
801                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
802                         ret += "&";
803                 ret += generate_se_code(operands[o], schema);
804         }
805         ret += ");\n";
806
807         return(ret);
808 }
809
810
811
812
813
//		Translate a query comparison operator into its C spelling:
//		"=" becomes "==", "<>" becomes "!=", anything else is returned
//		unchanged.
static string generate_C_comparison_op(string op){
	if(op == "<>")
		return string("!=");
	if(op == "=")
		return string("==");
	return op;
}
819
//		Translate a query boolean operator into its C spelling.
//		Only the all-upper, capitalized and all-lower spellings of
//		AND / OR / NOT are accepted.  Any other input is reported on
//		stderr and an error string containing the operator is returned.
static string generate_C_boolean_op(string op){
//		Each row: three accepted spellings, then the C operator.
	static const char *const known_ops[3][4] = {
		{ "AND", "And", "and", "&&" },
		{ "OR",  "Or",  "or",  "||" },
		{ "NOT", "Not", "not", "!"  }
	};

	for(int row=0; row<3; ++row){
		for(int col=0; col<3; ++col){
			if(op == known_ops[row][col])
				return string(known_ops[row][3]);
		}
	}

	fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
	return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
}
834
835
836 static string generate_predicate_code(predicate_t *pr,table_list *schema){
837         string ret;
838         vector<literal_t *>  litv;
839         int i;
840     data_type *ldt, *rdt;
841         vector<scalarexp_t *> op_list;
842         int o,cref,ppos;
843         unsigned int bitmask;
844
845         switch(pr->get_operator_type()){
846         case PRED_IN:
847         ldt = pr->get_left_se()->get_data_type();
848
849                 ret += "( ";
850                 litv = pr->get_lit_vec();
851                 for(i=0;i<litv.size();i++){
852                         if(i>0) ret += " || ";
853                         ret += "( ";
854
855                 if(ldt->complex_comparison(ldt) ){
856                                 ret +=  ldt->get_comparison_fcn(ldt) ;
857                                 ret += "( ";
858                                 if(ldt->is_buffer_type() ) ret += "&";
859                                 ret += generate_se_code(pr->get_left_se(), schema);
860                                 ret += ", ";
861                                 if(ldt->is_buffer_type() ) ret += "&";
862                                 if(litv[i]->is_cpx_lit()){
863                                         sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
864                                         ret += tmpstr;
865                                 }else{
866                                         ret += litv[i]->to_C_code("");
867                                 }
868                                 ret += ") == 0";
869                         }else{
870                                 ret += generate_se_code(pr->get_left_se(), schema);
871                                 ret += " == ";
872                                 ret += litv[i]->to_C_code("");
873                         }
874
875                         ret += " )";
876                 }
877                 ret += " )";
878                 return(ret);
879
880         case PRED_COMPARE:
881         ldt = pr->get_left_se()->get_data_type();
882         rdt = pr->get_right_se()->get_data_type();
883
884                 ret += "( ";
885         if(ldt->complex_comparison(rdt) ){
886                         ret += ldt->get_comparison_fcn(rdt);
887                         ret += "(";
888                         if(ldt->is_buffer_type() ) ret += "&";
889                         ret += generate_se_code(pr->get_left_se(),schema);
890                         ret += ", ";
891                         if(rdt->is_buffer_type() ) ret += "&";
892                         ret += generate_se_code(pr->get_right_se(),schema);
893                         ret += ") ";
894                         ret +=  generate_C_comparison_op(pr->get_op());
895                         ret += "0";
896                 }else{
897                         ret += generate_se_code(pr->get_left_se(),schema);
898                         ret +=  generate_C_comparison_op(pr->get_op());
899                         ret += generate_se_code(pr->get_right_se(),schema);
900                 }
901                 ret += " )";
902                 return(ret);
903         case PRED_UNARY_OP:
904                 ret += "( ";
905                 ret +=  generate_C_boolean_op(pr->get_op());
906                 ret += generate_predicate_code(pr->get_left_pr(),schema);
907                 ret += " )";
908                 return(ret);
909         case PRED_BINARY_OP:
910                 ret += "( ";
911                 ret += generate_predicate_code(pr->get_left_pr(),schema);
912                 ret +=  generate_C_boolean_op(pr->get_op());
913                 ret += generate_predicate_code(pr->get_right_pr(),schema);
914                 ret += " )";
915                 return(ret);
916         case PRED_FUNC:
917                 op_list = pr->get_op_list();
918                 cref = pr->get_combinable_ref();
919                 if(cref >= 0){  // predicate is a combinable pred reference
920                         //              Trust, but verify
921                         if(pred_class.size() >= cref && pred_class[cref] >= 0){
922                                 ppos = pred_pos[cref];
923                                 bitmask = 1 << ppos % 32;
924                                 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
925                                 ret = tmpstr;
926                                 return ret;
927                         }
928                 }
929
930                 ret =  pr->get_op() + "(";
931                 if (pr->is_sampling_fcn) {
932                         ret += "t->sampling_rate";
933                         if (!op_list.empty())
934                                 ret += ", ";
935                 }
936                 for(o=0;o<op_list.size();++o){
937                         if(o>0) ret += ", ";
938                         if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
939                                         ret += "&";
940                         ret += generate_se_code(op_list[o],schema);
941                 }
942                 ret += " )";
943                 return(ret);
944         default:
945                 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
946                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
947                 return("ERROR in generate_predicate_code");
948         }
949 }
950
951
952 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
953         string ret;
954
955     if(dt->complex_comparison(dt) ){
956                 ret += dt->get_comparison_fcn(dt);
957                 ret += "(";
958                         if(dt->is_buffer_type() ) ret += "&";
959                 ret += lhs_op;
960                 ret += ", ";
961                         if(dt->is_buffer_type() ) ret += "&";
962                 ret += rhs_op;
963                 ret += ") == 0";
964         }else{
965                 ret += lhs_op;
966                 ret += " == ";
967                 ret += rhs_op;
968         }
969
970         return(ret);
971 }
972
973 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
974         string ret;
975
976     if(dt->complex_comparison(dt) ){
977                 ret += dt->get_comparison_fcn(dt);
978                 ret += "(";
979                         if(dt->is_buffer_type() ) ret += "&";
980                 ret += lhs_op;
981                 ret += ", ";
982                         if(dt->is_buffer_type() ) ret += "&";
983                 ret += rhs_op;
984                 ret += ") == 0";
985         }else{
986                 ret += lhs_op;
987                 ret += " == ";
988                 ret += rhs_op;
989         }
990
991         return(ret);
992 }
993
994 //              Here I assume that only MIN and MAX aggregates can be computed
995 //              over BUFFER data types.
996
997 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
998         string retval = "\t\t";
999         string op = atbl->get_op(aidx);
1000
1001 //              Is it a UDAF
1002         if(! atbl->is_builtin(aidx)) {
1003                 int o;
1004                 retval += op+"_LFTA_AGGR_UPDATE_(";
1005                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1006                 retval+="("+var+")";
1007                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1008                 for(o=0;o<opl.size();++o){
1009                         retval += ",";
1010                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1011                                         retval.append("&");
1012                         retval += generate_se_code(opl[o], schema);
1013                 }
1014                 retval += ");\n";
1015
1016                 return retval;
1017         }
1018
1019 //              Built-in aggregate processing.
1020
1021         data_type *dt = atbl->get_data_type(aidx);
1022
1023         if(op == "COUNT"){
1024                 retval.append(var);
1025                 retval.append("++;\n");
1026                 return(retval);
1027         }
1028         if(op == "SUM"){
1029                 retval.append(var);
1030                 retval.append(" += ");
1031                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1032                 retval.append(";\n");
1033                 return(retval);
1034         }
1035         if(op == "MIN"){
1036                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1037                 retval.append(tmpstr);
1038                 if(dt->complex_comparison(dt)){
1039                         if(dt->is_buffer_type())
1040                           sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1041                         else
1042                           sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1043                 }else{
1044                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1045                 }
1046                 retval.append(tmpstr);
1047                 if(dt->is_buffer_type()){
1048                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1049                 }else{
1050                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1051                 }
1052                 retval.append(tmpstr);
1053
1054                 return(retval);
1055         }
1056         if(op == "MAX"){
1057                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1058                 retval.append(tmpstr);
1059                 if(dt->complex_comparison(dt)){
1060                         if(dt->is_buffer_type())
1061                          sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1062                         else
1063                          sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1064                 }else{
1065                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1066                 }
1067                 retval.append(tmpstr);
1068                 if(dt->is_buffer_type()){
1069                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1070                 }else{
1071                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1072                 }
1073                 retval.append(tmpstr);
1074
1075                 return(retval);
1076
1077         }
1078         if(op == "AND_AGGR"){
1079                 retval.append(var);
1080                 retval.append(" &= ");
1081                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1082                 retval.append(";\n");
1083                 return(retval);
1084         }
1085         if(op == "OR_AGGR"){
1086                 retval.append(var);
1087                 retval.append(" |= ");
1088                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1089                 retval.append(";\n");
1090                 return(retval);
1091         }
1092         if(op == "XOR_AGGR"){
1093                 retval.append(var);
1094                 retval.append(" ^= ");
1095                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1096                 retval.append(";\n");
1097                 return(retval);
1098         }
1099         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1100         return("ERROR: aggregate not recognized: "+op);
1101
1102 }
1103
1104
1105
1106 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1107         string retval;
1108         string op = atbl->get_op(aidx);
1109
1110 //              Is it a UDAF
1111         if(! atbl->is_builtin(aidx)) {
1112                 int o;
1113                 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1114                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1115                 retval+="("+var+"));\n";
1116 //                      Add 1st tupl
1117                 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1118                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1119                 retval+="("+var+")";
1120                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1121                 for(o=0;o<opl.size();++o){
1122                         retval += ",";
1123                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1124                                         retval.append("&");
1125                         retval += generate_se_code(opl[o],schema);
1126                 }
1127                 retval += ");\n";
1128                 return(retval);
1129         }
1130
1131 //              Built-in aggregate processing.
1132
1133
1134         data_type *dt = atbl->get_data_type(aidx);
1135
1136         if(op == "COUNT"){
1137                 retval = "\t\t"+var;
1138                 retval.append(" = 1;\n");
1139                 return(retval);
1140         }
1141
1142         if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1143                                                                         op == "OR_AGGR" || op == "XOR_AGGR"){
1144                 if(dt->is_buffer_type()){
1145                         sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1146                         retval.append(tmpstr);
1147                         sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1148                         retval.append(tmpstr);
1149                 }else{
1150                         retval = "\t\t"+var;
1151                         retval += " = ";
1152                         retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1153                         retval.append(";\n");
1154                 }
1155                 return(retval);
1156         }
1157
1158         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1159         return("ERROR: aggregate not recognized: "+op);
1160 }
1161
1162
1163 ////////////////////////////////////////////////////////////
1164
1165
1166 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1167         std::string &node_name, std::string &schema_embed_str){
1168 //                      Include these only once, not once per lfta
1169 //      string ret = "#include \"rts.h\"\n";
1170 //      ret +=  "#include \"fta.h\"\n\n");
1171
1172         string ret = "#ifndef LFTA_IN_NIC\n";
1173         ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1174         ret += "#include<stdio.h>\n";
1175         ret += "#include <limits.h>\n";
1176         ret += "#include <float.h>\n";
1177         ret += "#include \"rdtsc.h\"\n";
1178         ret += "#endif\n";
1179
1180
1181
1182         return(ret);
1183 }
1184
1185
1186 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1187         int a,p,s;
1188 //                       need to create and output the tuple.
1189         string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1190 //                      Check for any UDAFs with LFTA_BAILOUT
1191         ret += "\tlfta_bailout = 0;\n";
1192         for(a=0;a<aggr_tbl->size();a++){
1193                 if(aggr_tbl->has_bailout(a)){
1194                         ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
1195                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1196                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1197                 }
1198         }
1199         ret += "\tif(! lfta_bailout){\n";
1200
1201 //                      First, compute the size of the tuple.
1202
1203 //                      Unpack UDAF return values
1204         for(a=0;a<aggr_tbl->size();a++){
1205                 if(! aggr_tbl->is_builtin(a)){
1206                         ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1207                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1208                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1209
1210                 }
1211         }
1212
1213
1214 //                      Unpack partial fcns ref'd by the select clause.
1215   if(sl_fcns_start != sl_fcns_end){
1216             ret += "\t\tunpack_failed = 0;\n";
1217     for(p=sl_fcns_start;p<sl_fcns_end;p++){
1218           if(is_partial_fcn[p]){
1219                 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1220                          "t->aggr_table["+idx+"].",schema);
1221                 ret += "\t\tif(retval) unpack_failed = 1;\n";
1222           }
1223     }
1224                                                                 // BEGIN don't allocate tuple if
1225         ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1226   }
1227
1228 //                      Unpack any BUFFER type selections into temporaries
1229 //                      so that I can compute their size and not have
1230 //                      to recompute their value during tuple packing.
1231 //                      I can use regular assignment here because
1232 //                      these temporaries are non-persistent.
1233
1234           for(s=0;s<sl_list.size();s++){
1235                 data_type *sdt = sl_list[s]->get_data_type();
1236                 if(sdt->is_buffer_type()){
1237                         sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1238                         ret += tmpstr;
1239                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1240                         ret += ";\n";
1241                 }
1242           }
1243
1244
1245 //              The size of the tuple is the size of the tuple struct plus the
1246 //              size of the buffers to be copied in.
1247
1248           ret += "\t\t\ttuple_size = sizeof( struct ";
1249           ret +=  generate_tuple_name(node_name);
1250           ret += ")";
1251           for(s=0;s<sl_list.size();s++){
1252                 data_type *sdt = sl_list[s]->get_data_type();
1253                 if(sdt->is_buffer_type()){
1254                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1255                         ret += tmpstr;
1256                 }
1257           }
1258           ret += ";\n";
1259
1260
1261           ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1262           ret += "\t\t\tif( tuple != NULL){\n";
1263
1264
1265 //                      Test passed, make assignments to the tuple.
1266
1267           ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1268           ret +=  generate_tuple_name(node_name) ;
1269           ret += ");\n";
1270
1271 //                      Mark tuple as REGULAR_TUPLE
1272           ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1273
1274           for(s=0;s<sl_list.size();s++){
1275                 data_type *sdt = sl_list[s]->get_data_type();
1276                 if(sdt->is_buffer_type()){
1277                         sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1278                         ret += tmpstr;
1279                         sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1280                         ret += tmpstr;
1281                 }else{
1282                         sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1283                         ret += tmpstr;
1284 //                      if(sdt->needs_hn_translation())
1285 //                              ret += sdt->hton_translation() +"( ";
1286                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1287 //                      if(sdt->needs_hn_translation())
1288 //                              ret += ") ";
1289                         ret += ";\n";
1290                 }
1291           }
1292
1293 //              Generate output.
1294           ret += "\t\t\t\tpost_tuple(tuple);\n";
1295           ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1296           ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1297           ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1298           ret += "\t\t\t\t#endif\n\n";
1299           ret += "\t\t\t}\n";
1300
1301           if(sl_fcns_start != sl_fcns_end)      // END don't allocate tuple if
1302                 ret += "\t\t}\n";                               // unpack failed.
1303           ret += "\t}\n";
1304
1305 //                      Need to release memory held by BUFFER types.
1306                 int g;
1307
1308           for(g=0;g<gb_tbl->size();g++){
1309                 data_type *gdt = gb_tbl->get_data_type(g);
1310                 if(gdt->is_buffer_type()){
1311                         sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1312                         ret += tmpstr;
1313                 }
1314           }
1315           for(a=0;a<aggr_tbl->size();a++){
1316                 if(aggr_tbl->is_builtin(a)){
1317                         data_type *adt = aggr_tbl->get_data_type(a);
1318                         if(adt->is_buffer_type()){
1319                                 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
1320                                 ret += tmpstr;
1321                         }
1322                 }else{
1323                         ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1324                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1325                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1326                 }
1327           }
1328
1329         ret += "\t\tt->n_aggrs--;\n";
1330
1331         return(ret);
1332
1333 }
1334
1335 string generate_gb_match_test(string idx){
1336         int g;
1337         string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") &&  IS_NEW(t->aggr_table_bitmap,"+idx+")";
1338         if(gb_tbl->size()>0){
1339                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1340                 ret+="\t\t";
1341
1342 //                      Next, scan list for a match on the group-by attributes.
1343           string rhs_op, lhs_op;
1344           for(g=0;g<gb_tbl->size();g++){
1345                   ret += " && ";
1346                   ret += "(";
1347                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1348                   sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1349                   ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
1350                   ret += ")";
1351           }
1352          }
1353
1354           ret += "){\n";
1355
1356         return ret;
1357 }
1358
1359 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1360         int g;
1361         string ret;
1362
1363           ret += "/*\t\tMatch found : update in place.\t*/\n";
1364           int a;
1365           has_udaf = false;
1366           for(a=0;a<aggr_tbl->size();a++){
1367                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1368                   ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1369                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1370           }
1371
1372 //                      garbage collect copied buffer type gb attrs.
1373   for(g=0;g<gb_tbl->size();g++){
1374           data_type *gdt = gb_tbl->get_data_type(g);
1375           if(gdt->is_buffer_type()){
1376                 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
1377                 ret+=tmpstr;
1378           }
1379         }
1380
1381
1382
1383           bool first_udaf = true;
1384           if(has_udaf){
1385                 ret += "\t\tif(";
1386                 for(a=0;a<aggr_tbl->size();a++){
1387                         if(! aggr_tbl->is_builtin(a)){
1388                                 if(! first_udaf)ret += " || ";
1389                                 else first_udaf = false;
1390                                 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1391                                 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1392                                 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1393                         }
1394                 }
1395                 ret+="){\n";
1396                 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1397                 ret += generate_tuple_from_aggr(node_name,schema,idx);
1398                 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
1399                 ret+="\t\t}\n";
1400         }
1401         return ret;
1402 }
1403
1404
1405 string generate_init_group( table_list *schema, string idx){
1406           int g,a;
1407           string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1408 //                      Fill up the aggregate block.
1409           for(g=0;g<gb_tbl->size();g++){
1410                   sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1411                   ret += tmpstr;
1412           }
1413           for(a=0;a<aggr_tbl->size();a++){
1414                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1415                   ret += generate_aggr_init(tmpstr, aggr_tbl,a,  schema);
1416           }
1417           ret+="\t\tt->n_aggrs++;\n";
1418         return ret;
1419 }
1420
1421
1422 string generate_fta_flush(string node_name, table_list *schema,
1423                 ext_fcn_list *Ext_fcns){
1424
1425    string ret;
1426    string select_var_defs ;
1427         int s, p;
1428
1429 //              Flush from previous epoch
1430
1431         ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1432
1433         ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1434     ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1435         ret += "\tint i, lfta_bailout;\n";
1436         ret += "\tunsigned int gen_val;\n";
1437
1438         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1439         ret += generate_fta_name(node_name)+" *) f;\n";
1440
1441         ret += "\n";
1442
1443
1444 //              Variables needed to store selected attributes of BUFFER type
1445 //              temporarily, in order to compute their size for storage
1446 //              in an output tuple.
1447
1448   select_var_defs = "";
1449   for(s=0;s<sl_list.size();s++){
1450         data_type *sdt = sl_list[s]->get_data_type();
1451         if(sdt->is_buffer_type()){
1452           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1453           select_var_defs.append(tmpstr);
1454         }
1455   }
1456   if(select_var_defs != ""){
1457         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1458     ret += select_var_defs;
1459   }
1460
1461
1462 //              Variables to store results of partial functions.
1463   if(sl_fcns_start != sl_fcns_end){
1464         ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1465         for(p=sl_fcns_start;p<sl_fcns_end;p++){
1466                 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1467                         partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
1468                 ret += tmpstr;
1469         }
1470         ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1471   }
1472
1473 //              Variables for udaf output temporaries
1474         bool no_udaf = true;
1475         int a;
1476         for(a=0;a<aggr_tbl->size();a++){
1477                 if(! aggr_tbl->is_builtin(a)){
1478                         if(no_udaf){
1479                                 ret+="/*\t\tUDAF output vars.\t*/\n";
1480                                 no_udaf = false;
1481                         }
1482                         int afcn_id = aggr_tbl->get_fcn_id(a);
1483                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1484                         sprintf(tmpstr,"udaf_ret%d", a);
1485                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1486                 }
1487         }
1488
1489
1490 //  ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1491   ret+="\n";
1492   ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1493   ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1494   ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1495                 bool first_g=true;
1496                 int g;
1497                 for(g=0;g<gb_tbl->size();g++){
1498                   data_type *gdt = gb_tbl->get_data_type(g);
1499                   if(gdt->is_temporal()){
1500                         if(first_g) first_g=false; else ret+=" || ";
1501                         ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1502                   }
1503                 }
1504   ret += "))) {\n";
1505   ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1506   ret+=
1507 "#ifdef LFTA_STATS\n"
1508 "\t\t\tt->eviction_cnt++;\n"
1509 "#endif\n"
1510 ;
1511
1512
1513   ret+=generate_tuple_from_aggr(node_name,schema,"i");
1514
1515 //  ret+="\t\t\tt->n_aggrs--;\n";  // done in generate_tuple_from_aggr
1516   ret+="\t\t\tnflush--;\n";
1517   ret+="\t\t}\n";
1518   ret+="\t}\n";
1519   ret+="\tt->flush_pos=i;\n";
1520   ret+="\tif(t->n_aggrs == 0) {\n";
1521   ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1522   ret += "\t}\n\n";
1523
1524   ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1525
1526   for(int g=0;g<gb_tbl->size();g++){
1527         data_type *dt = gb_tbl->get_data_type(g);
1528         if(dt->is_temporal()){
1529                 data_type *gdt = gb_tbl->get_data_type(g);
1530                 if(!gdt->is_buffer_type()){
1531                         sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1532                         ret += tmpstr;
1533                 }
1534         }
1535   }
1536   ret += "\t}\n}\n\n";
1537
1538   return(ret);
1539 }
1540
1541 // TODO Remove sprintf to perform string catenation
1542 string generate_fta_load_params(string node_name){
1543         int p;
1544         vector<string> param_names = param_tbl->get_param_names();
1545
1546         string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1547                 ret += " *t, int sz, void *value, int initial_call){\n";
1548     ret += "\tint pos=0;\n";
1549     ret += "\tint data_pos;\n";
1550
1551         for(p=0;p<param_names.size();p++){
1552                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1553                 if(dt->is_buffer_type()){
1554                         sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1555                         ret += tmpstr;
1556                         sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
1557                         ret += tmpstr;
1558                 }
1559         }
1560
1561
1562
1563         ret += "\n\tdata_pos = ";
1564         for(p=0;p<param_names.size();p++){
1565                 if(p>0) ret += " + ";
1566                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1567                 ret += "sizeof( ";
1568                 ret +=  dt->get_tuple_cvar_type();
1569                 ret += " )";
1570         }
1571         ret += ";\n";
1572         ret += "\tif(data_pos > sz) return 1;\n\n";
1573
1574
1575         for(p=0;p<param_names.size();p++){
1576                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1577                 if(dt->is_buffer_type()){
1578                         sprintf(tmpstr,"\taccess_var_%s =  *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1579                         ret += tmpstr;
1580                         switch( dt->get_type() ){
1581                         case v_str_t:
1582 //                              ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n";               // ntoh conversion
1583 //                              ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n";   // ntoh conversion
1584                                 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1585                                 ret += tmpstr;
1586                                 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1587                                 ret += tmpstr;
1588                                 sprintf(tmpstr,"\ttmp_var_%s.length =  access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1589                                 ret += tmpstr;
1590                         break;
1591                         default:
1592                                 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1593                                 exit(1);
1594                         break;
1595                         }
1596 //                                      First, destroy the old
1597                         ret += "\tif(! initial_call)\n";
1598                         sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1599                         ret += tmpstr;
1600 //                                      Next, create the new.
1601                         sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1602                         ret += tmpstr;
1603                 }else{
1604 //                      if(dt->needs_hn_translation()){
1605 //                              sprintf(tmpstr,"\tt->param_%s =  %s( *( (%s *)( (char *)value+pos) ) );\n",
1606 //                                param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1607 //                      }else{
1608                                 sprintf(tmpstr,"\tt->param_%s =  *( (%s *)( (char *)value+pos) );\n",
1609                                   param_names[p].c_str(), dt->get_cvar_type().c_str() );
1610 //                      }
1611                         ret += tmpstr;
1612                 }
1613                 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1614                 ret += tmpstr;
1615         }
1616
1617 //                      Register the pass-by-handle parameters
1618
1619         ret += "/* register and de-register the pass-by-handle parameters */\n";
1620
1621     int ph;
1622     for(ph=0;ph<param_handle_table.size();++ph){
1623                 data_type pdt(param_handle_table[ph]->type_name);
1624                 switch(param_handle_table[ph]->val_type){
1625                 case cplx_lit_e:
1626                         break;
1627                 case litval_e:
1628                         break;
1629                 case param_e:
1630                         ret += "\tif(! initial_call)\n";
1631                         sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1632                                 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1633                         ret += tmpstr;
1634                         sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
1635                         ret += tmpstr;
1636
1637                         if(pdt.is_buffer_type()) ret += "&(";
1638                         ret += "t->param_"+param_handle_table[ph]->param_name;
1639                         if(pdt.is_buffer_type()) ret += ")";
1640                         ret += ");\n";
1641                         break;
1642                 default:
1643                         sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1644                         fprintf(stderr,"%s\n",tmpstr);
1645                         ret+=tmpstr;
1646                 }
1647         }
1648
1649         ret+="\treturn 0;\n";
1650         ret += "}\n\n";
1651
1652         return(ret);
1653 }
1654
1655
1656
1657
1658 string generate_fta_free(string node_name, bool is_aggr_query){
1659
1660         string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1661         ret+= "\tstruct "+generate_fta_name(node_name)+
1662                 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1663         ret += "\tint i;\n";
1664
1665         if(is_aggr_query){
1666                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1667                 ret+="\t/* \t\tmark all groups as old */\n";
1668                 ret+="\tt->generation++;\n";
1669                 ret+="\tt->flush_pos = 0;\n";
1670                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1671         }
1672
1673 //                      Deregister the pass-by-handle parameters
1674         ret += "/* de-register the pass-by-handle parameters */\n";
1675     int ph;
1676     for(ph=0;ph<param_handle_table.size();++ph){
1677                 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1678                         param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1679                 ret += tmpstr;
1680         }
1681
1682
1683         ret += "\treturn 0;\n}\n\n";
1684         return(ret);
1685 }
1686
1687
1688 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1689         string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f,  gs_int32_t command, gs_int32_t sz, void *value){\n";
1690         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1691                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1692         ret+="\tint i;\n";
1693
1694
1695         ret += "\t/* temp status tuple */\n";
1696         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1697         ret += "\tgs_int32_t tuple_size;\n";
1698
1699
1700         if(is_aggr_query){
1701                 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1702
1703                 ret+="\t\tif (!t->n_aggrs) {\n";
1704                 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1705                 ret+="\t\t\tif( tuple != NULL)\n";
1706                 ret+="\t\t\t\tpost_tuple(tuple);\n";
1707
1708                 ret+="\t\t}else{\n";
1709
1710                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1711                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1712                 ret +="\t\tt->generation++;\n";
1713                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1714                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1715                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1716                 ret+="\t\t\tt->flush_pos = 0;\n";
1717                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1718                 ret+="\t\t}\n";
1719
1720                 ret+="\t}\n";
1721         }
1722         if(param_tbl->size() > 0){
1723                 ret+=
1724 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1725 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1726 "#ifndef LFTA_IN_NIC\n"
1727 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1728 "#else\n"
1729 "\t\t{}\n"
1730 "#endif\n"
1731 "\t}\n";
1732         }
1733         ret+=
1734 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1735 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1736 "\t}\n\n";
1737
1738
1739         ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1740
1741         if(is_aggr_query){
1742                 ret+="\t\tif (t->n_aggrs) {\n";
1743                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1744                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1745                 ret +="\t\tt->generation++;\n";
1746                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1747                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1748                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1749                 ret+="\t\t\tt->flush_pos = 0;\n";
1750                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1751                 ret+="\t\t}\n";
1752         }
1753
1754         ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1755         ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1756         ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1757
1758         /* mark tuple as EOF_TUPLE */
1759         ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1760         ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1761         ret += "\t\tpost_tuple(tuple);\n";
1762         ret += "\t}\n";
1763
1764         ret += "\treturn 0;\n}\n\n";
1765
1766         return(ret);
1767 }
1768
1769 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){
1770         string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1771         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1772                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1773
1774         ret += "\t/* Create a temp status tuple */\n";
1775         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1776         ret += "\tgs_int32_t tuple_size;\n";
1777         ret += "\tunsigned int i;\n";
1778         ret += "\ttime_t cur_time;\n";
1779         ret += "\tint time_advanced;\n";
1780         ret += "\tstruct fta_stat stats;\n";
1781
1782
1783
1784         /* copy the last seen values of temporal attributes */
1785         col_id_set temp_cids;           //      col ids of temp attributes in select clause
1786
1787
1788         /* HACK: in order to reuse the SE generation code, we need to copy
1789          * the last values of the temp attributes into new variables
1790          * which have names unpack_var_XXX_XXX
1791          */
1792
1793         int s, g;
1794         col_id_set::iterator csi;
1795
1796         for(s=0;s<sl_list.size();s++){
1797                 data_type *sdt = sl_list[s]->get_data_type();
1798                 if (sdt->is_temporal()) {
1799                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
1800                 }
1801         }
1802
1803         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1804                 int tblref = (*csi).tblvar_ref;
1805                 int schref = (*csi).schema_ref;
1806                 string field = (*csi).field;
1807                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1808                 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
1809                 ret += tmpstr;
1810         }
1811
1812         if (is_aggr_query) {
1813                 for(g=0;g<gb_tbl->size();g++){
1814                         data_type *gdt = gb_tbl->get_data_type(g);
1815                         if(gdt->is_temporal()){
1816                                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1817                                 ret += tmpstr;
1818                                 data_type *gdt = gb_tbl->get_data_type(g);
1819                                 if(gdt->is_buffer_type()){
1820                                 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1821                                 ret += tmpstr;
1822                                 }
1823                         }
1824                 }
1825         }
1826         ret += "\n";
1827
1828         ret += "\ttime_advanced = 0;\n";
1829
1830         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1831                 int tblref = (*csi).tblvar_ref;
1832                 int schref = (*csi).schema_ref;
1833                 string field = (*csi).field;
1834                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1835
1836                 // update last seen value with the value seen
1837                 ret += "\t#ifdef PREFILTER_DEFINED\n";
1838                 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
1839                         field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
1840                 ret += tmpstr;
1841                 ret += "\t\ttime_advanced = 1;\n\t}\n";
1842                 ret += "\t#endif\n";
1843
1844                 // we need to pay special attention to time fields
1845                 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
1846                         ret += "\tcur_time = time(&cur_time);\n";
1847
1848                         if (field == "time") {
1849                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
1850                                         tblref, time_corr);
1851                                 ret += tmpstr;
1852                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
1853                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1854                         } else if (field == "timestamp_ms") {
1855                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
1856                                         tblref, time_corr);
1857                                 ret += tmpstr;
1858                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
1859                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1860                         }else{
1861                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
1862                                         field.c_str(), tblref, time_corr);
1863                                 ret += tmpstr;
1864                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
1865                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);
1866                         }
1867                         ret += tmpstr;
1868
1869                         ret += "\t\ttime_advanced = 1;\n";
1870                         ret += "\t}\n";
1871
1872                         sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
1873                         field.c_str(), tblref, field.c_str(), tblref);
1874                         ret += tmpstr;
1875                 } else {
1876                         sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
1877                                 field.c_str(), tblref, field.c_str(), tblref);
1878                         ret += tmpstr;
1879                 }
1880         }
1881
1882         // for aggregation lftas we need to check if the time was advanced beyond the current epoch
1883         if (is_aggr_query) {
1884
1885                 string change_test;
1886                 bool first_one = true;
1887                 for(g=0;g<gb_tbl->size();g++){
1888                   data_type *gdt = gb_tbl->get_data_type(g);
1889                   if(gdt->is_temporal()){
1890 //                                      To perform the test, first need to compute the value
1891 //                                      of the temporal gb attrs.
1892                           if(gdt->is_buffer_type()){
1893         //                              NOTE : if the SE defining the gb is anything
1894         //                              other than a ref to a variable, this will generate
1895         //                              illegal code.  To be resolved with Spatch.
1896                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
1897                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
1898                                 ret+=tmpstr;
1899                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
1900                                         gdt->get_buffer_assign_copy().c_str(), g, g);
1901                           }else{
1902                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
1903                           }
1904                           ret += tmpstr;
1905
1906                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
1907                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
1908                           if(first_one){first_one = false;} else {change_test.append(") && (");}
1909                           change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
1910                   }
1911                 }
1912
1913                 ret += "\n\tif( time_advanced && !( (";
1914                 ret += change_test;
1915                 ret += ") ) ){\n";
1916
1917                 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
1918                 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
1919                 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1920
1921                 ret += "\t\t/* \t\tmark all groups as old */\n";
1922                 ret +="\t\tt->generation++;\n";
1923                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1924                 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1925                 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1926                 ret += "\t\tt->flush_pos = 0;\n";
1927
1928                 for(g=0;g<gb_tbl->size();g++){
1929                      data_type *gdt = gb_tbl->get_data_type(g);
1930                      if(gdt->is_temporal()){
1931                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);                 ret += tmpstr;
1932                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
1933                      }
1934                 }
1935                 ret += "\t}\n\n";
1936
1937         }
1938
1939         ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
1940         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
1941         ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
1942
1943
1944         for(s=0;s<sl_list.size();s++){
1945                 data_type *sdt = sl_list[s]->get_data_type();
1946                 if(sdt->is_temporal()){
1947
1948                         if (sl_list[s]->is_gb()) {
1949                                 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
1950                                 ret += tmpstr;
1951                         }
1952
1953                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
1954                         ret += tmpstr;
1955 //                      if(sdt->needs_hn_translation())
1956 //                              ret += sdt->hton_translation() +"( ";
1957                         if (sl_list[s]->is_gb()) {
1958                                 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
1959                                 ret += tmpstr;
1960                         } else{
1961                                 ret += generate_se_code(sl_list[s],schema);
1962                         }
1963 //                      if(sdt->needs_hn_translation())
1964 //                              ret += " )";
1965                         ret += ";\n";
1966                 }
1967         }
1968
1969         /* mark tuple as temporal */
1970         ret += "\n\t/* Mark tuple as temporal */\n";
1971         ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
1972
1973         ret += "\n\t/* Copy trace id */\n";
1974         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
1975
1976         ret += "\n\t/* Populate runtime stats */\n";
1977         ret += "\tstats.ftaid = f->ftaid;\n";
1978         ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
1979         ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
1980         ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
1981         ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
1982         ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
1983         ret += "\tstats.collision_cnt = t->collision_cnt;\n";
1984         ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
1985         ret += "\tstats.sampling_rate    = t->sampling_rate;\n";
1986
1987         ret += "\n#ifdef LFTA_PROFILE\n";
1988         ret += "\n\t/* Print stats */\n";
1989         ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
1990         ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
1991         ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
1992         ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
1993         ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
1994         ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
1995         ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
1996         ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
1997         ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
1998         ret += "\n#endif\n";
1999
2000
2001         ret += "\n\t/* Copy stats */\n";
2002         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2003         ret+="\tpost_tuple(tuple);\n";
2004
2005         ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2006         ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
2007
2008         ret += "\n\t/* Reset runtime stats */\n";
2009         ret += "\tt->in_tuple_cnt = 0;\n";
2010         ret += "\tt->out_tuple_cnt = 0;\n";
2011         ret += "\tt->out_tuple_sz = 0;\n";
2012         ret += "\tt->accepted_tuple_cnt = 0;\n";
2013         ret += "\tt->cycle_cnt = 0;\n";
2014         ret += "\tt->collision_cnt = 0;\n";
2015         ret += "\tt->eviction_cnt = 0;\n";
2016
2017         ret += "\treturn 0;\n}\n\n";
2018
2019         return(ret);
2020 }
2021
2022
//		accept processing before the where clause,
//		do flush processing.
2025 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema,   col_id_set &unpacked_cids, string &temporal_flush){
2026         int s;
2027
2028 //              Slow flush
2029   string ret="\n/*\tslow flush\t*/\n";
2030   string slow_flush_str = fs->get_val_of_def("slow_flush");
2031   int n_slow_flush = atoi(slow_flush_str.c_str());
2032   if(n_slow_flush <= 0) n_slow_flush = 2;
2033   if(n_slow_flush > 1){
2034         ret += "\tt->flush_ctr++;\n";
2035         ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2036         ret += "\t\tt->flush_ctr = 0;\n";
2037     ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
2038         ret += "\t}\n\n";
2039   }else{
2040     ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2041   }
2042
2043
2044         string change_test;
2045         bool first_one = true;
2046         int g;
2047     col_id_set flush_cids;              //      col ids accessed when computing flush variables.
2048                                                         //  unpack them at temporal flush test time.
2049     temporal_flush = "";
2050
2051
2052         for(g=0;g<gb_tbl->size();g++){
2053                   data_type *gdt = gb_tbl->get_data_type(g);
2054                   if(gdt->is_temporal()){
2055                           gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2056
2057 //                                      To perform the test, first need to compute the value
2058 //                                      of the temporal gb attrs.
2059                           if(gdt->is_buffer_type()){
2060         //                              NOTE : if the SE defining the gb is anything
2061         //                              other than a ref to a variable, this will generate
2062         //                              illegal code.  To be resolved with Spatch.
2063                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2064                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2065                                 temporal_flush += tmpstr;
2066                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2067                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2068                           }else{
2069                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2070                           }
2071                           temporal_flush += tmpstr;
2072 //                                      END computing the value of the temporal GB attr.
2073
2074
2075                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2076                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2077                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2078                           change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2079                   }
2080         }
2081         if(!first_one){         // will be false iff. there is a temporal GB attribute
2082                   temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2083                   temporal_flush += "\tif( !( (";
2084                   temporal_flush += change_test;
2085                   temporal_flush += ") ) ){\n";
2086
2087 //                temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2088                   temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2089                   temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2090                   temporal_flush+="\t\t}\n";
2091                   temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2092                   temporal_flush+="\t\tt->generation++;\n";
2093                   temporal_flush+="\t\tt->flush_pos = 0;\n";
2094
2095
2096 //                              Now set the saved temporal value of the gb to the
2097 //                              current value of the gb.  Only for simple types,
2098 //                              not for buffer types -- but the strings are not
2099 //                              temporal in any case.
2100
2101                         for(g=0;g<gb_tbl->size();g++){
2102                           data_type *gdt = gb_tbl->get_data_type(g);
2103                           if(gdt->is_temporal()){
2104                                   if(gdt->is_buffer_type()){
2105
2106                                         fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2107                                   }else{
2108                                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2109                                         temporal_flush += tmpstr;
2110                                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2111                                         temporal_flush += tmpstr;
2112                                   }
2113                                 }
2114                         }
2115                   temporal_flush += "\t}\n\n";
2116         }
2117
2118 // Unpack all the temporal attributes referenced in select clause
2119 // and update the last value of the attribute
2120         col_id_set temp_cids;           //      col ids of temp attributes in select clause
2121         col_id_set::iterator csi;
2122
2123         for(s=0;s<sl_list.size();s++){
2124                 data_type *sdt = sl_list[s]->get_data_type();
2125                 if (sdt->is_temporal()) {
2126                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2127                 }
2128         }
2129
2130         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2131                 if(unpacked_cids.count((*csi)) == 0){
2132                         int tblref = (*csi).tblvar_ref;
2133                         int schref = (*csi).schema_ref;
2134                         string field = (*csi).field;
2135                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2136 /*
2137                         data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2138                         sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2139                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2140                         ret += tmpstr;
2141                         ret += "\tif(retval) return 1;\n";
2142 */
2143                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2144                         ret += tmpstr;
2145
2146                         unpacked_cids.insert( (*csi) );
2147                 }
2148         }
2149
2150
2151 //                              Do the flush here if this is a real_time query
2152         string rt_level = fs->get_val_of_def("real_time");
2153         if(rt_level != "" && temporal_flush != ""){
2154                 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2155                         if(unpacked_cids.count((*csi)) == 0){
2156                         int tblref = (*csi).tblvar_ref;
2157                         int schref = (*csi).schema_ref;
2158                         string field = (*csi).field;
2159                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2160 /*
2161                                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2162                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2163                         ret += tmpstr;
2164                         ret += "\tif(retval) return 1;\n";
2165 */
2166                                 unpacked_cids.insert( (*csi) );
2167                         }
2168                 }
2169                 ret += temporal_flush;
2170         }
2171
2172         return ret;
2173  }
2174
2175 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2176
2177 int p,s;
2178 string ret;
2179
2180 ///////////////                 Processing for filter-only query
2181
2182 //                      test passed : create the tuple, then assign to it.
2183           ret += "/*\t\tCreate and post the tuple\t*/\n";
2184
2185 //                      Unpack partial fcns ref'd by the select clause.
2186 //                      Its a kind of a WHERE clause ...
2187   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2188         if(fcn_ref_cnt[p] > 1){
2189                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2190         }
2191         if(is_partial_fcn[p]){
2192                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2193                 ret += "\tif(retval) goto end;\n";
2194         }
2195         if(fcn_ref_cnt[p] > 1){
2196                 if(!is_partial_fcn[p]){
2197                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2198                 }
2199                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2200                 ret += "\t}\n";
2201         }
2202   }
2203
2204   // increment the counter of accepted tuples
2205   ret += "\n\t#ifdef LFTA_STATS\n";
2206   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2207   ret += "\t#endif\n\n";
2208
2209 //                      First, compute the size of the tuple.
2210
2211 //                      Unpack any BUFFER type selections into temporaries
2212 //                      so that I can compute their size and not have
2213 //                      to recompute their value during tuple packing.
2214 //                      I can use regular assignment here because
2215 //                      these temporaries are non-persistent.
2216
2217           for(s=0;s<sl_list.size();s++){
2218                 data_type *sdt = sl_list[s]->get_data_type();
2219                 if(sdt->is_buffer_type()){
2220                         sprintf(tmpstr,"\tselvar_%d = ",s);
2221                         ret += tmpstr;
2222                         ret += generate_se_code(sl_list[s],schema);
2223                         ret += ";\n";
2224                 }
2225           }
2226
2227
2228 //              The size of the tuple is the size of the tuple struct plus the
2229 //              size of the buffers to be copied in.
2230
2231           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2232           for(s=0;s<sl_list.size();s++){
2233                 data_type *sdt = sl_list[s]->get_data_type();
2234                 if(sdt->is_buffer_type()){
2235                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2236                         ret += tmpstr;
2237                 }
2238           }
2239           ret += ";\n";
2240
2241
2242           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2243           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2244
2245 //                      Test passed, make assignments to the tuple.
2246
2247           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2248
2249 //                      Mark tuple as REGULAR_TUPLE
2250           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2251
2252
2253           for(s=0;s<sl_list.size();s++){
2254                 data_type *sdt = sl_list[s]->get_data_type();
2255                 if(sdt->is_buffer_type()){
2256                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2257                         ret += tmpstr;
2258                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2259                         ret += tmpstr;
2260                 }else{
2261                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2262                         ret += tmpstr;
2263 //                      if(sdt->needs_hn_translation())
2264 //                              ret += sdt->hton_translation() +"( ";
2265                         ret += generate_se_code(sl_list[s],schema);
2266 //                      if(sdt->needs_hn_translation())
2267 //                              ret += ") ";
2268                         ret += ";\n";
2269                 }
2270           }
2271
2272 //              Generate output.
2273
2274           ret += "\tpost_tuple(tuple);\n";
2275
2276 //              Increment the counter of posted tuples
2277   ret += "\n\t#ifdef LFTA_STATS\n";
2278   ret += "\tt->out_tuple_cnt++;\n";
2279   ret+="\tt->out_tuple_sz+=tuple_size;\n";
2280   ret += "\t#endif\n\n";
2281
2282
2283
2284         return ret;
2285 }
2286
2287 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
2288
2289 int p,s,w;
2290 string ret;
2291
2292 //                      Get parameters
2293         unsigned int window_len = fs->temporal_range;
2294         unsigned int n_bloom = 11;
2295         string n_bloom_str = fs->get_val_of_def("num_bloom");
2296         int tmp_n_bloom = atoi(n_bloom_str.c_str());
2297         if(tmp_n_bloom>0)
2298                 n_bloom = tmp_n_bloom+1;
2299         float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2300         sprintf(tmpstr,"%f",bloom_width);
2301         string bloom_width_str = tmpstr;
2302
2303         if(window_len < n_bloom){
2304                 n_bloom = window_len+1;
2305                 bloom_width_str = "1";
2306         }
2307
2308
2309 //              Grab the current window time
2310         scalarexp_t winvar(fs->temporal_var);
2311         ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2312
2313         int bf_exp_size = 12;  // base-2 log of number of bits
2314         string bloom_len_str = fs->get_val_of_def("bloom_size");
2315         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2316         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2317                 bf_exp_size = tmp_bf_exp_size;
2318         }
2319         int bf_bit_size = 1 << bf_exp_size;
2320         int bf_byte_size = bf_bit_size / (8*sizeof(char));
2321
2322         unsigned int ht_size = 4096;
2323         string ht_size_s = fs->get_val_of_def("aggregate_slots");
2324         int tmp_ht_size = atoi(ht_size_s.c_str());
2325         if(tmp_ht_size > 1024){
2326                 unsigned int hs = 1;            // make it power of 2
2327                 while(tmp_ht_size){
2328                         hs =hs << 1;
2329                         tmp_ht_size = tmp_ht_size >> 1;
2330                 }
2331                 ht_size = hs;
2332         }
2333
2334         int i, bf_mask = 0;
2335         if(fs->use_bloom){
2336                 for(i=0;i<bf_exp_size;i++)
2337                         bf_mask = (bf_mask << 1) | 1;
2338         }else{
2339                 for(i=ht_size;i>1;i=i>>1)
2340                         bf_mask = (bf_mask << 1) | 1;
2341         }
2342
2343 /*
2344 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2345         n_bloom,
2346         window_len,
2347         bloom_width_str.c_str(),
2348         bf_exp_size,
2349         bf_bit_size,
2350         bf_byte_size,
2351         ht_size,
2352         ht_size_s.c_str(),
2353         bf_mask);
2354 */
2355
2356
2357
2358
2359 //              If this is a bloom-filter fj, first test if the
2360 //              bloom filter needs to be advanced.
2361 //              SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2362 //              t->bf_size : number of bits in bloom filter
2363         if(fs->use_bloom){
2364                 ret +=
2365 "//                     Clean out old bloom filters if needed.\n"
2366 "       if(t->first_exec){\n"
2367 "               t->first_exec = 0;\n"
2368 "               t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2369 "               t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2370 "       }else{\n"
2371 "               curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2372 "               if(curr_bin != t->last_bin){\n"
2373 "                       for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2374 "                               t->last_bloom_pos++;\n"
2375 "                               if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2376 "                                       t->last_bloom_pos = 0;\n"
2377 "                               tmp_i = t->last_bloom_pos;\n"
2378 "                               for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2379 "                                       SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2380 "                               }\n"
2381 "                       }\n"
2382 "               }\n"
2383 "               t->last_bin = curr_bin;\n"
2384 "       }\n"
2385 ;
2386         }
2387
2388
2389 //-----------------------------------------------------------------
2390 //              First, determine whether to do S (filter stream) processing.
2391
2392         ret +=
2393 "//             S (filtering stream) predicate, should it be processed?\n"
2394 "\n"
2395 ;
2396 // Sort S preds based on cost.
2397         vector<cnf_elem *> s_filt = fs->pred_t1;
2398         col_id_set::iterator csi;
2399   if(s_filt.size() > 0){
2400
2401 //                      Unpack fields ref'd in the S pred
2402         for(w=0;w<s_filt.size();++w){
2403                 col_id_set this_pred_cids;
2404                 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2405                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2406                         if(unpacked_cids.count( (*csi) ) == 0){
2407                                 int tblref = (*csi).tblvar_ref;
2408                                 int schref = (*csi).schema_ref;
2409                                 string field = (*csi).field;
2410                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2411                                 unpacked_cids.insert( (*csi) );
2412                         }
2413                 }
2414         }
2415
2416
2417 //              Sort by evaluation cost.
2418 //              First, estimate evaluation costs
2419 //              Eliminate predicates covered by the prefilter (those in s_pids).
2420 //              I need to do it before the sort becuase the indices refer
2421 //              to the position in the unsorted list.
2422         vector<cnf_elem *> tmp_wh;
2423         for(w=0;w<s_filt.size();++w){
2424                 compute_cnf_cost(s_filt[w],Ext_fcns);
2425                 tmp_wh.push_back(s_filt[w]);
2426         }
2427         s_filt = tmp_wh;
2428
2429         sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2430
2431 //              Now generate the predicates.
2432         for(w=0;w<s_filt.size();++w){
2433                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2434                 ret += tmpstr;
2435
2436 //                      Find partial fcns ref'd in this cnf element
2437                 set<int> pfcn_refs;
2438                 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2439 //                      Since set<..> is a "Sorted Associative Container",
2440 //                      we can walk through it in sorted order by walking from
2441 //                      begin() to end().  (and the partial fcns must be
2442 //                      evaluated in this order).
2443                 set<int>::iterator si;
2444                 string pf_preds;
2445                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2446                         if(fcn_ref_cnt[(*si)] > 1){
2447                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2448                         }
2449                         if(is_partial_fcn[(*si)]){
2450                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2451                                 ret += "\t\tif(retval) goto end_s;\n";
2452                         }
2453                         if(fcn_ref_cnt[(*si)] > 1){
2454                                 if(!is_partial_fcn[(*si)]){
2455                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2456 //              Testing for S is a side branch.
2457 //              I don't want a cacheable partial function to be
2458 //              marked as evaluated.  Therefore I mark the function
2459 //              as evalauted ONLY IF it is not partial.
2460                                         ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2461                                 }
2462                                 ret += "\t}\n";
2463                         }
2464                 }
2465
2466                 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2467                                 ") ) goto end_s;\n";
2468         }
2469   }else{
2470           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2471   }
2472
2473         for(p=0;p<fs->hash_eq.size();++p)
2474                 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2475
2476         if(fs->use_bloom){
2477 //                      First, generate the S scalar expressions in the hash_eq
2478
2479 //                      Iterate over the bloom filters
2480                 for(i=0;i<3;i++){
2481                         ret += "\t\tbucket=0;\n";
2482                         for(p=0;p<fs->hash_eq.size();++p){
2483                                 ret +=
2484 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2485         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2486         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2487                         }
2488 //              SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2489                                 ret +=
2490 "               bucket &= "+int_to_string(bf_mask)+";\n"
2491 "               SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2492 "\n"
2493 ;
2494                 }
2495         }else{
2496                 ret += "\t\tbucket=0;\n";
2497                 for(p=0;p<fs->hash_eq.size();++p){
2498                         ret +=
2499 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2500         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2501         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2502                 }
2503                 ret +=
2504 "               bucket &= "+int_to_string(bf_mask)+";\n"
2505 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2506 ;
2507 //                      Try the first bucket
2508                 ret += "\t\tif(";
2509                 for(p=0;p<fs->hash_eq.size();++p){
2510                         if(p>0) ret += " && ";
2511 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2512 //                                      " == s_equijoin_"+int_to_string(p);
2513                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2514                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2515                         string rhs_op = "s_equijoin_"+int_to_string(p);
2516                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2517                 }
2518                 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2519                 ret += "\t\t}else {if(";
2520                 for(p=0;p<fs->hash_eq.size();++p){
2521                         if(p>0) ret += " && ";
2522 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2523 //                                      " == s_equijoin_"+int_to_string(p);
2524                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2525                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2526                         string rhs_op = "s_equijoin_"+int_to_string(p);
2527                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2528                 }
2529                 ret +=  "){\n\t\t\tthe_bucket = bucket1;\n";
2530                 ret += "\t\t}else{ if(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2531                 ret+="\t\t\tthe_bucket = bucket;\n\t\t\telse the_bucket=bucket1;\n";
2532                 ret += "\t\t}}\n";
2533                 for(p=0;p<fs->hash_eq.size();++p){
2534                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2535                         if(hdt->is_buffer_type()){
2536                                 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2537                                 ret += tmpstr;
2538                         }else{
2539                                 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2540                                         " = s_equijoin_"+int_to_string(p)+";\n";
2541                         }
2542                 }
2543                 ret+="\t\tt->join_table[the_bucket].ts =  curr_fj_ts;\n";
2544         }
2545   ret += "\tend_s:\n";
2546
2547 //      ------------------------------------------------------------
2548 //              Next, determine if the R record should be processed.
2549
2550
2551         ret +=
2552 "//             R (main stream) cheap predicate\n"
2553 "\n"
2554 ;
2555
2556 //              Unpack r_filt fields
2557         vector<cnf_elem *> r_filt = fs->pred_t0;
2558         for(w=0;w<r_filt.size();++w){
2559                 col_id_set this_pred_cids;
2560                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2561                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2562                         if(unpacked_cids.count( (*csi) ) == 0){
2563                                 int tblref = (*csi).tblvar_ref;
2564                                 int schref = (*csi).schema_ref;
2565                                 string field = (*csi).field;
2566                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2567                                 unpacked_cids.insert( (*csi) );
2568                         }
2569                 }
2570         }
2571
2572 // Sort S preds based on cost.
2573
2574         vector<cnf_elem *> tmp_wh;
2575         for(w=0;w<r_filt.size();++w){
2576                 compute_cnf_cost(r_filt[w],Ext_fcns);
2577                 tmp_wh.push_back(r_filt[w]);
2578         }
2579         r_filt = tmp_wh;
2580
2581         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2582
2583 //              WARNING! the constant 20 below is a wild-ass guess.
2584         int cheap_rpos;
2585         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++)
2586
2587 //              Test the cheap filters on R.
2588   if(cheap_rpos >0){
2589
2590 //              Now generate the predicates.
2591         for(w=0;w<cheap_rpos;++w){
2592                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2593                 ret += tmpstr;
2594
2595 //                      Find partial fcns ref'd in this cnf element
2596                 set<int> pfcn_refs;
2597                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2598 //                      Since set<..> is a "Sorted Associative Container",
2599 //                      we can walk through it in sorted order by walking from
2600 //                      begin() to end().  (and the partial fcns must be
2601 //                      evaluated in this order).
2602                 set<int>::iterator si;
2603                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2604                         if(fcn_ref_cnt[(*si)] > 1){
2605                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2606                         }
2607                         if(is_partial_fcn[(*si)]){
2608                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2609                                 ret += "\t\tif(retval) goto end;\n";
2610                         }
2611                         if(fcn_ref_cnt[(*si)] > 1){
2612                                 if(!is_partial_fcn[(*si)]){
2613                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2614                                 }
2615                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2616                                 ret += "\t}\n";
2617                         }
2618                 }
2619
2620                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2621                                 ") ) goto end;\n";
2622         }
2623   }else{
2624           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2625   }
2626
2627         ret += "\n//    Do the join\n\n";
2628         for(p=0;p<fs->hash_eq.size();++p)
2629                 ret += "\t\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2630
2631
2632 //                      Passed the cheap pred, now test the join with S.
2633         if(fs->use_bloom){
2634                 for(i=0;i<3;i++){
2635                         ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2636                         for(p=0;p<fs->hash_eq.size();++p){
2637                                 ret +=
2638 "       bucket"+int_to_string(i)+
2639         " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2640         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2641         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2642                         }
2643                                 ret +=
2644 "       bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2645                 }
2646                 ret += "\tfound = 0;\n";
2647                 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2648                 ret +=
2649 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2650 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2651 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
2652 "\t\t\tfound=1;\n"
2653 "\t}\n"
2654 ;
2655                 ret +=
2656 "       if(!found)\n"
2657 "               goto end;\n"
2658 ;
2659         }else{
2660                 ret += "\tfound = 0;\n";
2661                 ret += "\t\tbucket=0;\n";
2662                 for(p=0;p<fs->hash_eq.size();++p){
2663                         ret +=
2664 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2665         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2666         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2667                 }
2668                 ret +=
2669 "               bucket &= "+int_to_string(bf_mask)+";\n"
2670 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2671 ;
2672 //                      Try the first bucket
2673                 ret += "\t\tif(";
2674                 for(p=0;p<fs->hash_eq.size();++p){
2675                         if(p>0) ret += " && ";
2676 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2677 //                                      " == r_equijoin_"+int_to_string(p);
2678                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2679                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2680                         string rhs_op = "s_equijoin_"+int_to_string(p);
2681                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2682                 }
2683                 if(p>0) ret += " && ";
2684                 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2685                 ret += "){\n\t\t\tfound = 1;\n";
2686                 ret += "\t\t}else {if(";
2687                 for(p=0;p<fs->hash_eq.size();++p){
2688                         if(p>0) ret += " && ";
2689 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2690 //                                      " == r_equijoin_"+int_to_string(p);
2691                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2692                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2693                         string rhs_op = "s_equijoin_"+int_to_string(p);
2694                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2695                 }
2696                 if(p>0) ret += " && ";
2697                 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2698                 ret +=  ")\n\t\t\tfound=1;\n";
2699                 ret+="\t\t}\n";
2700                 ret +=
2701 "       if(!found)\n"
2702 "               goto end;\n"
2703 ;
2704         }
2705
2706
2707 //              Test the expensive filters on R.
2708   if(cheap_rpos < r_filt.size()){
2709
2710 //              Now generate the predicates.
2711         for(w=cheap_rpos;w<r_filt.size();++w){
2712                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2713                 ret += tmpstr;
2714
2715 //                      Find partial fcns ref'd in this cnf element
2716                 set<int> pfcn_refs;
2717                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2718 //                      Since set<..> is a "Sorted Associative Container",
2719 //                      we can walk through it in sorted order by walking from
2720 //                      begin() to end().  (and the partial fcns must be
2721 //                      evaluated in this order).
2722                 set<int>::iterator si;
2723                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2724                         if(fcn_ref_cnt[(*si)] > 1){
2725                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2726                         }
2727                         if(is_partial_fcn[(*si)]){
2728                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2729                                 ret += "\t\tif(retval) goto end;\n";
2730                         }
2731                         if(fcn_ref_cnt[(*si)] > 1){
2732                                 if(!is_partial_fcn[(*si)]){
2733                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2734                                 }
2735                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2736                                 ret += "\t}\n";
2737                         }
2738                 }
2739
2740                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2741                                 ") ) goto end;\n";
2742         }
2743   }else{
2744           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2745   }
2746
2747
2748
2749 ///////////////                 post the tuple
2750
2751 //                      test passed : create the tuple, then assign to it.
2752           ret += "/*\t\tCreate and post the tuple\t*/\n";
2753
2754 //              Unpack r_filt fields
2755         for(s=0;s<sl_list.size();++s){
2756                 col_id_set this_se_cids;
2757                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2758                 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2759                         if(unpacked_cids.count( (*csi) ) == 0){
2760                                 int tblref = (*csi).tblvar_ref;
2761                                 int schref = (*csi).schema_ref;
2762                                 string field = (*csi).field;
2763                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2764                                 unpacked_cids.insert( (*csi) );
2765                         }
2766                 }
2767         }
2768
2769
2770 //                      Unpack partial fcns ref'd by the select clause.
2771 //                      Its a kind of a WHERE clause ...
2772   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2773         if(fcn_ref_cnt[p] > 1){
2774                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2775         }
2776         if(is_partial_fcn[p]){
2777                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2778                 ret += "\tif(retval) goto end;\n";
2779         }
2780         if(fcn_ref_cnt[p] > 1){
2781                 if(!is_partial_fcn[p]){
2782                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2783                 }
2784                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2785                 ret += "\t}\n";
2786         }
2787   }
2788
2789   // increment the counter of accepted tuples
2790   ret += "\n\t#ifdef LFTA_STATS\n";
2791   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2792   ret += "\t#endif\n\n";
2793
2794 //                      First, compute the size of the tuple.
2795
2796 //                      Unpack any BUFFER type selections into temporaries
2797 //                      so that I can compute their size and not have
2798 //                      to recompute their value during tuple packing.
2799 //                      I can use regular assignment here because
2800 //                      these temporaries are non-persistent.
2801
2802           for(s=0;s<sl_list.size();s++){
2803                 data_type *sdt = sl_list[s]->get_data_type();
2804                 if(sdt->is_buffer_type()){
2805                         sprintf(tmpstr,"\tselvar_%d = ",s);
2806                         ret += tmpstr;
2807                         ret += generate_se_code(sl_list[s],schema);
2808                         ret += ";\n";
2809                 }
2810           }
2811
2812
2813 //              The size of the tuple is the size of the tuple struct plus the
2814 //              size of the buffers to be copied in.
2815
2816           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2817           for(s=0;s<sl_list.size();s++){
2818                 data_type *sdt = sl_list[s]->get_data_type();
2819                 if(sdt->is_buffer_type()){
2820                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2821                         ret += tmpstr;
2822                 }
2823           }
2824           ret += ";\n";
2825
2826
2827           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2828           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2829
2830 //                      Test passed, make assignments to the tuple.
2831
2832           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2833
2834 //                      Mark tuple as REGULAR_TUPLE
2835           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2836
2837
2838           for(s=0;s<sl_list.size();s++){
2839                 data_type *sdt = sl_list[s]->get_data_type();
2840                 if(sdt->is_buffer_type()){
2841                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2842                         ret += tmpstr;
2843                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2844                         ret += tmpstr;
2845                 }else{
2846                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2847                         ret += tmpstr;
2848 //                      if(sdt->needs_hn_translation())
2849 //                              ret += sdt->hton_translation() +"( ";
2850                         ret += generate_se_code(sl_list[s],schema);
2851 //                      if(sdt->needs_hn_translation())
2852 //                              ret += ") ";
2853                         ret += ";\n";
2854                 }
2855           }
2856
2857 //              Generate output.
2858
2859           ret += "\tpost_tuple(tuple);\n";
2860
2861 //              Increment the counter of posted tuples
2862   ret += "\n\t#ifdef LFTA_STATS\n";
2863   ret += "\n\tt->out_tuple_cnt++;\n\n";
2864   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
2865   ret += "\t#endif\n\n";
2866
2867
2868         return ret;
2869 }
2870
2871 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
2872         string ret;
2873         int a,p,g;
2874
2875 //////////////          Processing for aggregtion query
2876
2877 //              First, search for a match.  Start by unpacking the group-by attributes.
2878
2879 //                      One complication : if a real-time aggregate flush occurs,
2880 //                      the GB attr has already been calculated.  So don't compute
2881 //                      it again if 1) its temporal and 2) it will be computed in the
2882 //                      agggregate flush code.
2883
2884 //              Unpack the partial fcns ref'd by the gb's and the aggr defs.
2885   for(p=gb_fcns_start;p<gb_fcns_end;p++){
2886     if(is_partial_fcn[p]){
2887                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2888                 ret += "\tif(retval) goto end;\n";
2889         }
2890   }
2891   for(p=ag_fcns_start;p<ag_fcns_end;p++){
2892     if(is_partial_fcn[p]){
2893                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2894                 ret += "\tif(retval) goto end;\n";
2895         }
2896   }
2897
2898   // increment the counter of accepted tuples
2899   ret += "\n\t#ifdef LFTA_STATS\n";
2900   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2901   ret += "\t#endif\n\n";
2902
2903   ret += "/*\t\tTest if the group is in the hash table \t*/\n";
2904 //                      Compute the values of the group-by variables.
2905   for(g=0;g<gb_tbl->size();g++){
2906           data_type *gdt = gb_tbl->get_data_type(g);
2907           if((! gdt->is_temporal()) || temporal_flush == ""){
2908
2909                   if(gdt->is_buffer_type()){
2910         //                              NOTE : if the SE defining the gb is anything
2911         //                              other than a ref to a variable, this will generate
2912         //                              illegal code.  To be resolved with Spatch.
2913                         sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2914                                 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2915                         ret += tmpstr;
2916                         sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2917                                 gdt->get_buffer_assign_copy().c_str(), g, g);
2918                   }else{
2919                         sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2920                   }
2921                   ret += tmpstr;
2922           }
2923   }
2924   ret += "\n";
2925
2926 //                      A quick aside : if any of the GB attrs are temporal,
2927 //                      test for change and flush if any change occurred.
2928 //                      We've already computed the flush code,
2929 //                      Put it here if this is not a real time query.
2930 //                      We've already unpacked all column refs, so no need to
2931 //                      do it again here.
2932
2933         string rt_level = fs->get_val_of_def("real_time");
2934         if(rt_level == "" && temporal_flush != ""){
2935                 ret += temporal_flush;
2936         }
2937
2938 //                      Compute the hash bucket
2939         if(gb_tbl->size() > 0){
2940                 ret += "\thashval = ";\
2941                 for(g=0;g<gb_tbl->size();g++){
2942                   if(g>0) ret += " ^ ";
2943                   data_type *gdt = gb_tbl->get_data_type(g);
2944                   if(gdt->is_buffer_type()){
2945                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2946                                 gdt->get_type_str().c_str(), g);
2947                   }else{
2948                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2949                                 gdt->get_type_str().c_str(), g);
2950                   }
2951                   ret += tmpstr;
2952                 }
2953                 ret += ";\n";
2954                 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
2955         ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
2956         }else{
2957                 ret+="\tprobe = 0;\n";
2958                 ret+="\thash2 = 0;\n\n";
2959         }
2960
2961 //              Does the lfta reference a udaf?
2962           bool has_udaf = false;
2963           for(a=0;a<aggr_tbl->size();a++){
2964                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
2965           }
2966
2967 //              Scan for a match, or alternatively the best slot.
2968 //              Currently, hardcode 5 tests.
2969         ret +=
2970 "       gen_val = t->generation & SLOT_GEN_BITS;\n"
2971 "       match_found = 0;\n"
2972 "       best_slot = probe;\n"
2973 "       for(i=0;i<5 && match_found == 0;i++){\n"
2974 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
2975 ;
2976         if(gb_tbl->size()>0){
2977                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
2978                 ret+="\t\tif(";
2979                 string rhs_op, lhs_op;
2980                 for(g=0;g<gb_tbl->size();g++){
2981                   if(g>0) ret += " && ";
2982                   ret += "(";
2983                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
2984                   sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
2985                   ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
2986                   ret += ")";
2987                 }
2988          }
2989          ret += "){\n"
2990 "                       match_found = 1;\n"
2991 "                       best_slot = probe;\n"
2992 "               }\n"
2993 "       }\n"
2994 "//             Rate slots in case no match found: prefer empty, then full but old slots\n"
2995 "       if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
2996 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
2997 "                       best_slot = probe;\n"
2998 "               }else{\n"
2999 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3000 "                               best_slot = probe;\n"
3001 "                       }\n"
3002 "               }\n"
3003 "               probe++;\n"
3004 "               if(probe >= t->max_aggrs)\n"
3005 "                       probe=0;\n"
3006 "       }\n"
3007 "       if(match_found){\n"
3008 ;
3009         ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3010         ret +=
3011 "       }else{\n"
3012 "               if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3013 ;
3014 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3015         if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3016                 ret +=
3017 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3018 "                               if((";
3019                 bool first_g = true;
3020                 for(int g=0;g<gb_tbl->size();g++){
3021                         data_type *gdt = gb_tbl->get_data_type(g);
3022                         if(gdt->is_temporal()){
3023                                 if(first_g) first_g = false; else ret+=" + ";
3024                                 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3025                         }
3026                 }
3027                 ret += ") == 0 ){\n";
3028
3029                 ret +=
3030 "                                       fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3031 "                               }\n"
3032 "                       }\n"
3033 ;
3034         }
3035
3036         ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3037         ret +=
3038 "\t\t\t#ifdef LFTA_STATS\n"
3039 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3040 "\t\t\t\tt->collision_cnt++;\n\n"
3041 "\t\t\t#endif\n\n"
3042 "\t\t}\n"
3043 ;
3044         ret += generate_init_group(schema,"best_slot");
3045
3046
3047           ret += "\t}\n";
3048
3049         return ret;
3050 }
3051
3052
3053
3054 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set<unsigned int> &s_pids){
3055
3056         string ret="static gs_retval_t accept_packet_"+node_name+
3057                 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3058     ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3059
3060   int a;
3061
3062 //                      Define all of the variables needed by this
3063 //                      procedure.
3064
3065
3066 //                      Gather all column references, need to define unpacking variables.
3067   int w,s;
3068   col_id_set cid_set;
3069   col_id_set::iterator csi;
3070
3071 //              If its a filter join, rebind all colrefs
3072 //              to the first range var, to avoid double unpacking.
3073
3074   if(is_fj){
3075     for(w=0;w<where.size();++w)
3076                 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3077     for(s=0;s<sl_list.size();s++)
3078                 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3079   }
3080
3081   for(w=0;w<where.size();++w){
3082         if(is_fj || s_pids.count(w) == 0)
3083                 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3084         }
3085   for(s=0;s<sl_list.size();s++){
3086         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3087   }
3088
3089   int g;
3090   if(gb_tbl != NULL){
3091         for(g=0;g<gb_tbl->size();g++)
3092           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3093   }
3094
3095   //                    Variables for unpacking attributes.
3096   ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3097   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3098     int schref = (*csi).schema_ref;
3099         int tblref = (*csi).tblvar_ref;
3100     string field = (*csi).field;
3101     data_type dt(schema->get_type_name(schref,field));
3102     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3103         field.c_str(), tblref);
3104     ret += tmpstr;
3105   }
3106
3107   ret += "\n\n";
3108
3109 //                      Variables that are always needed
3110   ret += "/*\t\tVariables which are always needed\t*/\n";
3111   ret += "\tgs_retval_t retval;\n";
3112   ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3113   ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3114
3115   ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3116
3117
3118 //                      Variables needed for aggregation queries.
3119   if(is_aggr_query){
3120           ret += "\n/*\t\tVariables for aggregation\t*/\n";
3121           ret+="\tunsigned int i, probe;\n";
3122           ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3123           ret+="\tgs_uint64_t hashval, hash2;\n";
3124 //                      Variables for storing group-by attribute values.
3125           if(gb_tbl->size() > 0)
3126                 ret += "/*\t\tGroup-by attributes\t*/\n";
3127           for(g=0;g<gb_tbl->size();g++){
3128                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3129                 ret += tmpstr;
3130                 data_type *gdt = gb_tbl->get_data_type(g);
3131                 if(gdt->is_buffer_type()){
3132                   sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3133                   ret += tmpstr;
3134                 }
3135           }
3136           ret += "\n";
3137 //                      Temporaries for min/max
3138           string aggr_tmp_str = "";
3139           for(a=0;a<aggr_tbl->size();a++){
3140                 string aggr_op = aggr_tbl->get_op(a);
3141                 if(aggr_op == "MIN" || aggr_op == "MAX"){
3142                         sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3143                         aggr_tmp_str.append(tmpstr);
3144                 }
3145           }
3146           if(aggr_tmp_str != ""){
3147                 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3148                 ret += aggr_tmp_str;
3149                 ret += "\n";
3150           }
3151 //              Variables for udaf output temporaries
3152         bool no_udaf = true;
3153         for(a=0;a<aggr_tbl->size();a++){
3154                 if(! aggr_tbl->is_builtin(a)){
3155                         if(no_udaf){
3156                                 ret+="/*\t\tUDAF output vars.\t*/\n";
3157                                 no_udaf = false;
3158                         }
3159                         int afcn_id = aggr_tbl->get_fcn_id(a);
3160                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3161                         sprintf(tmpstr,"udaf_ret%d", a);
3162                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3163                 }
3164         }
3165   }
3166
3167 //                      Variables needed for a filter join query
3168   if(fs->node_type() == "filter_join"){
3169         filter_join_qpn *fjq = (filter_join_qpn *)fs;
3170         bool uses_bloom = fjq->use_bloom;
3171         ret += "/*\t\tJoin fields\t*/\n";
3172         for(g=0;g<fjq->hash_eq.size();g++){
3173                 sprintf(tmpstr,"\t%s s_equijoin_%d, r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g,g);
3174                 ret += tmpstr;
3175           }
3176         if(uses_bloom){
3177                 ret +=
3178 "  /*           Variables for fj bloom filter   */ \n"
3179 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3180 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3181 "\tlong long int curr_fj_ts;\n"
3182 "\tunsigned int curr_bin, the_bin;\n"
3183 "\n"
3184 ;
3185         }else{
3186                 ret +=
3187 "  /*           Variables for fj join table     */ \n"
3188 "\tunsigned int i, bucket, found; \n"
3189 "\tunsigned int bucket1, the_bucket;\n"
3190 "       long long int curr_fj_ts;\n"
3191 "\n"
3192 ;
3193         }
3194   }
3195
3196
3197 //              Variables needed to store selected attributes of BUFFER type
3198 //              temporarily, in order to compute their size for storage
3199 //              in an output tuple.
3200
3201   string select_var_defs = "";
3202   for(s=0;s<sl_list.size();s++){
3203         data_type *sdt = sl_list[s]->get_data_type();
3204         if(sdt->is_buffer_type()){
3205           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3206           select_var_defs.append(tmpstr);
3207         }
3208   }
3209   if(select_var_defs != ""){
3210         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3211     ret += select_var_defs;
3212   }
3213
3214 //              Variables to store results of partial functions.
3215   int p;
3216   if(partial_fcns.size()>0){
3217           ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3218           for(p=0;p<partial_fcns.size();++p){
3219                 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3220                   sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3221                     partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3222                   ret += tmpstr;
3223                   if(!is_aggr_query && fcn_ref_cnt[p] >1){
3224                         ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3225                   }
3226                 }
3227           }
3228
3229           if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3230           ret += "\n";
3231   }
3232
3233 //              variable to hold packet struct  //
3234         if(packed_return){
3235                 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3236         }
3237
3238
3239   ret += "\t#ifdef LFTA_STATS\n";
3240 // variable to store counter of cpu cycles spend in accept_tuple
3241         ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3242 // increment counter of received tuples
3243         ret += "\tt->in_tuple_cnt++;\n";
3244   ret += "\t#endif\n";
3245
3246
3247 //      -------------------------------------------------
3248 //              If the packet is "packet", test if its for this lfta,
3249 //              and if so load it into its struct
3250
3251         if(packed_return){
3252                 ret+="\n/*  packed tuple : test and load. \t*/\n";
3253                 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3254                 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3255                 ret+="\t\tgoto end;\n\n";
3256         }
3257
3258
3259
3260   col_id_set unpacked_cids;     //      Keep track of the cols that have been unpacked.
3261
3262   string temporal_flush;
3263   if(is_aggr_query)
3264         ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3265   else {        // non-aggregation operators
3266
3267 // Unpack all the temporal attributes referenced in select clause
3268 // and update the last value of the attribute
3269         col_id_set temp_cids;           //      col ids of temp attributes in select clause
3270
3271         for(s=0;s<sl_list.size();s++){
3272                 data_type *sdt = sl_list[s]->get_data_type();
3273                 if (sdt->is_temporal()) {
3274                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3275                 }
3276         }
3277 //                      If this is a filter join,
3278 //                      ensure that the temporal range field is unpacked.
3279         if(is_fj){
3280                 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3281                 if(temp_cids.count(window_var_cid)==0)
3282                         temp_cids.insert(window_var_cid);
3283         }
3284
3285         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3286                 if(unpacked_cids.count((*csi)) == 0){
3287                         int tblref = (*csi).tblvar_ref;
3288                         int schref = (*csi).schema_ref;
3289                         string field = (*csi).field;
3290                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3291                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3292                         ret += tmpstr;
3293
3294                         unpacked_cids.insert( (*csi) );
3295                 }
3296         }
3297
3298   }
3299
3300   vector<cnf_elem *> filter = fs->get_filter_clause();
3301 //              Test the filter predicate (some query types have additional preds).
3302   if(filter.size() > 0){
3303
3304 //              Sort by evaluation cost.
3305 //              First, estimate evaluation costs
3306 //              Eliminate predicates covered by the prefilter (those in s_pids).
3307 //              I need to do it before the sort becuase the indices refer
3308 //              to the position in the unsorted list./
3309         vector<cnf_elem *> tmp_wh;
3310         for(w=0;w<filter.size();++w){
3311                 if(s_pids.count(w) == 0){
3312                         compute_cnf_cost(filter[w],Ext_fcns);
3313                         tmp_wh.push_back(filter[w]);
3314                 }
3315         }
3316         filter = tmp_wh;
3317
3318         sort(filter.begin(), filter.end(), compare_cnf_cost());
3319
3320 //              Now generate the predicates.
3321         for(w=0;w<filter.size();++w){
3322                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
3323                 ret += tmpstr;
3324 //                      Find the set of variables accessed in this CNF elem,
3325 //                      but in no previous element.
3326                 col_id_set this_pred_cids;
3327                 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
3328                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3329                         if(unpacked_cids.count( (*csi) ) == 0){
3330                         int tblref = (*csi).tblvar_ref;
3331                         int schref = (*csi).schema_ref;
3332                         string field = (*csi).field;
3333                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3334                                 unpacked_cids.insert( (*csi) );
3335                         }
3336                 }
3337 //                      Find partial fcns ref'd in this cnf element
3338                 set<int> pfcn_refs;
3339                 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
3340 //                      Since set<..> is a "Sorted Associative Container",
3341 //                      we can walk through it in sorted order by walking from
3342 //                      begin() to end().  (and the partial fcns must be
3343 //                      evaluated in this order).
3344                 set<int>::iterator si;
3345                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3346                         if(fcn_ref_cnt[(*si)] > 1){
3347                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3348                         }
3349                         if(is_partial_fcn[(*si)]){
3350                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3351                                 ret += "\t\tif(retval) goto end;\n";
3352                         }
3353                         if(fcn_ref_cnt[(*si)] > 1){
3354                                 if(!is_partial_fcn[(*si)]){
3355                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3356                                 }
3357                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3358                                 ret += "\t}\n";
3359                         }
3360                 }
3361
3362                 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
3363                                 ") ) goto end;\n";
3364         }
3365   }else{
3366           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
3367   }
3368
3369
3370 //                      We've passed the WHERE clause,
3371 //                      unpack the remainder of the accessed fields.
3372   if(is_fj){
3373         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3374         vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
3375                 for(w=0;w<h_eq.size();++w){
3376                 col_id_set this_pred_cids;
3377                 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
3378                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3379                         if(unpacked_cids.count( (*csi) ) == 0){
3380                                 int tblref = (*csi).tblvar_ref;
3381                                 int schref = (*csi).schema_ref;
3382                                 string field = (*csi).field;
3383                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3384                                 unpacked_cids.insert( (*csi) );
3385                         }
3386                 }
3387         }
3388   }else{
3389           ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
3390
3391           for(csi=cid_set.begin();csi!=cid_set.end();++csi){
3392                 if(unpacked_cids.count( (*csi) ) == 0){
3393                         int schref = (*csi).schema_ref;
3394                         int tblref = (*csi).tblvar_ref;
3395                         string field = (*csi).field;
3396                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3397                         unpacked_cids.insert( (*csi) );
3398                 }
3399           }
3400   }
3401
3402
3403 //////////////////
3404 //////////////////      After this, the query types
3405 //////////////////      are processed differently.
3406
3407   if(!is_aggr_query && !is_fj)
3408         ret += generate_sel_accept_body(fs, node_name, schema);
3409   else if(is_aggr_query)
3410         ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
3411   else
3412         ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
3413
3414
3415 //              Finish up.
3416
3417    ret += "\n\tend:\n";
3418   ret += "\t#ifdef LFTA_STATS\n";
3419         ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
3420   ret += "\t#endif\n";
3421    ret += "\n\treturn 1;\n}\n\n";
3422
3423         return(ret);
3424 }
3425
3426
3427 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
3428         int g, cl;
3429
3430         string ret = "struct FTA * "+generate_alloc_name(node_name) +
3431            "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command,  gs_int32_t sz, void *value){\n";
3432
3433         ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
3434         ret+="\tint i;\n";
3435         ret += "\n";
3436         ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
3437
3438 //                              assign a streamid to fta instance
3439         ret+="\t/* assign a streamid */\n";
3440         ret+="\tf->f.ftaid = ftaid;\n";
3441         ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
3442         ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
3443
3444         if(is_aggr_query){
3445                 ret += "\tf->n_aggrs = 0;\n";
3446
3447                 ret += "\tf->max_aggrs = ";
3448
3449 //                              Computing the number of aggregate blocks is a little
3450 //                              tricky.  If there are no GB attrs, or if all GB attrs
3451 //                              are temporal, then use a single aggregate block, else
3452 //                              use a default value (10).  A user specification overrides
3453 //                              this logic.
3454                 bool single_group = true;
3455                 for(g=0;g<gb_tbl->size();g++){
3456                         data_type *gdt = gb_tbl->get_data_type(g);
3457                         if(! gdt->is_temporal() ){
3458                                 single_group = false;
3459                         }
3460                 }
3461                 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
3462                 int max_aggr_i = atoi(max_aggr_str.c_str());
3463                 if(max_aggr_i <= 0){
3464                         if(single_group)
3465                                 ret += "2";
3466                         else
3467                                 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
3468                 }else{
3469                         unsigned int naggrs = 1;                // make it power of 2
3470                         unsigned int nones = 0;
3471                         while(max_aggr_i){
3472                                 if(max_aggr_i&1)
3473                                         nones++;
3474                                 naggrs = naggrs << 1;
3475                                 max_aggr_i = max_aggr_i >> 1;
3476                         }
3477                         if(nones==1)            // in case it was already a power of 2.
3478                                 naggrs/=2;
3479                         ret += int_to_string(naggrs);
3480                 }
3481                 ret += ";\n";
3482
3483                 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
3484                 ret+="\t\treturn(0);\n";
3485                 ret+="\t}\n\n";
3486 //              ret+="/* compute how many integers we need to store the hashmap */\n";
3487 //              ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
3488                 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
3489                 ret+="\t\treturn(0);\n";
3490                 ret+="\t}\n";
3491                 ret+="/*\t\tfill bitmap with zero \t*/\n";
3492                 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
3493                 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
3494                 ret+="\tf->generation=0;\n";
3495                 ret+="\tf->flush_pos = f->max_aggrs;\n";
3496
3497                 ret += "\tf->flush_ctr = 0;\n";
3498
3499         }
3500
3501         if(is_fj){
3502                 if(uses_bloom){
3503                         ret+="\tf->first_exec = 1;\n";
3504                         unsigned int n_bloom = 11;
3505                         string n_bloom_str = fs->get_val_of_def("num_bloom");
3506                         int tmp_n_bloom = atoi(n_bloom_str.c_str());
3507                         if(tmp_n_bloom>0)
3508                                 n_bloom = tmp_n_bloom+1;
3509
3510                         unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
3511                         if(window_len < n_bloom){
3512                                 n_bloom = window_len+1;
3513                         }
3514
3515                         int bf_exp_size = 12;  // base-2 log of number of bits
3516                         string bloom_len_str = fs->get_val_of_def("bloom_size");
3517                         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
3518                         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
3519                                 bf_exp_size = tmp_bf_exp_size;
3520                         }
3521                         int bf_bit_size = 1 << 12;
3522                         int bf_byte_size = bf_bit_size / (8*sizeof(char));
3523
3524                         int bf_tot = n_bloom*bf_byte_size;
3525                         ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
3526                         ret+="\t\treturn(0);\n";
3527                         ret+="\t}\n";
3528                         ret +=
3529 "       for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
3530 "               f->bf_table[i] = 0;\n"
3531 ;
3532                 }else{
3533                         unsigned int ht_size = 4096;
3534                         string ht_size_s = fs->get_val_of_def("aggregate_slots");
3535                         int tmp_ht_size = atoi(ht_size_s.c_str());
3536                         if(tmp_ht_size > 1024){
3537                                 unsigned int hs = 1;            // make it power of 2
3538                                 while(tmp_ht_size){
3539                                         hs =hs << 1;
3540                                         tmp_ht_size = tmp_ht_size >> 1;
3541                                 }
3542                                 ht_size = hs;
3543                         }
3544                         ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
3545                         ret+="\t\treturn(0);\n";
3546                         ret+="\t}\n\n";
3547                         ret +=
3548 "       for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
3549 "               f->join_table[i].ts = 0;\n"
3550 ;
3551                 }
3552         }
3553
3554 //                      Initialize the complex literals (which might be handles).
3555
3556         for(cl=0;cl<complex_literals->size();cl++){
3557                 literal_t *l = complex_literals->get_literal(cl);
3558 //              sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
3559 //              ret += tmpstr + l->to_C_code() + ";\n";
3560                 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
3561                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
3562         }
3563
3564         ret += "\n";
3565
3566 //                      Initialize the last seen values of temporal attributes to min(max) value of
3567 //                      their respective type
3568 //                      Create places to hold the last values of temporal attributes referenced in select clause
3569
3570
3571         col_id_set temp_cids;           //      col ids of temp attributes in select clause
3572
3573         int s;
3574         col_id_set::iterator csi;
3575
3576         for(s=0;s<sl_list.size();s++){
3577                 data_type *sdt = sl_list[s]->get_data_type();
3578                 if (sdt->is_temporal()) {
3579                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3580                 }
3581         }
3582
3583         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3584                 int tblref = (*csi).tblvar_ref;
3585                 int schref = (*csi).schema_ref;
3586                 string field = (*csi).field;
3587                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
3588                 if (dt.is_increasing()) {
3589                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
3590                         ret += tmpstr;
3591                 } else if (dt.is_decreasing()) {
3592                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
3593                         ret += tmpstr;
3594                 }
3595         }
3596
3597 //      initialize last seen values of temporal groubpy variables
3598         if(is_aggr_query){
3599                 for(g=0;g<gb_tbl->size();g++){
3600                         data_type *dt = gb_tbl->get_data_type(g);
3601                         if(dt->is_temporal()){
3602 /*
3603                                 fprintf(stderr,"group by attribute %s is temporal, ",
3604                                                 gb_tbl->get_name(g).c_str());
3605 */
3606                                 if(dt->is_increasing()){
3607                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
3608                                 }else{
3609                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
3610                                 }
3611                                 ret += tmpstr;
3612                         }
3613                 }
3614         }
3615
3616         ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
3617         ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
3618         ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
3619         ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
3620         ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
3621
3622 //                      Initialize runtime stats
3623         ret+="\tf->in_tuple_cnt = 0;\n";
3624         ret+="\tf->out_tuple_cnt = 0;\n";
3625         ret+="\tf->out_tuple_sz = 0;\n";
3626         ret+="\tf->accepted_tuple_cnt = 0;\n";
3627         ret+="\tf->cycle_cnt = 0;\n";
3628         ret+="\tf->collision_cnt = 0;\n";
3629         ret+="\tf->eviction_cnt = 0;\n";
3630         ret+="\tf->sampling_rate = 1.0;\n";
3631
3632         ret+="\tf->trace_id = 0;\n\n";
3633     if(param_tbl->size() > 0){
3634         ret+=
3635 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
3636 "#ifndef LFTA_IN_NIC\n"
3637 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
3638 "#else\n"
3639 "\t\t}\n"
3640 "#endif\n"
3641 "\t\t\treturn 0;\n"
3642 "\t\t}\n";
3643         }
3644
3645 //                      Register the pass-by-handle parameters
3646     int ph;
3647     for(ph=0;ph<param_handle_table.size();++ph){
3648                 data_type pdt(param_handle_table[ph]->type_name);
3649                 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
3650                 switch(param_handle_table[ph]->val_type){
3651                 case cplx_lit_e:
3652                         ret += tmpstr;
3653                         if(pdt.is_buffer_type()) ret += "&(";
3654                         sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
3655                         ret += tmpstr ;
3656                         if(pdt.is_buffer_type()) ret += ")";
3657                         ret +=  ");\n";
3658                         break;
3659                 case litval_e:
3660 //                              not complex, no constructor
3661                         ret += tmpstr;
3662                         ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
3663                         break;
3664                 case param_e:
3665 //                              query parameter handles are regstered/deregistered in the
3666 //                              load_params function.
3667 //                      ret += "t->param_"+param_handle_table[ph]->param_name;
3668                         break;
3669                 default:
3670                         fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
3671                         exit(1);
3672                 }
3673         }
3674
3675         ret += "\treturn (struct FTA *) f;\n";
3676         ret += "}\n\n";
3677
3678         return(ret);
3679 }
3680
3681
3682
3683
3684 //////////////////////////////////////////////////////////////////
3685
3686 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
3687 //              map<string,string> &int_fcn_defs,
3688                 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
3689         bool is_aggr_query;
3690         int s,p,g;
3691         string retval;
3692
3693 /////////////////////////////////////////////////////////////
3694 ///             Do operator-generic processing, such as
3695 ///             gathering the set of referenced columns,
3696 ///             generating structures, etc.
3697
3698 //              Initialize globals to empty.
3699         gb_tbl = NULL; aggr_tbl = NULL;
3700         global_id = -1; nicprop = NULL;
3701         param_tbl = fs->get_param_tbl();
3702         sl_list.clear(); where.clear();
3703         partial_fcns.clear();
3704         fcn_ref_cnt.clear(); is_partial_fcn.clear();
3705         pred_class.clear(); pred_pos.clear();
3706         sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
3707         gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
3708
3709
3710 //              Does the lfta read packed results from the NIC?
3711         nicprop = nicp;                 // load into global
3712         global_id = gid;
3713     packed_return = false;
3714         if(nicp && nicp->option_exists("Return")){
3715                 if(nicp->option_value("Return") == "Packed"){
3716                         packed_return = true;
3717                 }else{
3718                         fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
3719                 }
3720         }
3721
3722
3723 //                      Extract data which defines the query.
3724 //                              complex literals gathered now.
3725         complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
3726         param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
3727         string node_name = fs->get_node_name();
3728     bool is_fj = false, uses_bloom = false;
3729
3730
3731         if(fs->node_type() == "spx_qpn"){
3732                 is_aggr_query = false;
3733                 spx_qpn *spx_node = (spx_qpn *)fs;
3734                 sl_list = spx_node->get_select_se_list();
3735                 where = spx_node->get_where_clause();
3736                 gb_tbl = NULL;
3737                 aggr_tbl = NULL;
3738         } else
3739         if(fs->node_type() == "sgah_qpn"){
3740                 is_aggr_query = true;
3741                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3742                 sl_list = sgah_node->get_select_se_list();
3743                 where = sgah_node->get_where_clause();
3744                 gb_tbl = sgah_node->get_gb_tbl();
3745                 aggr_tbl = sgah_node->get_aggr_tbl();
3746
3747                 if((sgah_node->get_having_clause()).size() > 0){
3748                         fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
3749                 }
3750         } else
3751         if(fs->node_type() == "filter_join"){
3752                 is_aggr_query = false;
3753         is_fj = true;
3754                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3755                 sl_list = fj_node->get_select_se_list();
3756                 where = fj_node->get_where_clause();
3757                 uses_bloom = fj_node->use_bloom;
3758                 gb_tbl = NULL;
3759                 aggr_tbl = NULL;
3760         } else {
3761                 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
3762                 exit(1);
3763         }
3764
3765 //                      Build list of "partial functions", by clause.
3766 //                      NOTE : partial fcns are not handles well.
3767 //                      The act of searching for them associates the fcn call
3768 //                      in the SE with an index to an array.  Refs to the
3769 //                      fcn value are replaced with refs to the variable they are
3770 //                      unpacked into.  A more general tagging mechanism would be better.
3771
3772         int i;
3773         vector<bool> *pfunc_ptr = NULL;
3774         vector<int> *ref_cnt_ptr = NULL;
3775         if(!is_aggr_query){             // don't collect cacheable fcns on aggr query.
3776                 ref_cnt_ptr = &fcn_ref_cnt;
3777                 pfunc_ptr = &is_partial_fcn;
3778         }
3779
3780         sl_fcns_start = 0;
3781         for(i=0;i<sl_list.size();i++){
3782                 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3783         }
3784         wh_fcns_start = sl_fcns_end = partial_fcns.size();
3785         for(i=0;i<where.size();i++){
3786                 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3787         }
3788         gb_fcns_start = wh_fcns_end = partial_fcns.size();
3789         if(gb_tbl != NULL){
3790                 for(i=0;i<gb_tbl->size();i++){
3791                         find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
3792                 }
3793         }
3794         ag_fcns_start = gb_fcns_end = partial_fcns.size();
3795         if(aggr_tbl != NULL){
3796                 for(i=0;i<aggr_tbl->size();i++){
3797                         find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
3798                 }
3799         }
3800         ag_fcns_end = partial_fcns.size();
3801
3802 //              Fill up the is_partial_fcn and fcn_ref_cnt arrays.
3803         if(is_aggr_query){
3804                 for(i=0; i<partial_fcns.size();i++){
3805                         fcn_ref_cnt.push_back(1);
3806                         is_partial_fcn.push_back(true);
3807                 }
3808         }
3809
3810 //              Unmark non-partial expensive functions referenced only once.
3811         for(i=0; i<partial_fcns.size();i++){
3812                 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
3813                         partial_fcns[i]->set_partial_ref(-1);
3814                 }
3815         }
3816
3817         node_name = normalize_name(node_name);
3818
3819         retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
3820
3821         if(packed_return){              // generate unpack struct
3822                 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
3823                 int schref = input_tbls[0]->get_schema_ref();
3824                 vector<string> refd_cols;
3825                 for(s=0;s<sl_list.size();++s){
3826                         gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
3827                 }
3828                 for(p=0;p<where.size();++p){
3829 //                              I'm not disabling these preds ...
3830                         gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
3831                 }
3832                 if(gb_tbl){
3833                         for(g=0;g<gb_tbl->size();++g){
3834                           gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
3835                         }
3836                 }
3837                 sort(refd_cols.begin(), refd_cols.end());
3838                 retval += "struct "+node_name+"_input_struct{\n";
3839                 retval += "\tint __lfta_id_fm_nic__;\n";
3840                 int vsi;
3841                 for(vsi=0;vsi<refd_cols.size();++vsi){
3842                 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
3843                         retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
3844                 }
3845                 retval+="};\n\n";
3846         }
3847
3848
3849 /////////////////////////////////////////////////////
3850 //                      Common stuff unpacked, do some generation
3851
3852         if(is_aggr_query)
3853           retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
3854         if(is_fj)
3855                 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
3856
3857         retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema);
3858         retval += generate_tuple_struct(node_name, sl_list) ;
3859
3860         if(is_aggr_query)
3861                 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
3862         if(param_tbl->size() > 0)
3863                 retval += generate_fta_load_params(node_name) ;
3864         retval += generate_fta_free(node_name, is_aggr_query) ;
3865         retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;
3866         retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ;
3867
3868
3869         /* extract the value of Time_Correlation from interface definition */
3870         int e,v;
3871         string es;
3872         unsigned time_corr;
3873         vector<tablevar_t *> tvec =  fs->get_input_tbls();
3874         vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
3875         if (time_corr_vec.empty())
3876                 time_corr = DEFAULT_TIME_CORR;
3877         else
3878                 time_corr = atoi(time_corr_vec[0].c_str());
3879
3880         retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) );
3881         retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
3882
3883   return(retval);
3884 }
3885
3886
3887
3888 int compute_snap_len(qp_node *fs, table_list *schema){
3889
3890 //              Initialize global vars
3891         gb_tbl = NULL;
3892         sl_list.clear(); where.clear();
3893
3894         if(fs->node_type() == "spx_qpn"){
3895                 spx_qpn *spx_node = (spx_qpn *)fs;
3896                 sl_list = spx_node->get_select_se_list();
3897                 where = spx_node->get_where_clause();
3898         }
3899         else if(fs->node_type() == "sgah_qpn"){
3900                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3901                 sl_list = sgah_node->get_select_se_list();
3902                 where = sgah_node->get_where_clause();
3903                 gb_tbl = sgah_node->get_gb_tbl();
3904         }
3905         else if(fs->node_type() == "filter_join"){
3906                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3907                 sl_list = fj_node->get_select_se_list();
3908                 where = fj_node->get_where_clause();
3909         } else{
3910                 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
3911                 exit(1);
3912         }
3913
3914 //                      Gather all column references, need to define unpacking variables.
3915   int w,s;
3916   col_id_set cid_set;
3917   col_id_set::iterator csi;
3918
3919   for(w=0;w<where.size();++w)
3920         gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3921   for(s=0;s<sl_list.size();s++){
3922         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3923   }
3924
3925   int g;
3926   if(gb_tbl != NULL){
3927         for(g=0;g<gb_tbl->size();g++)
3928           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3929   }
3930
3931   //                    compute snap length
3932   int snap_len = -1;
3933   int n_snap=0;
3934   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3935     int schref = (*csi).schema_ref;
3936         int tblref = (*csi).tblvar_ref;
3937     string field = (*csi).field;
3938
3939         param_list *field_params = schema->get_modifier_list(schref, field);
3940         if(field_params->contains_key("snap_len")){
3941                 string fld_snap_str = field_params->val_of("snap_len");
3942                 int fld_snap;
3943                 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
3944                         if(fld_snap > snap_len) snap_len = fld_snap;
3945                         n_snap++;
3946                 }else{
3947                         fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
3948                 }
3949         }
3950   }
3951
3952   if(n_snap == cid_set.size()){
3953         return (snap_len);
3954   }else{
3955         return -1;
3956   }
3957
3958
3959 }
3960
3961 //              Function which computes an optimal
3962 //              set of unpacking functions.
3963
3964 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
3965         map<string, int> pfcn_count;
3966         map<string, int>::iterator msii;
3967         col_id_set::iterator cisi;
3968         set<string>::iterator ssi;
3969         string best_fcn;
3970
3971         while(ucol_fcn_map.size() < upref_cids.size()){
3972
3973 //                      Gather unpack functions referenced by unaccounted-for
3974 //                      columns, and increment their reference count.
3975                 pfcn_count.clear();
3976                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
3977                         if(ucol_fcn_map.count((*cisi)) == 0){
3978                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
3979                                 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
3980                                         pfcn_count[(*ssi)]++;
3981                         }
3982                 }
3983
3984 //              Get the lowest cost per field function.
3985                 float min_cost = 0.0;
3986                 string best_fcn = "";
3987                 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
3988                         int fcost = Schema->get_ufcn_cost((*msii).first);
3989                         if(fcost < 0){
3990                                 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
3991                                 exit(1);
3992                         }
3993                         float this_cost = (1.0*fcost)/(*msii).second;
3994                         if(msii == pfcn_count.begin() || this_cost < min_cost){
3995                                 min_cost = this_cost;
3996                                 best_fcn = (*msii).first;
3997                         }
3998                 }
3999                 if(best_fcn == ""){
4000                         fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4001                         exit(1);
4002                 }
4003
4004 //              Assign this function to the unassigned fcns which use it.
4005                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4006                         if(ucol_fcn_map.count((*cisi)) == 0){
4007                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4008                                 if(ufcns.count(best_fcn)>0)
4009                                         ucol_fcn_map[(*cisi)] = best_fcn;
4010                         }
4011                 }
4012         }
4013 }
4014
4015
4016
//		Generate an initial test for the lfta
4018 //              Assume that the predicate references no external functions,
4019 //              and especially no partial functions,
4020 //              aggregates, internal functions.
4021 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4022                 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4023                 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4024                 vector<int> &lfta_snap_lens, string iface){
4025   col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4026   col_id_set::iterator csi;
4027         int o,p,q;
4028         string ret;
4029
4030 //              Gather complex literals in the prefilter.
4031         cplx_lit_table *complex_literals = new cplx_lit_table();
4032         for(p=0;p<pred_list.size();++p){
4033                 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4034         }
4035
4036
4037 //              Find the combinable predicates
4038         vector<predicate_t *> pr_list;
4039         for(p=0;p<pred_list.size();++p){
4040         find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4041         }
4042
4043 //              Analyze the combinable predicates to find the predicate classes.
4044         pred_class.clear();             // idx to equiv pred in equiv_list
4045         pred_pos.clear();               // idx to returned bitmask.
4046         vector<predicate_t *> equiv_list;
4047         vector<int> num_equiv;
4048
4049
4050         for(p=0;p<pr_list.size();++p){
4051                 for(q=0;q<equiv_list.size();++q){
4052                         if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4053                                 break;
4054                 }
4055                 if(q == equiv_list.size()){             // no equiv : create new
4056                         pred_class.push_back(equiv_list.size());
4057                         equiv_list.push_back(pr_list[p]);
4058                         pred_pos.push_back(0);
4059                         num_equiv.push_back(1);
4060
4061                 }else{                  // pr_list[p] is equivalent to pred q
4062                         pred_class.push_back(q);
4063                         pred_pos.push_back(num_equiv[q]);
4064                         num_equiv[q]++;
4065                 }
4066         }
4067
4068 //              Generate the variables which hold the common pred handles
4069         ret += "/*\t\tprefilter global vars.\t*/\n";
4070         for(q=0;q<equiv_list.size();++q){
4071                 for(p=0;p<=(num_equiv[q]/32);++p){
4072                         ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4073                 }
4074         }
4075
4076 //              Struct to hold prefilter complex literals
4077         ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4078         if(complex_literals->size() == 0)
4079                 ret += "\tint no_variable;\n";
4080         int cl;
4081         for(cl=0;cl<complex_literals->size();cl++){
4082                 literal_t *l = complex_literals->get_literal(cl);
4083                 data_type *dtl = new data_type( l->get_type() );
4084                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4085                 ret += tmpstr;
4086         }
4087         ret += "} prefilter_complex_lits_"+iface+";\n\n";
4088
4089
4090 //              Generate the prefilter initialziation code
4091         ret += "void init_lfta_prefilter_"+iface+"(){\n";
4092
4093 //              First initialize complex literals, if any.
4094         ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4095         for(cl=0;cl<complex_literals->size();cl++){
4096                 literal_t *l = complex_literals->get_literal(cl);
4097                 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4098                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4099         }
4100
4101
4102         set<int> epred_seen;
4103         for(p=0;p<pr_list.size();++p){
4104                 int q = pred_class[p];
4105 //printf("\tq=%d\n",q);
4106                 if(epred_seen.count(q)>0){
4107                         ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4108                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4109                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4110                         for(o=0;o<op_list.size();++o){
4111                                 if(! cl_op[o]){
4112                                         ret += generate_se_code(op_list[o],Schema)+", ";
4113                                 }
4114                         }
4115                         ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4116                         epred_seen.insert(q);
4117                 }else{
4118                         ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4119                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4120                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4121                         for(o=0;o<op_list.size();++o){
4122                                 if(! cl_op[o]){
4123                                         ret += generate_se_code(op_list[o],Schema)+", ";
4124                                 }
4125                         }
4126                         ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4127                         epred_seen.insert(q);
4128                 }
4129         }
4130         ret += "}\n\n";
4131
4132
4133
4134 //              Start on main body code generation
4135   ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4136
4137
4138 ///--------------------------------------------------------------
4139 ///             Generate and store the prefilter body,
4140 ///             reuse it for the snap length calculator
4141 ///-------------------------------------------------------------
4142         string body;
4143
4144     body += "\tstruct packet *p = (struct packet *)pkt;\n";
4145
4146
4147
4148 //              Gather the colids to store unpacked variables.
4149         for(p=0;p<pred_list.size();++p){
4150         gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4151         }
4152
4153 //              make the col_ids refer to the base tables, and
4154 //              grab the col_ids with at least one unpacking function.
4155         for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4156                 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4157                 col_id tmp_col_id;
4158                 tmp_col_id.field = (*csi).field;
4159                 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4160                 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4161                 cid_set.insert(tmp_col_id);
4162                 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4163                 if(fe->get_unpack_fcns().size()>0)
4164                         upref_cids.insert(tmp_col_id);
4165
4166
4167         }
4168
4169 //              Find the set of unpacking programs needed for the
4170 //              prefilter fields.
4171         map<col_id, string,lt_col_id>  ucol_fcn_map;
4172         find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4173         set<string> pref_ufcns;
4174         map<col_id, string,lt_col_id>::iterator mcis;
4175         for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4176                 pref_ufcns.insert((*mcis).second);
4177         }
4178
4179
4180
4181 //                      Variables for unpacking attributes.
4182     body += "/*\t\tVariables for unpacking attributes\t*/\n";
4183     for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4184       int schref = (*csi).schema_ref;
4185           int tblref = (*csi).tblvar_ref;
4186       string field = (*csi).field;
4187       data_type dt(Schema->get_type_name(schref,field));
4188       sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4189         field.c_str(), tblref);
4190       body += tmpstr;
4191       sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4192       body += tmpstr;
4193     }
4194 //                      Variables for unpacking temporal attributes.
4195     body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4196     for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4197           if (cid_set.count(*csi) == 0) {
4198         int schref = (*csi).schema_ref;
4199                 int tblref = (*csi).tblvar_ref;
4200         string field = (*csi).field;
4201         data_type dt(Schema->get_type_name(schref,field));
4202         sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4203           field.c_str(), tblref);
4204         body += tmpstr;
4205
4206           }
4207     }
4208     body += "\n\n";
4209
4210 //              Variables for combinable predicate evaluation
4211         body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4212         for(q=0;q<equiv_list.size();++q){
4213                 for(p=0;p<=(num_equiv[q]/32);++p){
4214                         body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4215                 }
4216         }
4217
4218
4219 //                      Variables that are always needed
4220     body += "/*\t\tVariables which are always needed\t*/\n";
4221         body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4222         body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4223
4224 //              Call the unpacking functions for the prefilter fields
4225         if(pref_ufcns.size() > 0)
4226                 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4227         set<string>::iterator ssi;
4228         for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4229                 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4230         }
4231
4232
4233 //              Unpack the accessed attributes
4234         body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4235     for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4236           int tblref = (*csi).tblvar_ref;
4237       int schref = (*csi).schema_ref;
4238           string field = (*csi).field;
4239           sprintf(tmpstr,"\tret_%s_%d =  (%s(p, &unpack_var_%s_%d) == 0);\n",
4240                 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4241           body += tmpstr;
4242     }
4243
4244 //              next unpack the temporal attributes and ignore the errors
4245 //              We are assuming here that failed unpack of temporal attributes
4246 //              is not going to overwrite the last stored value
4247 //              Failed upacks are ignored
4248     for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
4249           int tblref = (*csi).tblvar_ref;
4250       int schref = (*csi).schema_ref;
4251           string field = (*csi).field;
4252           sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
4253                  Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4254           body += tmpstr;
4255     }
4256
4257 //              Evaluate the combinable predicates
4258         if(equiv_list.size()>0)
4259                 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
4260         for(q=0;q<equiv_list.size();++q){
4261                 for(p=0;p<=(num_equiv[q]/32);++p){
4262
4263 //              Only call the common eval fcn if all ref'd fields present.
4264                         col_id_set pred_cids;
4265                         col_id_set::iterator cpi;
4266                         gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
4267                         if(pred_cids.size()>0){
4268                         body += "\tif(";
4269                                 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4270                                         if(cpi != pred_cids.begin())
4271                                                 body += " && ";
4272                                 string field = (*cpi).field;
4273                                         int tblref = (*cpi).tblvar_ref;
4274                                         body += "ret_"+field+"_"+int_to_string(tblref);
4275                                 }
4276                                 body+=")\n";
4277                         }
4278
4279                         body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
4280                         vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
4281                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4282                         for(o=0;o<op_list.size();++o){
4283                                 if(cl_op[o]){
4284                                         body += ","+generate_se_code(op_list[o],Schema);
4285                                 }
4286                         }
4287                         body += ");\n";
4288                 }
4289         }
4290
4291
4292         for(p=0;p<pred_list.size();++p){
4293                 col_id_set pred_cids;
4294                 col_id_set::iterator cpi;
4295                 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
4296                 if(pred_cids.size()>0){
4297                         body += "\tif(";
4298                         for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4299                                 if(cpi != pred_cids.begin())
4300                                         body += " && ";
4301                         string field = (*cpi).field;
4302                                 int tblref = (*cpi).tblvar_ref;
4303                                 body += "ret_"+field+"_"+int_to_string(tblref);
4304                         }
4305                         body+=")\n";
4306                 }
4307         body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
4308                 body+="\tbitpos = bitpos << 1;\n";
4309         }
4310
4311 // ---------------------------------------------------------------
4312 //              Finished with the body of the prefilter
4313 // --------------------------------------------------------------
4314
4315         ret += body;
4316
4317 //                      Collect fields referenced by an lfta but not
4318 //                      already unpacked for the prefilter.
4319
4320 //printf("upref_cids is:\n");
4321 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
4322 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4323 //printf("pref_ufcns is:\n");
4324 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
4325 //printf("\t%s\n",(*ssi).c_str());
4326
4327         int l;
4328         for(l=0;l<lfta_cols.size();++l){
4329                 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
4330                         string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4331                         col_id tmp_col_id;
4332                         tmp_col_id.field = (*csi).field;
4333                         tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4334                         tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4335                         field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4336                         set<string> fld_ufcns = fe->get_unpack_fcns();
4337 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(),  upref_cids.count(tmp_col_id));
4338                         if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
4339 //              Ensure that this field not already unpacked.
4340                                 bool found = false;
4341                                 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
4342 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
4343                                         if(pref_ufcns.count((*ssi))){
4344 //printf("Field already unpacked.\n");
4345                                                 found = true;;
4346                                         }
4347                                 }
4348                                 if(! found){
4349 //printf("\tadding to unpack list\n");
4350                                         upall_cids.insert(tmp_col_id);
4351                                 }
4352                         }
4353                 }
4354         }
4355
4356 //printf("upall_cids is:\n");
4357 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
4358 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4359
4360 //              Get the set of unpacking programs for these.
4361         map<col_id, string,lt_col_id>  uall_fcn_map;
4362         find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
4363         set<string> pall_ufcns;
4364         for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
4365 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
4366                 pall_ufcns.insert((*mcis).second);
4367         }
4368
4369 //              Iterate through the remaining set of unpacking function
4370         if(pall_ufcns.size() > 0)
4371                 ret += "//\t\tcall all remaining field unpacking functions.\n";
4372         for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
4373 //              gather the set of columns unpacked by this ufcn
4374                 col_id_set fcol_set;
4375                 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
4376                         if(uall_fcn_map[(*csi)] == (*ssi))
4377                                 fcol_set.insert((*csi));
4378                 }
4379
4380 //              gather the set of lftas which access a field unpacked by the fcn
4381                 set<long long int> clfta;
4382                 for(l=0;l<lfta_cols.size();l++){
4383                         for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
4384                                 if(lfta_cols[l].count((*csi)) > 0)
4385                                         break;
4386                         }
4387                         if(csi != fcol_set.end())
4388                                 clfta.insert(lfta_sigs[l]);
4389                 }
4390
4391 //              generate the unpacking code
4392                 ret += "\tif(";
4393                 set<long long int>::iterator sii;
4394                 for(sii=clfta.begin();sii!=clfta.end();++sii){
4395                         if(sii!=clfta.begin())
4396                                 ret += " || ";
4397                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
4398                         ret += tmpstr;
4399                 }
4400                 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4401         }
4402
4403
4404     ret += "\treturn(retval);\n\n";
4405   ret += "}\n\n";
4406
4407
4408 // --------------------------------------------------------
4409 //              reuse prefilter body for snaplen calculator
4410 //
4411 //      This is dummy code, so I'm commenting it out.
4412
4413 /*
4414   ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
4415
4416         ret += body;
4417
4418         int i;
4419         vector<int> s_snaps = lfta_snap_lens;
4420         sort(s_snaps.begin(), s_snaps.end());
4421
4422         if(s_snaps[0] == -1){
4423                 set<unsigned long long int> sigset;
4424                 for(i=0;i<lfta_snap_lens.size();++i){
4425                         if(lfta_snap_lens[i] == -1){
4426                                 sigset.insert(lfta_sigs[i]);
4427                         }
4428                 }
4429                 ret += "\tif( ";
4430                 set<unsigned long long int>::iterator sulli;
4431                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4432                         if(sulli!=sigset.begin())
4433                                 ret += " || ";
4434                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4435                         ret += tmpstr;
4436                 }
4437                 ret += ") return -1;\n";
4438         }
4439
4440         int nextpos = lfta_snap_lens.size()-1;
4441         int nextval = lfta_snap_lens[nextpos];
4442         while(nextval >= 0){
4443                 set<unsigned long long int> sigset;
4444                 for(i=0;i<lfta_snap_lens.size();++i){
4445                         if(lfta_snap_lens[i] == nextval){
4446                                 sigset.insert(lfta_sigs[i]);
4447                         }
4448                 }
4449                 ret += "\tif( ";
4450                 set<unsigned long long int>::iterator sulli;
4451                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4452                         if(sulli!=sigset.begin())
4453                                 ret += " || ";
4454                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4455                         ret += tmpstr;
4456                 }
4457                 ret += ") return "+int_to_string(nextval)+";\n";
4458
4459                 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
4460                 if(nextpos>0)
4461                         nextval = lfta_snap_lens[nextpos];
4462                 else
4463                         nextval = -1;
4464         }
4465         ret += "\treturn 0;\n";
4466         ret += "}\n\n";
4467 */
4468
4469
4470   return(ret);
4471 }
4472
4473
4474
4475
4476 //              Generate the struct which will store the the values of
4477 //              temporal attributesunpacked by prefilter
4478 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
4479
4480   col_id_set::iterator csi;
4481
4482 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
4483
4484   string ret="struct prefilter_unpacked_temp_vars {\n";
4485   ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
4486
4487   string init_code;
4488
4489   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4490     int schref = (*csi).schema_ref;
4491     int tblref = (*csi).tblvar_ref;
4492     string field = (*csi).field;
4493         data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
4494     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4495         field.c_str(), tblref);
4496     ret += tmpstr;
4497
4498         if (init_code != "")
4499                 init_code += ", ";
4500         if (dt.is_increasing())
4501                 init_code += dt.get_min_literal();
4502         else
4503                 init_code += dt.get_max_literal();
4504
4505   }
4506   ret += "};\n\n";
4507
4508   ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";
4509
4510   return(ret);
4511 }