61872c4a5f850a2df7f3493fe6c8fabd2c6a3816
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include <string>
17 #include <stdio.h>
18 #include <stdlib.h>
19 //#include <algo.h>
20 #include<algorithm>
21
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
29
30 using namespace std;
31
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
33
34 // default value for correlation between the interface card and
35 // the system clock
36 #define DEFAULT_TIME_CORR 16
37
38
39 //      For fast hashing
40 //#define NRANDS 100
41 extern string hash_nums[NRANDS];
42 /*
43 = {
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
94 };
95 */
96
97
98 // ----------------------------------------------
99 //              Data extracted from the query plan node
100 //              for use by code generation.
101
//		File-level state shared by the code-generation routines below.
//		NOTE(review): presumably filled in from the query plan node before
//		generation starts (see the section header above) -- the code that
//		assigns these is not in this part of the file.

static cplx_lit_table *complex_literals;  // Table of literals with constructors.
static vector<handle_param_tbl_entry *> param_handle_table;	// pass-by-handle parameters
static param_table *param_tbl;		// Table of all referenced parameters.

//		sl_list is scanned by generate_fta_struct for temporal select expressions.
//		NOTE(review): "where" is presumably the WHERE clause in CNF -- confirm.
static vector<scalarexp_t *> sl_list;
static vector<cnf_elem *> where;

static gb_table *gb_tbl;			// Table of all group-by attributes.
static aggregate_table *aggr_tbl;	// Table of all referenced aggregates.

//		When packed_return is set, generate_unpack_code reads fields from the
//		<node>_input_struct_var structure instead of calling unpack functions.
static bool packed_return;		// unpack using structure, not fcns
static nic_property *nicprop;	// nic properties for this interface.
static int global_id;


//		The partial_fcns vector can now refer to
//		partial functions, or expensive functions
//		which can be cached (if there are multiple refs).  The
//		fcn_ref_cnt and is_partial_fcn vectors distinguish the cases.
static vector<scalarexp_t *> partial_fcns;
static vector<int> fcn_ref_cnt;
static vector<bool> is_partial_fcn;
//		NOTE(review): these look like [start,end) index ranges into partial_fcns
//		for the select, where, group-by and aggregate clauses -- confirm at the
//		point where they are assigned.  They are not static, so they have
//		external linkage.
int sl_fcns_start = 0, sl_fcns_end = 0;
int wh_fcns_start = 0, wh_fcns_end = 0;
int gb_fcns_start = 0, gb_fcns_end = 0;
int ag_fcns_start = 0, ag_fcns_end = 0;


//		These vectors are for combinable predicates.
static vector<int> pred_class;	//	identifies the group
static vector<int> pred_pos;	//  position in the group.



//		Scratch buffer used as the target of the sprintf calls in this file.
//		It is shared by every routine and the writes are not bounds-checked.
static char tmpstr[1000];
135
136 static char tmpstr[1000];
137
138 //////////////////////////////////////////////////////////////////////
139 ///                     Various utilities
140
141 string generate_fta_name(string node_name){
142         string ret = normalize_name(node_name);
143         if(ret == ""){
144                 ret = "default";
145         }
146         ret += "_fta";
147
148         return(ret);
149 }
150
151
152 string generate_aggr_struct_name(string node_name){
153         string ret = normalize_name(node_name);
154         if(ret == ""){
155                 ret = "default";
156         }
157         ret += "_aggr_struct";
158
159         return(ret);
160 }
161
162 string generate_fj_struct_name(string node_name){
163         string ret = normalize_name(node_name);
164         if(ret == ""){
165                 ret = "default";
166         }
167         ret += "_fj_struct";
168
169         return(ret);
170 }
171
172 string generate_watchlist_element_name(string node_name){
173         string ret = normalize_name(node_name);
174         if(ret == ""){
175                 ret = "default";
176         }
177         ret += "__wl_elem";
178
179         return(ret);
180 }
181
182 string generate_watchlist_struct_name(string node_name){
183         string ret = normalize_name(node_name);
184         if(ret == ""){
185                 ret = "default";
186         }
187         ret += "__wl_struct";
188
189         return(ret);
190 }
191
192 string generate_watchlist_name(string node_name){
193         string ret = normalize_name(node_name);
194         if(ret == ""){
195                 ret = "default";
196         }
197         ret += "__wl";
198
199         return(ret);
200 }
201
202 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
203         string ret;
204         if(! packed_return){
205                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
206                         schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
207         ret += tmpstr;
208         if(!schema->get_modifier_list(schref,field)->contains_key("required"))
209                 ret += "\tif(retval) goto "+end_goto+";\n";
210
211         }else{
212 //                      TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
213                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
214                 if(dt.is_buffer_type()){
215                         if(dt.get_type() != v_str_t){
216                           ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
217                           ret += "\t\tgoto "+end_goto+";\n";
218                           ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
219                           ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
220                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
221                         }else{
222                                 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
223                                 exit(1);
224                         }
225                 }else{
226                         ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
227                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
228                 }
229         }
230         return ret;
231 }
232
233 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
234   string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
235
236   int g;
237   for(g=0;g<gb_tbl->size();g++){
238           sprintf(tmpstr,"gb_var%d",g);
239           ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
240   }
241
242   int a;
243   for(a=0;a<aggr_tbl->size();a++){
244           ret += "\t";
245           sprintf(tmpstr,"aggr_var%d",a);
246           if(aggr_tbl->is_builtin(a))
247                 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
248           else
249                 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
250   }
251
252 /*
253   ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
254 */
255
256   ret += "};\n\n";
257
258   return(ret);
259 }
260
261
262 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
263   string ret;
264
265   if(fs->use_bloom == false){   // uses hash table instead
266         ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
267         int k;
268         for(k=0;k<fs->hash_eq.size();++k){
269                 sprintf(tmpstr,"key_var%d",k);
270                 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
271         }
272         ret += "\tlong long int ts;\n";
273     ret += "};\n\n";
274   }
275
276   return(ret);
277 }
278
279 string generate_watchlist_structs(string node_name, table_def *tbl,
280                 std::string filename, int refresh_interval){
281         string ret;
282
283         ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
284         vector<field_entry *> fields = tbl->get_fields();
285         for(int f=0;f<fields.size();++f){
286                 data_type dt(fields[f]->get_type());
287                 ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
288         }
289         ret += "\tgs_uint64_t hashval;\n";
290         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
291         ret += "};\n\n";
292
293         ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
294         ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
295         ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
296         ret += "\tgs_uint32_t ht_size;\n";
297         ret += "\tgs_uint32_t n_elem;\n";
298         ret += "\tgs_uint32_t refresh_interval;\n";
299         ret += "\ttime_t next_refresh;\n";
300         ret += "\ttime_t last_mtime;\n";        
301         ret += "\tchar *filename;\n";
302         ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
303
304         return ret;
305 }
306
307 string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
308         string ret;
309         string tgt = generate_watchlist_name(node_name);
310         vector<field_entry *> fields = tbl->get_fields();
311
312         ret += "void reload_watchlist__"+node_name+"(){\n";
313         ret += "\tgs_uint32_t i;\n";
314         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
315         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
316         ret += "\tFILE *fl;\n";
317         ret += "\tchar buf[10000];\n";
318         ret += "\tgs_uint32_t buflen = 10000;\n";
319         ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
320         ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
321         ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
322         ret += "\tgs_uint64_t hash, bucket;\n";
323         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
324
325         ret += "//  make sure watchlist file has changed since the last time we loaded it\n";
326     ret += "\tstruct stat file_stat;\n";
327     ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
328     ret += "\tif (err) {\n";
329     ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
330     ret += "\t\treturn;\n";
331     ret += "\t}\n";
332         ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime)     // watchlist file hasn't changed since last time\n";
333         ret += "\t\treturn;\n";
334         ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
335
336         ret += "//  Delete old entries.\n";
337         ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
338         ret += "\t\tptr="+tgt+".ht[i];\n";
339         ret += "\t\twhile(ptr!=NULL){\n";
340         for(int f=0;f<fields.size();++f){
341                 data_type dt(fields[f]->get_type());
342                 if(dt.is_buffer_type()){
343                         ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
344                 }
345         }
346         ret += "\t\t\tnext = ptr->next;\n";
347         ret += "\t\t\tfree(ptr);\n";
348         ret += "\t\t\tptr = next;\n";
349         ret += "\t\t}\n";
350         ret += "\t}\n";
351         ret += "\n// prepare new table. \n";
352         ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
353         ret += "\t\tif("+tgt+".ht)\n";
354         ret += "\t\t\tfree("+tgt+".ht);\n";
355         ret += "\t\tif("+tgt+".ht_size == 0)\n";
356         ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
357         ret += "\t\telse\n";    
358         ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
359         ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
360         ret += "\t}\n";
361         ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
362         ret += "\t\t"+tgt+".ht[i] = NULL;\n";
363         ret += "\t}\n";
364         ret += "\n// load new table\n";
365         ret += "\t"+tgt+".n_elem = 0;\n";
366         ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
367         ret += "\tif(fl==NULL){\n";
368         ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
369         ret += "\t\treturn;\n";
370         ret += "\t}\n";
371         ret += "\tmalformed = 0;\n";
372         ret += "\tshort_lines = 0;\n";
373         ret += "\ttoolong_lines = 0;\n";
374         ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
375         ret += "\t\tlinelen = strlen(buf);\n";
376         ret += "\t\tbuf[linelen-1]='\\0';       // strip off trailing newline\n";
377         ret += "\t\tlinelen--;\n";
378
379         ret += "\t\tpos=0;\n";
380         ret += "\t\tmalformed=0;\n";
381         ret += "\t\tok=1;\n";
382         ret += "\t\tflds[0] = buf;\n";
383         ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
384         ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
385         ret += "\t\t\tif(pos >= linelen){\n";
386         ret += "\t\t\t\tmalformed = 1;\n";
387         ret += "\t\t\t\tbreak;\n";
388         ret += "\t\t\t}\n";
389         ret += "\t\t\tbuf[pos]='\\0';\n";
390         ret += "\t\t\tpos++;\n";
391         ret += "\t\t\tflds[f]=buf+pos;\n";
392         ret += "\t\t}\n";
393         ret += "\t\tif(malformed){\n";
394         ret += "\t\t\tok=0;\n";
395         ret += "\t\t\tn_malformed++;\n";
396         ret += "\t\t}\n";
397         ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
398         ret += "\t\t\tok=0;\n";
399         ret += "\t\t\tshort_lines++;\n";
400         ret += "\t\t}\n";
401         ret += "\t\tif(pos && (pos<linelen)){\n";
402         ret += "\t\t\tok=0;\n";
403         ret += "\t\t\ttoolong_lines++;\n";
404         ret += "\t\t}\n";
405         ret += "\t\tif(f>="+std::to_string(fields.size())+"){\n";
406         ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
407 //              Extract fields
408         for(int f=0;f<fields.size();++f){
409                 data_type dt(fields[f]->get_type());
410                 ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
411         }
412 //              Compute the hash value
413         ret += "\t\t\thash=0;\n";
414         for(int k=0;k<keys.size();++k){
415                 string key_fld = keys[k];
416                 int f;
417                 for(f=0;f<fields.size();++f){
418                         if(fields[f]->get_name() == key_fld)
419                                 break;
420                 }
421                 data_type dt(fields[f]->get_type());
422
423                 ret +=
424 "\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
425                         dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
426                 }
427         ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
428         ret += "\t\t\trec->hashval = hash;\n";
429         ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
430         ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
431         ret += "\t\t\t"+tgt+".n_elem++;\n";
432
433         ret += "\t\t}\n";
434         ret += "\t}\n";
435         ret += "\tif(n_malformed+toolong_lines > 0){\n";
436         ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n";
437         ret += "\t}\n";
438         ret += "}\n\n";
439
440         return ret;
441 }
442
443
444
445                 
446 string generate_fta_struct(string node_name, gb_table *gb_tbl,
447                 aggregate_table *aggr_tbl, param_table *param_tbl,
448                 cplx_lit_table *complex_literals,
449                 vector<handle_param_tbl_entry *> &param_handle_table,
450                 bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
451                 table_list *schema){
452
453         string ret = "struct " + generate_fta_name(node_name) + "{\n";
454         ret += "\tstruct FTA f;\n";
455
456 //-------------------------------------------------------------
457 //              Aggregate-specific fields
458
459         if(is_aggr_query){
460 /*
461                 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
462 */
463                 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
464                 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
465 //              ret+="\tint bitmap_size;\n";
466                 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
467                 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
468                 ret += "\tint max_windows; // max number of open windows.\n";
469                 ret += "\tunsigned int generation; // initially zero, increment on\n";
470                 ret += "\t     // every hash table flush - whether regular or induced.\n";
471                 ret += "\t     // Old groups are identified by a generation mismatch.\n";
472                 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
473                 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
474
475
476
477                 int g;
478                 bool uses_temporal_flush = false;
479                 for(g=0;g<gb_tbl->size();g++){
480                         data_type *dt = gb_tbl->get_data_type(g);
481                         if(dt->is_temporal()){
482 /*
483                                 fprintf(stderr,"group by attribute %s is temporal, ",
484                                                 gb_tbl->get_name(g).c_str());
485                                 if(dt->is_increasing()){
486                                         fprintf(stderr,"increasing.\n");
487                                 }else{
488                                         fprintf(stderr,"decreasing.\n");
489                                 }
490 */
491                                 data_type *gdt = gb_tbl->get_data_type(g);
492                                 if(gdt->is_buffer_type()){
493                                         fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
494                                 }else{
495                                         sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
496                                         ret += tmpstr;
497                                         sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
498                                         ret += tmpstr;
499                                         sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
500                                         ret += tmpstr;
501                                         uses_temporal_flush = true;
502                                 }
503
504                         }
505
506                 }
507                 if(! uses_temporal_flush){
508                         fprintf(stderr,"Warning: no temporal flush.\n");
509                 }
510         }
511
512 // ---------------------------------------------------------
513 //                      Filter-join specific fields
514
515         if(is_fj){
516                 if(uses_bloom){
517                         ret +=
518 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
519 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
520 "\tint first_exec;\n"
521 "\tlong long int last_bin;\n"
522 "\tint last_bloom_pos;\n"
523 "\n"
524 ;
525                 }else{          // limited hash table
526                         ret +=
527 "  struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
528 "\n"
529 ;
530                 }
531
532         }
533
534 // --------------------------------------------
535 //              watchlist-join specific
536         if(is_wj){
537                 ret += "\ttime_t ux_time;\n";
538         }
539
540 //--------------------------------------------------------
541 //                      Common fields
542
543 //                      Create places to hold the parameters.
544         int p;
545         vector<string> param_vec = param_tbl->get_param_names();
546         for(p=0;p<param_vec.size();p++){
547                 data_type *dt = param_tbl->get_data_type(param_vec[p]);
548                 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
549                                 param_vec[p].c_str());
550                 ret += tmpstr;
551                 if(param_tbl->handle_access(param_vec[p])){
552                         ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
553                 }
554         }
555
556 //                      Create places to hold complex literals.
557         int cl;
558         for(cl=0;cl<complex_literals->size();cl++){
559                 literal_t *l = complex_literals->get_literal(cl);
560                 data_type *dtl = new data_type( l->get_type() );
561                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
562                 ret += tmpstr;
563         }
564
565 //                      Create places to hold the pass-by-handle parameters.
566         for(p=0;p<param_handle_table.size();++p){
567                 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
568                 ret += tmpstr;
569         }
570
571 //                      Create places to hold the last values of temporal
572 //                      attributes referenced in select clause
573 //                      we also need to store values of the temoral attributed
574 //                      of last flushed tuple in aggr queries
575 //                      to make sure we generate the cirrect temporal tuple
576 //                      in the presense of slow flushes
577
578
579         col_id_set temp_cids;           //      col ids of temp attributes in select clause
580
581         int s;
582         col_id_set::iterator csi;
583
584         for(s=0;s<sl_list.size();s++){
585                 data_type *sdt = sl_list[s]->get_data_type();
586                 if (sdt->is_temporal()) {
587                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
588                 }
589         }
590
591         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
592                 int tblref = (*csi).tblvar_ref;
593                 int schref = (*csi).schema_ref;
594                 string field = (*csi).field;
595                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
596                 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
597                 ret += tmpstr;
598         }
599
600         ret += "\tgs_uint64_t trace_id;\n\n";
601
602 //      Fields to store the runtime stats
603
604         ret += "\tgs_uint32_t in_tuple_cnt;\n";
605         ret += "\tgs_uint32_t out_tuple_cnt;\n";
606         ret += "\tgs_uint32_t out_tuple_sz;\n";
607         ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
608         ret += "\tgs_uint64_t cycle_cnt;\n";
609         ret += "\tgs_uint32_t collision_cnt;\n";
610         ret += "\tgs_uint32_t eviction_cnt;\n";
611         ret += "\tgs_float_t sampling_rate;\n";
612
613
614
615         ret += "};\n\n";
616
617         return(ret);
618 }
619
620 //------------------------------------------------------------
621 //              Set colref tblvars to 0..
622 //              (special processing for join-like operators in an lfta).
623
624 void reset_se_col_ids_tblvars(scalarexp_t *se,  gb_table *gtbl){
625         vector<scalarexp_t *> operands;
626         int o;
627
628         if(! se)
629                 return;
630
631         switch(se->get_operator_type()){
632         case SE_LITERAL:
633         case SE_PARAM:
634         case SE_IFACE_PARAM:
635                 return;
636         case SE_UNARY_OP:
637                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
638                 return;
639         case SE_BINARY_OP:
640                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
641                 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
642                 return;
643         case SE_COLREF:
644                 if(! se->is_gb() ){
645                         se->get_colref()->set_tablevar_ref(0);
646                 }else{
647                         if(gtbl==NULL){
648                                 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
649                                 exit(1);
650                         }
651                         reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
652                 }
653                 return;
654         case SE_AGGR_STAR:
655                 return;
656         case SE_AGGR_SE:
657                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
658                 return;
659         case SE_FUNC:
660                 operands = se->get_operands();
661                 for(o=0;o<operands.size();o++){
662                         reset_se_col_ids_tblvars(operands[o], gtbl);
663                 }
664                 return;
665         default:
666                 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
667                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
668                 exit(1);
669         }
670 }
671
672
673 //              reset  column tblvars accessed in this pr.
674
675 void reset_pr_col_ids_tblvars(predicate_t *pr,  gb_table *gtbl){
676         vector<scalarexp_t *> op_list;
677         int o;
678
679         switch(pr->get_operator_type()){
680         case PRED_IN:
681                 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
682                 return;
683         case PRED_COMPARE:
684                 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
685                 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
686                 return;
687         case PRED_UNARY_OP:
688                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
689                 return;
690         case PRED_BINARY_OP:
691                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
692                 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
693                 return;
694         case PRED_FUNC:
695                 op_list = pr->get_op_list();
696                 for(o=0;o<op_list.size();++o){
697                         reset_se_col_ids_tblvars(op_list[o],gtbl) ;
698                 }
699                 return;
700         default:
701                 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
702                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
703         }
704 }
705
706
707
708
709 //                      Generate code that makes reference
710 //                      to the tuple, and not to any aggregates.
711 static string generate_se_code(scalarexp_t *se,table_list *schema){
712         string ret;
713     data_type *ldt, *rdt;
714         int o;
715         vector<scalarexp_t *> operands;
716
717
718         switch(se->get_operator_type()){
719         case SE_LITERAL:
720                 if(se->is_handle_ref()){
721                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
722                         ret = tmpstr;
723                         return(ret);
724                 }
725                 if(se->get_literal()->is_cpx_lit()){
726                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
727                         ret = tmpstr;
728                         return(ret);
729                 }
730                 return(se->get_literal()->to_C_code("")); // not complex, no constructor
731         case SE_PARAM:
732                 if(se->is_handle_ref()){
733                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
734                         ret = tmpstr;
735                         return(ret);
736                 }
737                 ret += "t->param_";
738                 ret += se->get_param_name();
739                 return(ret);
740         case SE_UNARY_OP:
741         ldt = se->get_left_se()->get_data_type();
742         if(ldt->complex_operator(se->get_op()) ){
743                         ret +=  ldt->get_complex_operator(se->get_op());
744                         ret += "(";
745                         ret += generate_se_code(se->get_left_se(),schema);
746             ret += ")";
747                 }else{
748                         ret += "(";
749                         ret += se->get_op();
750                         ret += generate_se_code(se->get_left_se(),schema);
751                         ret += ")";
752                 }
753                 return(ret);
754         case SE_BINARY_OP:
755         ldt = se->get_left_se()->get_data_type();
756         rdt = se->get_right_se()->get_data_type();
757
758         if(ldt->complex_operator(rdt, se->get_op()) ){
759                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
760                         ret += "(";
761                         ret += generate_se_code(se->get_left_se(),schema);
762                         ret += ", ";
763                         ret += generate_se_code(se->get_right_se(),schema);
764                         ret += ")";
765                 }else{
766                         ret += "(";
767                         ret += generate_se_code(se->get_left_se(),schema);
768                         ret += se->get_op();
769                         ret += generate_se_code(se->get_right_se(),schema);
770                         ret += ")";
771                 }
772                 return(ret);
773         case SE_COLREF:
774                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet unpacked ...
775                                                         // so return the defining code.
776                         ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
777
778                 }else{
779                 sprintf(tmpstr,"unpack_var_%s_%d",
780                   se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
781                 ret = tmpstr;
782                 }
783                 return(ret);
784         case SE_FUNC:
785 //                              Should not be ref'ing any aggr here.
786                 if(se->get_aggr_ref() >= 0){
787                         fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
788                         return("ERROR in generate_se_code");
789                 }
790
791                 if(se->is_partial()){
792                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
793                         ret = tmpstr;
794                 }else{
795                         ret += se->op + "(";
796                         operands = se->get_operands();
797                         for(o=0;o<operands.size();o++){
798                                 if(o>0) ret += ", ";
799                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
800                                         ret += "&";
801                                 ret += generate_se_code(operands[o], schema);
802                         }
803                         ret += ")";
804                 }
805                 return(ret);
806         default:
807                 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
808                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
809                 return("ERROR in generate_se_code");
810         }
811 }
812
813 //              generate code that refers only to aggregate data and constants.
814 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
815
816         string ret;
817     data_type *ldt, *rdt;
818         int o;
819         vector<scalarexp_t *> operands;
820
821
822         switch(se->get_operator_type()){
823         case SE_LITERAL:
824                 if(se->is_handle_ref()){
825                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
826                         ret = tmpstr;
827                         return(ret);
828                 }
829                 if(se->get_literal()->is_cpx_lit()){
830                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
831                         ret = tmpstr;
832                         return(ret);
833                 }
834                 return(se->get_literal()->to_C_code("")); // not complex no constructor
835         case SE_PARAM:
836                 if(se->is_handle_ref()){
837                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
838                         ret = tmpstr;
839                         return(ret);
840                 }
841                 ret += "t->param_";
842                 ret += se->get_param_name();
843                 return(ret);
844         case SE_UNARY_OP:
845         ldt = se->get_left_se()->get_data_type();
846         if(ldt->complex_operator(se->get_op()) ){
847                         ret +=  ldt->get_complex_operator(se->get_op());
848                         ret += "(";
849                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
850             ret += ")";
851                 }else{
852                         ret += "(";
853                         ret += se->get_op();
854                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
855                         ret += ")";
856                 }
857                 return(ret);
858         case SE_BINARY_OP:
859         ldt = se->get_left_se()->get_data_type();
860         rdt = se->get_right_se()->get_data_type();
861
862         if(ldt->complex_operator(rdt, se->get_op()) ){
863                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
864                         ret += "(";
865                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
866                         ret += ", ";
867                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
868                         ret += ")";
869                 }else{
870                         ret += "(";
871                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
872                         ret += se->get_op();
873                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
874                         ret += ")";
875                 }
876                 return(ret);
877         case SE_COLREF:
878                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet
879                                                         // unpacked ... so return the defining code.
880                         sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
881                         ret = tmpstr;
882
883                 }else{
884                 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
885                                 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
886                                 se->get_lineno(), se->get_charno());
887                 ret = tmpstr;
888                 }
889                 return(ret);
890         case SE_AGGR_STAR:
891         case SE_AGGR_SE:
892                 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
893                 ret = tmpstr;
894                 return(ret);
895         case SE_FUNC:
896 //                              Is it a UDAF?
897                 if(se->get_aggr_ref() >= 0){
898                         sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
899                         ret = tmpstr;
900                         return(ret);
901                 }
902
903                 if(se->is_partial()){
904                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
905                         ret = tmpstr;
906                 }else{
907                         ret += se->op + "(";
908                         operands = se->get_operands();
909                         for(o=0;o<operands.size();o++){
910                                 if(o>0) ret += ", ";
911                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
912                                         ret += "&";
913                                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
914                         }
915                         ret += ")";
916                 }
917                 return(ret);
918         default:
919                 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
920                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
921                 return("ERROR in generate_se_code");
922         }
923
924 }
925
926
927 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
928         string ret;
929         int o;
930         vector<scalarexp_t *> operands;
931
932
933         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
934                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
935                                 se->get_lineno(), se->get_charno());
936                 return("ERROR in generate_se_code");
937         }
938
939         ret = "\tretval = " + se->get_op() + "( ";
940         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
941         ret += tmpstr;
942
943         operands = se->get_operands();
944         for(o=0;o<operands.size();o++){
945                 ret += ", ";
946                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
947                         ret += "&";
948                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
949         }
950         ret += ");\n";
951
952         return(ret);
953 }
954
955 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
956         string ret;
957         int o;
958         vector<scalarexp_t *> operands;
959
960         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
961                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
962                                 se->get_lineno(), se->get_charno());
963                 return("ERROR in generate_se_code");
964         }
965
966         ret = se->get_op() + "( ";
967
968         operands = se->get_operands();
969         for(o=0;o<operands.size();o++){
970                 if(o) ret += ", ";
971                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
972                         ret += "&";
973                 ret += generate_se_code(operands[o], schema);
974         }
975         ret += ");\n";
976
977         return(ret);
978 }
979
980
981
982 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
983         string ret;
984         int o;
985         vector<scalarexp_t *> operands;
986
987
988         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
989                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
990                                 se->get_lineno(), se->get_charno());
991                 return("ERROR in generate_se_code");
992         }
993
994         ret = "\tretval = " + se->get_op() + "( ",
995         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
996         ret += tmpstr;
997
998         operands = se->get_operands();
999         for(o=0;o<operands.size();o++){
1000                 ret += ", ";
1001                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
1002                         ret += "&";
1003                 ret += generate_se_code(operands[o], schema);
1004         }
1005         ret += ");\n";
1006
1007         return(ret);
1008 }
1009
1010
1011
1012
1013
//  Translate a query-language comparison operator into its C spelling:
//  "=" becomes "==", "<>" becomes "!=", anything else is returned as is.
static string generate_C_comparison_op(string op){
    if(op.compare("=") == 0){
        op = "==";
    }else if(op.compare("<>") == 0){
        op = "!=";
    }
    return op;
}
1019
//  Translate a query-language boolean operator into its C spelling.
//  Exactly three spellings of each keyword are accepted (upper case,
//  capitalised, lower case).  Any other input is reported on stderr and
//  an error marker string containing the operator is returned.
static string generate_C_boolean_op(string op){
    static const char *const spellings[3][3] = {
        {"AND", "And", "and"},
        {"OR",  "Or",  "or"},
        {"NOT", "Not", "not"}
    };
    static const char *const c_tokens[3] = {"&&", "||", "!"};

    for(int row=0; row<3; ++row){
        for(int col=0; col<3; ++col){
            if(op == spellings[row][col]){
                return c_tokens[row];
            }
        }
    }

    fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
    return "ERROR UNKNOWN BOOLEAN OPERATOR :"+op;
}
1034
1035
1036 static string generate_predicate_code(predicate_t *pr,table_list *schema){
1037         string ret;
1038         vector<literal_t *>  litv;
1039         int i;
1040     data_type *ldt, *rdt;
1041         vector<scalarexp_t *> op_list;
1042         int o,cref,ppos;
1043         unsigned int bitmask;
1044
1045         switch(pr->get_operator_type()){
1046         case PRED_IN:
1047         ldt = pr->get_left_se()->get_data_type();
1048
1049                 ret += "( ";
1050                 litv = pr->get_lit_vec();
1051                 for(i=0;i<litv.size();i++){
1052                         if(i>0) ret += " || ";
1053                         ret += "( ";
1054
1055                 if(ldt->complex_comparison(ldt) ){
1056                                 ret +=  ldt->get_equals_fcn(ldt) ;
1057                                 ret += "( ";
1058                                 if(ldt->is_buffer_type() ) ret += "&";
1059                                 ret += generate_se_code(pr->get_left_se(), schema);
1060                                 ret += ", ";
1061                                 if(ldt->is_buffer_type() ) ret += "&";
1062                                 if(litv[i]->is_cpx_lit()){
1063                                         sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
1064                                         ret += tmpstr;
1065                                 }else{
1066                                         ret += litv[i]->to_C_code("");
1067                                 }
1068                                 ret += ") == 0";
1069                         }else{
1070                                 ret += generate_se_code(pr->get_left_se(), schema);
1071                                 ret += " == ";
1072                                 ret += litv[i]->to_C_code("");
1073                         }
1074
1075                         ret += " )";
1076                 }
1077                 ret += " )";
1078                 return(ret);
1079
1080         case PRED_COMPARE:
1081         ldt = pr->get_left_se()->get_data_type();
1082         rdt = pr->get_right_se()->get_data_type();
1083
1084                 ret += "( ";
1085         if(ldt->complex_comparison(rdt) ){
1086 // TODO can use get_equals_fcn if op is "=" ?
1087                         ret += ldt->get_comparison_fcn(rdt);
1088                         ret += "(";
1089                         if(ldt->is_buffer_type() ) ret += "&";
1090                         ret += generate_se_code(pr->get_left_se(),schema);
1091                         ret += ", ";
1092                         if(rdt->is_buffer_type() ) ret += "&";
1093                         ret += generate_se_code(pr->get_right_se(),schema);
1094                         ret += ") ";
1095                         ret +=  generate_C_comparison_op(pr->get_op());
1096                         ret += "0";
1097                 }else{
1098                         ret += generate_se_code(pr->get_left_se(),schema);
1099                         ret +=  generate_C_comparison_op(pr->get_op());
1100                         ret += generate_se_code(pr->get_right_se(),schema);
1101                 }
1102                 ret += " )";
1103                 return(ret);
1104         case PRED_UNARY_OP:
1105                 ret += "( ";
1106                 ret +=  generate_C_boolean_op(pr->get_op());
1107                 ret += generate_predicate_code(pr->get_left_pr(),schema);
1108                 ret += " )";
1109                 return(ret);
1110         case PRED_BINARY_OP:
1111                 ret += "( ";
1112                 ret += generate_predicate_code(pr->get_left_pr(),schema);
1113                 ret +=  generate_C_boolean_op(pr->get_op());
1114                 ret += generate_predicate_code(pr->get_right_pr(),schema);
1115                 ret += " )";
1116                 return(ret);
1117         case PRED_FUNC:
1118                 op_list = pr->get_op_list();
1119                 cref = pr->get_combinable_ref();
1120                 if(cref >= 0){  // predicate is a combinable pred reference
1121                         //              Trust, but verify
1122                         if(pred_class.size() >= cref && pred_class[cref] >= 0){
1123                                 ppos = pred_pos[cref];
1124                                 bitmask = 1 << ppos % 32;
1125                                 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
1126                                 ret = tmpstr;
1127                                 return ret;
1128                         }
1129                 }
1130
1131                 ret =  pr->get_op() + "(";
1132                 if (pr->is_sampling_fcn) {
1133                         ret += "t->sampling_rate";
1134                         if (!op_list.empty())
1135                                 ret += ", ";
1136                 }
1137                 for(o=0;o<op_list.size();++o){
1138                         if(o>0) ret += ", ";
1139                         if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
1140                                         ret += "&";
1141                         ret += generate_se_code(op_list[o],schema);
1142                 }
1143                 ret += " )";
1144                 return(ret);
1145         default:
1146                 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
1147                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
1148                 return("ERROR in generate_predicate_code");
1149         }
1150 }
1151
1152
1153 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
1154         string ret;
1155
1156     if(dt->complex_comparison(dt) ){
1157                 ret += dt->get_equals_fcn(dt);
1158                 ret += "(";
1159                         if(dt->is_buffer_type() ) ret += "&";
1160                 ret += lhs_op;
1161                 ret += ", ";
1162                         if(dt->is_buffer_type() ) ret += "&";
1163                 ret += rhs_op;
1164                 ret += ") == 0";
1165         }else{
1166                 ret += lhs_op;
1167                 ret += " == ";
1168                 ret += rhs_op;
1169         }
1170
1171         return(ret);
1172 }
1173
1174 //static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
1175 //      string ret;
1176 //
1177  //   if(dt->complex_comparison(dt) ){
1178 //              ret += dt->get_equals_fcn(dt);
1179 //              ret += "(";
1180 //                      if(dt->is_buffer_type() ) ret += "&";
1181 //              ret += lhs_op;
1182 //              ret += ", ";
1183 //                      if(dt->is_buffer_type() ) ret += "&";
1184 //              ret += rhs_op;
1185 //              ret += ") == 0";
1186 //      }else{
1187 //              ret += lhs_op;
1188 //              ret += " == ";
1189 //              ret += rhs_op;
1190 //      }
1191 //
1192 //      return(ret);
1193 //}
1194
1195 //              Here I assume that only MIN and MAX aggregates can be computed
1196 //              over BUFFER data types.
1197
1198 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
1199         string retval = "\t\t";
1200         string op = atbl->get_op(aidx);
1201
1202 //              Is it a UDAF
1203         if(! atbl->is_builtin(aidx)) {
1204                 int o;
1205                 retval += op+"_LFTA_AGGR_UPDATE_(";
1206                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1207                 retval+="("+var+")";
1208                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1209                 for(o=0;o<opl.size();++o){
1210                         retval += ",";
1211                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1212                                         retval.append("&");
1213                         retval += generate_se_code(opl[o], schema);
1214                 }
1215                 retval += ");\n";
1216
1217                 return retval;
1218         }
1219
1220 //              Built-in aggregate processing.
1221
1222         data_type *dt = atbl->get_data_type(aidx);
1223
1224         if(op == "COUNT"){
1225                 retval.append(var);
1226                 retval.append("++;\n");
1227                 return(retval);
1228         }
1229         if(op == "SUM"){
1230                 retval.append(var);
1231                 retval.append(" += ");
1232                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1233                 retval.append(";\n");
1234                 return(retval);
1235         }
1236         if(op == "MIN"){
1237                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1238                 retval.append(tmpstr);
1239                 if(dt->complex_comparison(dt)){
1240                         if(dt->is_buffer_type())
1241                           sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1242                         else
1243                           sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1244                 }else{
1245                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1246                 }
1247                 retval.append(tmpstr);
1248                 if(dt->is_buffer_type()){
1249                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1250                 }else{
1251                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1252                 }
1253                 retval.append(tmpstr);
1254
1255                 return(retval);
1256         }
1257         if(op == "MAX"){
1258                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1259                 retval.append(tmpstr);
1260                 if(dt->complex_comparison(dt)){
1261                         if(dt->is_buffer_type())
1262                          sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1263                         else
1264                          sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1265                 }else{
1266                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1267                 }
1268                 retval.append(tmpstr);
1269                 if(dt->is_buffer_type()){
1270                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1271                 }else{
1272                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1273                 }
1274                 retval.append(tmpstr);
1275
1276                 return(retval);
1277
1278         }
1279         if(op == "AND_AGGR"){
1280                 retval.append(var);
1281                 retval.append(" &= ");
1282                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1283                 retval.append(";\n");
1284                 return(retval);
1285         }
1286         if(op == "OR_AGGR"){
1287                 retval.append(var);
1288                 retval.append(" |= ");
1289                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1290                 retval.append(";\n");
1291                 return(retval);
1292         }
1293         if(op == "XOR_AGGR"){
1294                 retval.append(var);
1295                 retval.append(" ^= ");
1296                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1297                 retval.append(";\n");
1298                 return(retval);
1299         }
1300         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1301         return("ERROR: aggregate not recognized: "+op);
1302
1303 }
1304
1305
1306
1307 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1308         string retval;
1309         string op = atbl->get_op(aidx);
1310
1311 //              Is it a UDAF
1312         if(! atbl->is_builtin(aidx)) {
1313                 int o;
1314                 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1315                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1316                 retval+="("+var+"));\n";
1317 //                      Add 1st tupl
1318                 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1319                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1320                 retval+="("+var+")";
1321                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1322                 for(o=0;o<opl.size();++o){
1323                         retval += ",";
1324                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1325                                         retval.append("&");
1326                         retval += generate_se_code(opl[o],schema);
1327                 }
1328                 retval += ");\n";
1329                 return(retval);
1330         }
1331
1332 //              Built-in aggregate processing.
1333
1334
1335         data_type *dt = atbl->get_data_type(aidx);
1336
1337         if(op == "COUNT"){
1338                 retval = "\t\t"+var;
1339                 retval.append(" = 1;\n");
1340                 return(retval);
1341         }
1342
1343         if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1344                                                                         op == "OR_AGGR" || op == "XOR_AGGR"){
1345                 if(dt->is_buffer_type()){
1346                         sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1347                         retval.append(tmpstr);
1348                         sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1349                         retval.append(tmpstr);
1350                 }else{
1351                         retval = "\t\t"+var;
1352                         retval += " = ";
1353                         retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1354                         retval.append(";\n");
1355                 }
1356                 return(retval);
1357         }
1358
1359         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1360         return("ERROR: aggregate not recognized: "+op);
1361 }
1362
1363
1364 ////////////////////////////////////////////////////////////
1365
1366
1367 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1368         std::string &node_name, std::string &schema_embed_str){
1369 //                      Include these only once, not once per lfta
1370 //      string ret = "#include \"rts.h\"\n";
1371 //      ret +=  "#include \"fta.h\"\n\n");
1372
1373         string ret = "#ifndef LFTA_IN_NIC\n";
1374         ret += "const char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1375         ret += "#include<stdio.h>\n";
1376         ret += "#include <limits.h>\n";
1377         ret += "#include <float.h>\n";
1378         ret += "#include <sys/stat.h>\n";
1379         ret += "#include \"rdtsc.h\"\n";
1380         ret += "#endif\n";
1381
1382
1383
1384         return(ret);
1385 }
1386
1387
1388 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1389         int a,p,s;
1390 //                       need to create and output the tuple.
1391         string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1392 //                      Check for any UDAFs with LFTA_BAILOUT
1393         ret += "\tlfta_bailout = 0;\n";
1394         for(a=0;a<aggr_tbl->size();a++){
1395                 if(aggr_tbl->has_bailout(a)){
1396                         ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
1397                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1398                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1399                 }
1400         }
1401         ret += "\tif(! lfta_bailout){\n";
1402
1403 //                      First, compute the size of the tuple.
1404
1405 //                      Unpack UDAF return values
1406         for(a=0;a<aggr_tbl->size();a++){
1407                 if(! aggr_tbl->is_builtin(a)){
1408                         ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1409                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1410                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1411
1412                 }
1413         }
1414
1415
1416 //                      Unpack partial fcns ref'd by the select clause.
1417   if(sl_fcns_start != sl_fcns_end){
1418             ret += "\t\tunpack_failed = 0;\n";
1419     for(p=sl_fcns_start;p<sl_fcns_end;p++){
1420           if(is_partial_fcn[p]){
1421                 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1422                          "t->aggr_table["+idx+"].",schema);
1423                 ret += "\t\tif(retval) unpack_failed = 1;\n";
1424           }
1425     }
1426                                                                 // BEGIN don't allocate tuple if
1427         ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1428   }
1429
1430 //                      Unpack any BUFFER type selections into temporaries
1431 //                      so that I can compute their size and not have
1432 //                      to recompute their value during tuple packing.
1433 //                      I can use regular assignment here because
1434 //                      these temporaries are non-persistent.
1435
1436           for(s=0;s<sl_list.size();s++){
1437                 data_type *sdt = sl_list[s]->get_data_type();
1438                 if(sdt->is_buffer_type()){
1439                         sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1440                         ret += tmpstr;
1441                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1442                         ret += ";\n";
1443                 }
1444           }
1445
1446
1447 //              The size of the tuple is the size of the tuple struct plus the
1448 //              size of the buffers to be copied in.
1449
1450           ret += "\t\t\ttuple_size = sizeof( struct ";
1451           ret +=  generate_tuple_name(node_name);
1452           ret += ")";
1453           for(s=0;s<sl_list.size();s++){
1454                 data_type *sdt = sl_list[s]->get_data_type();
1455                 if(sdt->is_buffer_type()){
1456                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1457                         ret += tmpstr;
1458                 }
1459           }
1460           ret += ";\n";
1461
1462
1463           ret += "\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
1464           ret += "\t\t\tif( tuple != NULL){\n";
1465
1466
1467 //                      Test passed, make assignments to the tuple.
1468
1469           ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1470           ret +=  generate_tuple_name(node_name) ;
1471           ret += ");\n";
1472
1473 //                      Mark tuple as REGULAR_TUPLE
1474           ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1475
1476           for(s=0;s<sl_list.size();s++){
1477                 data_type *sdt = sl_list[s]->get_data_type();
1478                 if(sdt->is_buffer_type()){
1479                         sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1480                         ret += tmpstr;
1481                         sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1482                         ret += tmpstr;
1483                 }else{
1484                         sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1485                         ret += tmpstr;
1486 //                      if(sdt->needs_hn_translation())
1487 //                              ret += sdt->hton_translation() +"( ";
1488                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1489 //                      if(sdt->needs_hn_translation())
1490 //                              ret += ") ";
1491                         ret += ";\n";
1492                 }
1493           }
1494
1495 //              Generate output.
1496           ret += "\t\t\t\tpost_tuple(tuple);\n";
1497           ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1498           ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1499           ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1500           ret += "\t\t\t\t#endif\n\n";
1501           ret += "\t\t\t}\n";
1502
1503           if(sl_fcns_start != sl_fcns_end)      // END don't allocate tuple if
1504                 ret += "\t\t}\n";                               // unpack failed.
1505           ret += "\t}\n";
1506
1507 //                      Need to release memory held by BUFFER types.
1508                 int g;
1509
1510           for(g=0;g<gb_tbl->size();g++){
1511                 data_type *gdt = gb_tbl->get_data_type(g);
1512                 if(gdt->is_buffer_type()){
1513                         sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1514                         ret += tmpstr;
1515                 }
1516           }
1517           for(a=0;a<aggr_tbl->size();a++){
1518                 if(aggr_tbl->is_builtin(a)){
1519                         data_type *adt = aggr_tbl->get_data_type(a);
1520                         if(adt->is_buffer_type()){
1521                                 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
1522                                 ret += tmpstr;
1523                         }
1524                 }else{
1525                         ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1526                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1527                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1528                 }
1529           }
1530
1531         ret += "\t\tt->n_aggrs--;\n";
1532
1533         return(ret);
1534
1535 }
1536
1537 string generate_gb_match_test(string idx){
1538         int g;
1539         string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") &&  IS_NEW(t->aggr_table_bitmap,"+idx+")";
1540         if(gb_tbl->size()>0){
1541                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1542                 ret+="\t\t";
1543
1544 //                      Next, scan list for a match on the group-by attributes.
1545           string rhs_op, lhs_op;
1546           for(g=0;g<gb_tbl->size();g++){
1547                   ret += " && ";
1548                   ret += "(";
1549                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1550                   sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1551                   ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
1552                   ret += ")";
1553           }
1554          }
1555
1556           ret += "){\n";
1557
1558         return ret;
1559 }
1560
1561 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1562         int g;
1563         string ret;
1564
1565           ret += "/*\t\tMatch found : update in place.\t*/\n";
1566           int a;
1567           has_udaf = false;
1568           for(a=0;a<aggr_tbl->size();a++){
1569                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1570                   ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1571                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1572           }
1573
1574 //                      garbage collect copied buffer type gb attrs.
1575   for(g=0;g<gb_tbl->size();g++){
1576           data_type *gdt = gb_tbl->get_data_type(g);
1577           if(gdt->is_buffer_type()){
1578                 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
1579                 ret+=tmpstr;
1580           }
1581         }
1582
1583
1584
1585           bool first_udaf = true;
1586           if(has_udaf){
1587                 ret += "\t\tif(";
1588                 for(a=0;a<aggr_tbl->size();a++){
1589                         if(! aggr_tbl->is_builtin(a)){
1590                                 if(! first_udaf)ret += " || ";
1591                                 else first_udaf = false;
1592                                 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1593                                 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1594                                 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1595                         }
1596                 }
1597                 ret+="){\n";
1598                 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1599                 ret += generate_tuple_from_aggr(node_name,schema,idx);
1600                 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
1601                 ret+="\t\t}\n";
1602         }
1603         return ret;
1604 }
1605
1606
1607 string generate_init_group( table_list *schema, string idx){
1608           int g,a;
1609           string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1610 //                      Fill up the aggregate block.
1611           for(g=0;g<gb_tbl->size();g++){
1612                   sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1613                   ret += tmpstr;
1614           }
1615           for(a=0;a<aggr_tbl->size();a++){
1616                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1617                   ret += generate_aggr_init(tmpstr, aggr_tbl,a,  schema);
1618           }
1619           ret+="\t\tt->n_aggrs++;\n";
1620         return ret;
1621 }
1622
1623
1624 string generate_fta_flush(string node_name, table_list *schema,
1625                 ext_fcn_list *Ext_fcns){
1626
1627    string ret;
1628    string select_var_defs ;
1629         int s, p;
1630
1631 //              Flush from previous epoch
1632
1633         ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1634
1635         ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1636     ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1637         ret += "\tint i, lfta_bailout;\n";
1638         ret += "\tunsigned int gen_val;\n";
1639
1640         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1641         ret += generate_fta_name(node_name)+" *) f;\n";
1642
1643         ret += "\n";
1644
1645
1646 //              Variables needed to store selected attributes of BUFFER type
1647 //              temporarily, in order to compute their size for storage
1648 //              in an output tuple.
1649
1650   select_var_defs = "";
1651   for(s=0;s<sl_list.size();s++){
1652         data_type *sdt = sl_list[s]->get_data_type();
1653         if(sdt->is_buffer_type()){
1654           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1655           select_var_defs.append(tmpstr);
1656         }
1657   }
1658   if(select_var_defs != ""){
1659         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1660     ret += select_var_defs;
1661   }
1662
1663
1664 //              Variables to store results of partial functions.
1665   if(sl_fcns_start != sl_fcns_end){
1666         ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1667         for(p=sl_fcns_start;p<sl_fcns_end;p++){
1668                 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1669                         partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
1670                 ret += tmpstr;
1671         }
1672         ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1673   }
1674
1675 //              Variables for udaf output temporaries
1676         bool no_udaf = true;
1677         int a;
1678         for(a=0;a<aggr_tbl->size();a++){
1679                 if(! aggr_tbl->is_builtin(a)){
1680                         if(no_udaf){
1681                                 ret+="/*\t\tUDAF output vars.\t*/\n";
1682                                 no_udaf = false;
1683                         }
1684                         int afcn_id = aggr_tbl->get_fcn_id(a);
1685                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1686                         sprintf(tmpstr,"udaf_ret%d", a);
1687                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1688                 }
1689         }
1690
1691
1692 //  ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1693   ret+="\n";
1694   ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1695   ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1696   ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1697                 bool first_g=true;
1698                 int g;
1699                 for(g=0;g<gb_tbl->size();g++){
1700                   data_type *gdt = gb_tbl->get_data_type(g);
1701                   if(gdt->is_temporal()){
1702                         if(first_g) first_g=false; else ret+=" || ";
1703                         ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1704                   }
1705                 }
1706   ret += "))) {\n";
1707   ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1708   ret+=
1709 "#ifdef LFTA_STATS\n"
1710 "\t\t\tt->eviction_cnt++;\n"
1711 "#endif\n"
1712 ;
1713
1714
1715   ret+=generate_tuple_from_aggr(node_name,schema,"i");
1716
1717 //  ret+="\t\t\tt->n_aggrs--;\n";  // done in generate_tuple_from_aggr
1718   ret+="\t\t\tnflush--;\n";
1719   ret+="\t\t}\n";
1720   ret+="\t}\n";
1721   ret+="\tt->flush_pos=i;\n";
1722   ret+="\tif(t->n_aggrs == 0) {\n";
1723   ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1724   ret += "\t}\n\n";
1725
1726   ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1727
1728   for(int g=0;g<gb_tbl->size();g++){
1729         data_type *dt = gb_tbl->get_data_type(g);
1730         if(dt->is_temporal()){
1731                 data_type *gdt = gb_tbl->get_data_type(g);
1732                 if(!gdt->is_buffer_type()){
1733                         sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1734                         ret += tmpstr;
1735                 }
1736         }
1737   }
1738   ret += "\t}\n}\n\n";
1739
1740   return(ret);
1741 }
1742
1743 // TODO Remove sprintf to perform string catenation
1744 string generate_fta_load_params(string node_name){
1745         int p;
1746         vector<string> param_names = param_tbl->get_param_names();
1747
1748         string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1749                 ret += " *t, int sz, void *value, int initial_call){\n";
1750     ret += "\tint pos=0;\n";
1751     ret += "\tint data_pos;\n";
1752
1753         for(p=0;p<param_names.size();p++){
1754                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1755                 if(dt->is_buffer_type()){
1756                         sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1757                         ret += tmpstr;
1758                         sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
1759                         ret += tmpstr;
1760                 }
1761         }
1762
1763
1764
1765         ret += "\n\tdata_pos = ";
1766         for(p=0;p<param_names.size();p++){
1767                 if(p>0) ret += " + ";
1768                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1769                 ret += "sizeof( ";
1770                 ret +=  dt->get_tuple_cvar_type();
1771                 ret += " )";
1772         }
1773         ret += ";\n";
1774         ret += "\tif(data_pos > sz) return 1;\n\n";
1775
1776
1777         for(p=0;p<param_names.size();p++){
1778                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1779                 if(dt->is_buffer_type()){
1780                         sprintf(tmpstr,"\taccess_var_%s =  *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1781                         ret += tmpstr;
1782                         switch( dt->get_type() ){
1783                         case v_str_t:
1784 //                              ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n";               // ntoh conversion
1785 //                              ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n";   // ntoh conversion
1786                                 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1787                                 ret += tmpstr;
1788                                 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1789                                 ret += tmpstr;
1790                                 sprintf(tmpstr,"\ttmp_var_%s.length =  access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1791                                 ret += tmpstr;
1792                         break;
1793                         default:
1794                                 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1795                                 exit(1);
1796                         break;
1797                         }
1798 //                                      First, destroy the old
1799                         ret += "\tif(! initial_call)\n";
1800                         sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1801                         ret += tmpstr;
1802 //                                      Next, create the new.
1803                         sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1804                         ret += tmpstr;
1805                 }else{
1806 //                      if(dt->needs_hn_translation()){
1807 //                              sprintf(tmpstr,"\tt->param_%s =  %s( *( (%s *)( (char *)value+pos) ) );\n",
1808 //                                param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1809 //                      }else{
1810                                 sprintf(tmpstr,"\tt->param_%s =  *( (%s *)( (char *)value+pos) );\n",
1811                                   param_names[p].c_str(), dt->get_cvar_type().c_str() );
1812 //                      }
1813                         ret += tmpstr;
1814                 }
1815                 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1816                 ret += tmpstr;
1817         }
1818
1819 //                      Register the pass-by-handle parameters
1820
1821         ret += "/* register and de-register the pass-by-handle parameters */\n";
1822
1823     int ph;
1824     for(ph=0;ph<param_handle_table.size();++ph){
1825                 data_type pdt(param_handle_table[ph]->type_name);
1826                 switch(param_handle_table[ph]->val_type){
1827                 case cplx_lit_e:
1828                         break;
1829                 case litval_e:
1830                         break;
1831                 case param_e:
1832                         ret += "\tif(! initial_call)\n";
1833                         sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1834                                 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1835                         ret += tmpstr;
1836                         sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
1837                         ret += tmpstr;
1838
1839                         if(pdt.is_buffer_type()) ret += "&(";
1840                         ret += "t->param_"+param_handle_table[ph]->param_name;
1841                         if(pdt.is_buffer_type()) ret += ")";
1842                         ret += ");\n";
1843                         break;
1844                 default:
1845                         sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1846                         fprintf(stderr,"%s\n",tmpstr);
1847                         ret+=tmpstr;
1848                 }
1849         }
1850
1851         ret+="\treturn 0;\n";
1852         ret += "}\n\n";
1853
1854         return(ret);
1855 }
1856
1857
1858
1859
1860 string generate_fta_free(string node_name, bool is_aggr_query){
1861
1862         string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1863         ret+= "\tstruct "+generate_fta_name(node_name)+
1864                 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1865         ret += "\tint i;\n";
1866
1867         if(is_aggr_query){
1868                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1869                 ret+="\t/* \t\tmark all groups as old */\n";
1870                 ret+="\tt->generation++;\n";
1871                 ret+="\tt->flush_pos = 0;\n";
1872                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1873         }
1874
1875 //                      Deregister the pass-by-handle parameters
1876         ret += "/* de-register the pass-by-handle parameters */\n";
1877     int ph;
1878     for(ph=0;ph<param_handle_table.size();++ph){
1879                 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1880                         param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1881                 ret += tmpstr;
1882         }
1883
1884
1885         ret += "\treturn 0;\n}\n\n";
1886         return(ret);
1887 }
1888
1889
1890 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1891         string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f,  gs_int32_t command, gs_int32_t sz, void *value){\n";
1892         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1893                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1894         ret+="\tint i;\n";
1895
1896
1897         ret += "\t/* temp status tuple */\n";
1898         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1899         ret += "\tgs_int32_t tuple_size;\n";
1900
1901
1902         if(is_aggr_query){
1903                 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1904
1905                 ret+="\t\tif (!t->n_aggrs) {\n";
1906                 ret+="\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, 0);\n";
1907                 ret+="\t\t\tif( tuple != NULL)\n";
1908                 ret+="\t\t\t\tpost_tuple(tuple);\n";
1909
1910                 ret+="\t\t}else{\n";
1911
1912                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1913                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1914                 ret +="\t\tt->generation++;\n";
1915                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1916                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1917                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1918                 ret+="\t\t\tt->flush_pos = 0;\n";
1919                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1920                 ret+="\t\t}\n";
1921
1922                 ret+="\t}\n";
1923         }
1924         if(param_tbl->size() > 0){
1925                 ret+=
1926 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1927 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1928 "#ifndef LFTA_IN_NIC\n"
1929 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1930 "#else\n"
1931 "\t\t{}\n"
1932 "#endif\n"
1933 "\t}\n";
1934         }
1935         ret+=
1936 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1937 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1938 "\t}\n\n";
1939
1940
1941         ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1942
1943         if(is_aggr_query){
1944                 ret+="\t\tif (t->n_aggrs) {\n";
1945                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1946                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1947                 ret +="\t\tt->generation++;\n";
1948                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1949                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1950                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1951                 ret+="\t\t\tt->flush_pos = 0;\n";
1952                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1953                 ret+="\t\t}\n";
1954         }
1955
1956         ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1957         ret += "\t\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
1958         ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1959
1960         /* mark tuple as EOF_TUPLE */
1961         ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1962         ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1963         ret += "\t\tpost_tuple(tuple);\n";
1964         ret += "\t}\n";
1965
1966         ret += "\treturn 0;\n}\n\n";
1967
1968         return(ret);
1969 }
1970
1971 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
1972         string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1973         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1974                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1975
1976         ret += "\t/* Create a temp status tuple */\n";
1977         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1978         ret += "\tgs_int32_t tuple_size;\n";
1979         ret += "\tunsigned int i;\n";
1980         ret += "\ttime_t cur_time;\n";
1981         ret += "\tint time_advanced;\n";
1982         ret += "\tstruct fta_stat stats;\n";
1983
1984
1985
1986         /* copy the last seen values of temporal attributes */
1987         col_id_set temp_cids;           //      col ids of temp attributes in select clause
1988
1989
1990         /* HACK: in order to reuse the SE generation code, we need to copy
1991          * the last values of the temp attributes into new variables
1992          * which have names unpack_var_XXX_XXX
1993          */
1994
1995         int s, g;
1996         col_id_set::iterator csi;
1997
1998         for(s=0;s<sl_list.size();s++){
1999                 data_type *sdt = sl_list[s]->get_data_type();
2000                 if (sdt->is_temporal()) {
2001                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2002                 }
2003         }
2004
2005         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2006                 int tblref = (*csi).tblvar_ref;
2007                 int schref = (*csi).schema_ref;
2008                 string field = (*csi).field;
2009                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2010                 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
2011                 ret += tmpstr;
2012         }
2013
2014         if (is_aggr_query) {
2015                 for(g=0;g<gb_tbl->size();g++){
2016                         data_type *gdt = gb_tbl->get_data_type(g);
2017                         if(gdt->is_temporal()){
2018                                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2019                                 ret += tmpstr;
2020                                 data_type *gdt = gb_tbl->get_data_type(g);
2021                                 if(gdt->is_buffer_type()){
2022                                 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2023                                 ret += tmpstr;
2024                                 }
2025                         }
2026                 }
2027         }
2028         ret += "\n";
2029
2030         ret += "\ttime_advanced = 0;\n";
2031
2032         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2033                 int tblref = (*csi).tblvar_ref;
2034                 int schref = (*csi).schema_ref;
2035                 string field = (*csi).field;
2036                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2037
2038                 // update last seen value with the value seen
2039                 ret += "\t#ifdef PREFILTER_DEFINED\n";
2040                 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
2041                         field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
2042                 ret += tmpstr;
2043                 ret += "\t\ttime_advanced = 1;\n\t}\n";
2044                 ret += "\t#endif\n";
2045
2046                 // we need to pay special attention to time fields
2047                 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
2048                         ret += "\tcur_time = time(&cur_time);\n";
2049
2050                         if (field == "time") {
2051                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
2052                                         tblref, time_corr);
2053                                 ret += tmpstr;
2054                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
2055                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2056                         } else if (field == "timestamp_ms") {
2057                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
2058                                         tblref, time_corr);
2059                                 ret += tmpstr;
2060                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
2061                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2062                         }else{
2063                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
2064                                         field.c_str(), tblref, time_corr);
2065                                 ret += tmpstr;
2066                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
2067                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);
2068                         }
2069                         ret += tmpstr;
2070
2071                         ret += "\t\ttime_advanced = 1;\n";
2072                         ret += "\t}\n";
2073
2074                         sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
2075                         field.c_str(), tblref, field.c_str(), tblref);
2076                         ret += tmpstr;
2077                 } else {
2078                         sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
2079                                 field.c_str(), tblref, field.c_str(), tblref);
2080                         ret += tmpstr;
2081                 }
2082         }
2083
2084
2085         if(advance_uxtime){
2086                 ret += "\tt->ux_time = time(&(t->ux_time));\n";
2087         }
2088
2089         // for aggregation lftas we need to check if the time was advanced beyond the current epoch
2090         if (is_aggr_query) {
2091
2092                 string change_test;
2093                 bool first_one = true;
2094                 for(g=0;g<gb_tbl->size();g++){
2095                   data_type *gdt = gb_tbl->get_data_type(g);
2096                   if(gdt->is_temporal()){
2097 //                                      To perform the test, first need to compute the value
2098 //                                      of the temporal gb attrs.
2099                           if(gdt->is_buffer_type()){
2100         //                              NOTE : if the SE defining the gb is anything
2101         //                              other than a ref to a variable, this will generate
2102         //                              illegal code.  To be resolved with Spatch.
2103                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2104                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2105                                 ret+=tmpstr;
2106                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2107                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2108                           }else{
2109                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2110                           }
2111                           ret += tmpstr;
2112
2113                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2114                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2115                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2116                           change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
2117                   }
2118                 }
2119
2120                 ret += "\n\tif( time_advanced && !( (";
2121                 ret += change_test;
2122                 ret += ") ) ){\n";
2123
2124                 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2125                 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
2126                 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2127
2128                 ret += "\t\t/* \t\tmark all groups as old */\n";
2129                 ret +="\t\tt->generation++;\n";
2130                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
2131                 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
2132                 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
2133                 ret += "\t\tt->flush_pos = 0;\n";
2134
2135                 for(g=0;g<gb_tbl->size();g++){
2136                      data_type *gdt = gb_tbl->get_data_type(g);
2137                      if(gdt->is_temporal()){
2138                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);                 ret += tmpstr;
2139                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
2140                      }
2141                 }
2142                 ret += "\t}\n\n";
2143
2144         }
2145
2146         ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
2147         ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+"*)allocate_tuple(f, tuple_size );\n";
2148         ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
2149
2150
2151         for(s=0;s<sl_list.size();s++){
2152                 data_type *sdt = sl_list[s]->get_data_type();
2153                 if(sdt->is_temporal()){
2154
2155                         if (sl_list[s]->is_gb()) {
2156                                 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
2157                                 ret += tmpstr;
2158                         }
2159
2160                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2161                         ret += tmpstr;
2162 //                      if(sdt->needs_hn_translation())
2163 //                              ret += sdt->hton_translation() +"( ";
2164                         if (sl_list[s]->is_gb()) {
2165                                 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
2166                                 ret += tmpstr;
2167                         } else{
2168                                 ret += generate_se_code(sl_list[s],schema);
2169                         }
2170 //                      if(sdt->needs_hn_translation())
2171 //                              ret += " )";
2172                         ret += ";\n";
2173                 }
2174         }
2175
2176         /* mark tuple as temporal */
2177         ret += "\n\t/* Mark tuple as temporal */\n";
2178         ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
2179
2180         ret += "\n\t/* Copy trace id */\n";
2181         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
2182
2183         ret += "\n\t/* Populate runtime stats */\n";
2184         ret += "\tstats.ftaid = f->ftaid;\n";
2185         ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
2186         ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
2187         ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
2188         ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
2189         ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
2190         ret += "\tstats.collision_cnt = t->collision_cnt;\n";
2191         ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
2192         ret += "\tstats.sampling_rate    = t->sampling_rate;\n";
2193
2194         ret += "\n#ifdef LFTA_PROFILE\n";
2195         ret += "\n\t/* Print stats */\n";
2196         ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
2197         ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
2198         ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
2199         ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
2200         ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
2201         ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
2202         ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
2203         ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
2204         ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
2205         ret += "\n#endif\n";
2206
2207
2208         ret += "\n\t/* Copy stats */\n";
2209         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2210         ret+="\tpost_tuple(tuple);\n";
2211
2212         ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2213         ret += "\n\t/* Disable heartbeats for now to avoid overloading clearinghouse */\n";     
2214         ret += "\t/* fta_heartbeat(f->ftaid, t->trace_id++, 1, &stats); */\n";
2215
2216         ret += "\n\t/* Reset runtime stats */\n";
2217         ret += "\tt->in_tuple_cnt = 0;\n";
2218         ret += "\tt->out_tuple_cnt = 0;\n";
2219         ret += "\tt->out_tuple_sz = 0;\n";
2220         ret += "\tt->accepted_tuple_cnt = 0;\n";
2221         ret += "\tt->cycle_cnt = 0;\n";
2222         ret += "\tt->collision_cnt = 0;\n";
2223         ret += "\tt->eviction_cnt = 0;\n";
2224
2225         ret += "\treturn 0;\n}\n\n";
2226
2227         return(ret);
2228 }
2229
2230
2231 //              accept processing before the where clause,
2232 //              do flush processwing.
2233 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema,   col_id_set &unpacked_cids, string &temporal_flush){
2234         int s;
2235
2236 //              Slow flush
2237   string ret="\n/*\tslow flush\t*/\n";
2238   string slow_flush_str = fs->get_val_of_def("slow_flush");
2239   int n_slow_flush = atoi(slow_flush_str.c_str());
2240   if(n_slow_flush <= 0) n_slow_flush = 2;
2241   if(n_slow_flush > 1){
2242         ret += "\tt->flush_ctr++;\n";
2243         ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2244         ret += "\t\tt->flush_ctr = 0;\n";
2245     ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
2246         ret += "\t}\n\n";
2247   }else{
2248     ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2249   }
2250
2251
2252         string change_test;
2253         bool first_one = true;
2254         int g;
2255     col_id_set flush_cids;              //      col ids accessed when computing flush variables.
2256                                                         //  unpack them at temporal flush test time.
2257     temporal_flush = "";
2258
2259
2260         for(g=0;g<gb_tbl->size();g++){
2261                   data_type *gdt = gb_tbl->get_data_type(g);
2262                   if(gdt->is_temporal()){
2263                           gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2264
2265 //                                      To perform the test, first need to compute the value
2266 //                                      of the temporal gb attrs.
2267                           if(gdt->is_buffer_type()){
2268         //                              NOTE : if the SE defining the gb is anything
2269         //                              other than a ref to a variable, this will generate
2270         //                              illegal code.  To be resolved with Spatch.
2271                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2272                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2273                                 temporal_flush += tmpstr;
2274                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2275                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2276                           }else{
2277                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2278                           }
2279                           temporal_flush += tmpstr;
2280 //                                      END computing the value of the temporal GB attr.
2281
2282
2283                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2284                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2285                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2286                           change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2287                   }
2288         }
2289         if(!first_one){         // will be false iff. there is a temporal GB attribute
2290                   temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2291                   temporal_flush += "\tif( !( (";
2292                   temporal_flush += change_test;
2293                   temporal_flush += ") ) ){\n";
2294
2295 //                temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2296                   temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2297                   temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2298                   temporal_flush+="\t\t}\n";
2299                   temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2300                   temporal_flush+="\t\tt->generation++;\n";
2301                   temporal_flush+="\t\tt->flush_pos = 0;\n";
2302
2303
2304 //                              Now set the saved temporal value of the gb to the
2305 //                              current value of the gb.  Only for simple types,
2306 //                              not for buffer types -- but the strings are not
2307 //                              temporal in any case.
2308
2309                         for(g=0;g<gb_tbl->size();g++){
2310                           data_type *gdt = gb_tbl->get_data_type(g);
2311                           if(gdt->is_temporal()){
2312                                   if(gdt->is_buffer_type()){
2313
2314                                         fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2315                                   }else{
2316                                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2317                                         temporal_flush += tmpstr;
2318                                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2319                                         temporal_flush += tmpstr;
2320                                   }
2321                                 }
2322                         }
2323                   temporal_flush += "\t}\n\n";
2324         }
2325
2326 // Unpack all the temporal attributes referenced in select clause
2327 // and update the last value of the attribute
2328         col_id_set temp_cids;           //      col ids of temp attributes in select clause
2329         col_id_set::iterator csi;
2330
2331         for(s=0;s<sl_list.size();s++){
2332                 data_type *sdt = sl_list[s]->get_data_type();
2333                 if (sdt->is_temporal()) {
2334                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2335                 }
2336         }
2337
2338         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2339                 if(unpacked_cids.count((*csi)) == 0){
2340                         int tblref = (*csi).tblvar_ref;
2341                         int schref = (*csi).schema_ref;
2342                         string field = (*csi).field;
2343                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2344 /*
2345                         data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2346                         sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2347                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2348                         ret += tmpstr;
2349                         ret += "\tif(retval) return 1;\n";
2350 */
2351                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2352                         ret += tmpstr;
2353
2354                         unpacked_cids.insert( (*csi) );
2355                 }
2356         }
2357
2358
2359 //                              Do the flush here if this is a real_time query
2360         string rt_level = fs->get_val_of_def("real_time");
2361         if(rt_level != "" && temporal_flush != ""){
2362                 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2363                         if(unpacked_cids.count((*csi)) == 0){
2364                         int tblref = (*csi).tblvar_ref;
2365                         int schref = (*csi).schema_ref;
2366                         string field = (*csi).field;
2367                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2368 /*
2369                                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2370                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2371                         ret += tmpstr;
2372                         ret += "\tif(retval) return 1;\n";
2373 */
2374                                 unpacked_cids.insert( (*csi) );
2375                         }
2376                 }
2377                 ret += temporal_flush;
2378         }
2379
2380         return ret;
2381  }
2382
2383 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2384
2385 int p,s;
2386 string ret;
2387
2388 ///////////////                 Processing for filter-only query
2389
2390 //                      test passed : create the tuple, then assign to it.
2391           ret += "/*\t\tCreate and post the tuple\t*/\n";
2392
2393 //                      Unpack partial fcns ref'd by the select clause.
2394 //                      Its a kind of a WHERE clause ...
2395   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2396         if(fcn_ref_cnt[p] > 1){
2397                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2398         }
2399         if(is_partial_fcn[p]){
2400                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2401                 ret += "\tif(retval) goto end;\n";
2402         }
2403         if(fcn_ref_cnt[p] > 1){
2404                 if(!is_partial_fcn[p]){
2405                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2406                 }
2407                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2408                 ret += "\t}\n";
2409         }
2410   }
2411
2412   // increment the counter of accepted tuples
2413   ret += "\n\t#ifdef LFTA_STATS\n";
2414   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2415   ret += "\t#endif\n\n";
2416
2417 //                      First, compute the size of the tuple.
2418
2419 //                      Unpack any BUFFER type selections into temporaries
2420 //                      so that I can compute their size and not have
2421 //                      to recompute their value during tuple packing.
2422 //                      I can use regular assignment here because
2423 //                      these temporaries are non-persistent.
2424
2425           for(s=0;s<sl_list.size();s++){
2426                 data_type *sdt = sl_list[s]->get_data_type();
2427                 if(sdt->is_buffer_type()){
2428                         sprintf(tmpstr,"\tselvar_%d = ",s);
2429                         ret += tmpstr;
2430                         ret += generate_se_code(sl_list[s],schema);
2431                         ret += ";\n";
2432                 }
2433           }
2434
2435
2436 //              The size of the tuple is the size of the tuple struct plus the
2437 //              size of the buffers to be copied in.
2438
2439           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2440           for(s=0;s<sl_list.size();s++){
2441                 data_type *sdt = sl_list[s]->get_data_type();
2442                 if(sdt->is_buffer_type()){
2443                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2444                         ret += tmpstr;
2445                 }
2446           }
2447           ret += ";\n";
2448
2449
2450           ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
2451           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2452
2453 //                      Test passed, make assignments to the tuple.
2454
2455           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2456
2457 //                      Mark tuple as REGULAR_TUPLE
2458           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2459
2460
2461           for(s=0;s<sl_list.size();s++){
2462                 data_type *sdt = sl_list[s]->get_data_type();
2463                 if(sdt->is_buffer_type()){
2464                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2465                         ret += tmpstr;
2466                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2467                         ret += tmpstr;
2468                 }else{
2469                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2470                         ret += tmpstr;
2471 //                      if(sdt->needs_hn_translation())
2472 //                              ret += sdt->hton_translation() +"( ";
2473                         ret += generate_se_code(sl_list[s],schema);
2474 //                      if(sdt->needs_hn_translation())
2475 //                              ret += ") ";
2476                         ret += ";\n";
2477                 }
2478           }
2479
2480 //              Generate output.
2481
2482           ret += "\tpost_tuple(tuple);\n";
2483
2484 //              Increment the counter of posted tuples
2485   ret += "\n\t#ifdef LFTA_STATS\n";
2486   ret += "\tt->out_tuple_cnt++;\n";
2487   ret+="\tt->out_tuple_sz+=tuple_size;\n";
2488   ret += "\t#endif\n\n";
2489
2490
2491
2492         return ret;
2493 }
2494
2495 //      TODO Ensure that postfilter predicates are being generated
2496 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
2497
2498 int p,s,w;
2499 string ret;
2500
2501 //                      Get parameters
2502         unsigned int window_len = fs->temporal_range;
2503         unsigned int n_bloom = 11;
2504         string n_bloom_str = fs->get_val_of_def("num_bloom");
2505         int tmp_n_bloom = atoi(n_bloom_str.c_str());
2506         if(tmp_n_bloom>0)
2507                 n_bloom = tmp_n_bloom+1;
2508         float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2509         sprintf(tmpstr,"%f",bloom_width);
2510         string bloom_width_str = tmpstr;
2511
2512         if(window_len < n_bloom){
2513                 n_bloom = window_len+1;
2514                 bloom_width_str = "1";
2515         }
2516
2517
2518 //              Grab the current window time
2519         scalarexp_t winvar(fs->temporal_var);
2520         ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2521
2522         int bf_exp_size = 12;  // base-2 log of number of bits
2523         string bloom_len_str = fs->get_val_of_def("bloom_size");
2524         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2525         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2526                 bf_exp_size = tmp_bf_exp_size;
2527         }
2528         int bf_bit_size = 1 << bf_exp_size;
2529         int bf_byte_size = bf_bit_size / (8*sizeof(char));
2530
2531         unsigned int ht_size = 4096;
2532         string ht_size_s = fs->get_val_of_def("aggregate_slots");
2533         int tmp_ht_size = atoi(ht_size_s.c_str());
2534         if(tmp_ht_size > 1024){
2535                 unsigned int hs = 1;            // make it power of 2
2536                 while(tmp_ht_size){
2537                         hs =hs << 1;
2538                         tmp_ht_size = tmp_ht_size >> 1;
2539                 }
2540                 ht_size = hs;
2541         }
2542
2543         int i, bf_mask = 0;
2544         if(fs->use_bloom){
2545                 for(i=0;i<bf_exp_size;i++)
2546                         bf_mask = (bf_mask << 1) | 1;
2547         }else{
2548                 for(i=ht_size;i>1;i=i>>1)
2549                         bf_mask = (bf_mask << 1) | 1;
2550         }
2551
2552 /*
2553 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2554         n_bloom,
2555         window_len,
2556         bloom_width_str.c_str(),
2557         bf_exp_size,
2558         bf_bit_size,
2559         bf_byte_size,
2560         ht_size,
2561         ht_size_s.c_str(),
2562         bf_mask);
2563 */
2564
2565
2566
2567
2568 //              If this is a bloom-filter fj, first test if the
2569 //              bloom filter needs to be advanced.
2570 //              SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2571 //              t->bf_size : number of bits in bloom filter
2572 //              TODO: vectorize?
2573 //              TODO: Don't iterate more than n_bloom times!
2574 //                      As written, its possible to wrap around many times.
2575         if(fs->use_bloom){
2576                 ret +=
2577 "//                     Clean out old bloom filters if needed.\n"
2578 "//                     TODO vectorize this ? \n"
2579 "       if(t->first_exec){\n"
2580 "               t->first_exec = 0;\n"
2581 "               t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2582 "               t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2583 "       }else{\n"
2584 "               curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2585 "               if(curr_bin != t->last_bin){\n"
2586 "                       for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2587 "                               t->last_bloom_pos++;\n"
2588 "                               if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2589 "                                       t->last_bloom_pos = 0;\n"
2590 "                               tmp_i = t->last_bloom_pos;\n"
2591 "                               for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2592 "                                       SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2593 "                               }\n"
2594 "                       }\n"
2595 "               }\n"
2596 "               t->last_bin = curr_bin;\n"
2597 "       }\n"
2598 ;
2599         }
2600
2601
2602 //-----------------------------------------------------------------
2603 //              First, determine whether to do S (filter stream) processing.
2604
2605         ret +=
2606 "//             S (filtering stream) predicate, should it be processed?\n"
2607 "\n"
2608 ;
2609 // Sort S preds based on cost.
2610         vector<cnf_elem *> s_filt = fs->pred_t1;
2611         col_id_set::iterator csi;
2612   if(s_filt.size() > 0){
2613
2614 //                      Unpack fields ref'd in the S pred
2615         for(w=0;w<s_filt.size();++w){
2616                 col_id_set this_pred_cids;
2617                 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2618                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2619                         if(unpacked_cids.count( (*csi) ) == 0){
2620                                 int tblref = (*csi).tblvar_ref;
2621                                 int schref = (*csi).schema_ref;
2622                                 string field = (*csi).field;
2623                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2624                                 unpacked_cids.insert( (*csi) );
2625                         }
2626                 }
2627         }
2628
2629
2630 //              Sort by evaluation cost.
2631 //              First, estimate evaluation costs
2632 //              Eliminate predicates covered by the prefilter (those in s_pids).
2633 //              I need to do it before the sort because the indices refer
2634 //              to the position in the unsorted list.
2635         vector<cnf_elem *> tmp_wh;
2636         for(w=0;w<s_filt.size();++w){
2637                 compute_cnf_cost(s_filt[w],Ext_fcns);
2638                 tmp_wh.push_back(s_filt[w]);
2639         }
2640         s_filt = tmp_wh;
2641
2642         sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2643
2644 //              Now generate the predicates.
2645         for(w=0;w<s_filt.size();++w){
2646                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2647                 ret += tmpstr;
2648
2649 //                      Find partial fcns ref'd in this cnf element
2650                 set<int> pfcn_refs;
2651                 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2652 //                      Since set<..> is a "Sorted Associative Container",
2653 //                      we can walk through it in sorted order by walking from
2654 //                      begin() to end().  (and the partial fcns must be
2655 //                      evaluated in this order).
2656                 set<int>::iterator si;
2657                 string pf_preds;
2658                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2659                         if(fcn_ref_cnt[(*si)] > 1){
2660                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2661                         }
2662                         if(is_partial_fcn[(*si)]){
2663                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2664                                 ret += "\t\tif(retval) goto end_s;\n";
2665                         }
2666                         if(fcn_ref_cnt[(*si)] > 1){
2667                                 if(!is_partial_fcn[(*si)]){
2668                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2669 //              Testing for S is a side branch.
2670 //              I don't want a cacheable partial function to be
2671 //              marked as evaluated.  Therefore I mark the function
2672 //              as evaluated ONLY IF it is not partial.
2673                                         ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2674                                 }
2675                                 ret += "\t}\n";
2676                         }
2677                 }
2678
2679                 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2680                                 ") ) goto end_s;\n";
2681         }
2682   }else{
2683           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2684   }
2685
2686         for(p=0;p<fs->hash_eq.size();++p)
2687                 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2688
2689         if(fs->use_bloom){
2690 //                      First, generate the S scalar expressions in the hash_eq
2691
2692 //                      Iterate over the bloom filters
2693                 for(i=0;i<3;i++){
2694                         ret += "\t\tbucket=0;\n";
2695                         for(p=0;p<fs->hash_eq.size();++p){
2696                                 ret +=
2697 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2698         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2699         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2700                         }
2701 //              SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2702                                 ret +=
2703 "               bucket &= "+int_to_string(bf_mask)+";\n"
2704 "               SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2705 "\n"
2706 ;
2707                 }
2708         }else{
2709                 ret += "// Add the S record to the hash table, choose a position\n";
2710                 ret += "\t\tbucket=0;\n";
2711                 for(p=0;p<fs->hash_eq.size();++p){
2712                         ret +=
2713 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2714         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2715         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2716                 }
2717                 ret +=
2718 "               bucket &= "+int_to_string(bf_mask)+";\n"
2719 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2720 ;
2721 //                      Try the first bucket
2722                 ret += "\t\tif(";
2723                 for(p=0;p<fs->hash_eq.size();++p){
2724                         if(p>0) ret += " && ";
2725 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2726 //                                      " == s_equijoin_"+int_to_string(p);
2727                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2728                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2729                         string rhs_op = "s_equijoin_"+int_to_string(p);
2730                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2731                 }
2732                 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2733                 ret += "\t\t}else{\n\t\t\tif(";
2734                 for(p=0;p<fs->hash_eq.size();++p){
2735                         if(p>0) ret += " && ";
2736 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2737 //                                      " == s_equijoin_"+int_to_string(p);
2738                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2739                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2740                         string rhs_op = "s_equijoin_"+int_to_string(p);
2741                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2742                 }
2743                 ret +=  "){\n\t\t\t\tthe_bucket = bucket1;\n";
2744                 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2745                 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2746                 ret += "\t\t\t}\n\t\t}\n";
2747                 for(p=0;p<fs->hash_eq.size();++p){
2748                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2749                         if(hdt->is_buffer_type()){
2750                                 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2751                                 ret += tmpstr;
2752                         }else{
2753                                 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2754                                         " = s_equijoin_"+int_to_string(p)+";\n";
2755                         }
2756                 }
2757                 ret+="\t\tt->join_table[the_bucket].ts =  curr_fj_ts;\n";
2758         }
2759   ret += "\tend_s:\n";
2760
2761 //      ------------------------------------------------------------
2762 //              Next, determine if the R record should be processed.
2763
2764
2765         ret +=
2766 "//             R (main stream) cheap predicate\n"
2767 "\n"
2768 ;
2769
2770 //              Unpack r_filt fields
2771         vector<cnf_elem *> r_filt = fs->pred_t0;
2772         for(w=0;w<r_filt.size();++w){
2773                 col_id_set this_pred_cids;
2774                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2775                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2776                         if(unpacked_cids.count( (*csi) ) == 0){
2777                                 int tblref = (*csi).tblvar_ref;
2778                                 int schref = (*csi).schema_ref;
2779                                 string field = (*csi).field;
2780                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2781                                 unpacked_cids.insert( (*csi) );
2782                         }
2783                 }
2784         }
2785
2786 // Sort R preds based on cost.
2787
2788         vector<cnf_elem *> tmp_wh;
2789         for(w=0;w<r_filt.size();++w){
2790                 compute_cnf_cost(r_filt[w],Ext_fcns);
2791                 tmp_wh.push_back(r_filt[w]);
2792         }
2793         r_filt = tmp_wh;
2794
2795         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2796
2797 //              WARNING! the constant 20 below is a wild-ass guess.
2798         int cheap_rpos;
2799         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
2800
2801 //              Test the cheap filters on R.
2802   if(cheap_rpos >0){
2803
2804 //              Now generate the predicates.
2805         for(w=0;w<cheap_rpos;++w){
2806                 sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
2807                 ret += tmpstr;
2808
2809 //                      Find partial fcns ref'd in this cnf element
2810                 set<int> pfcn_refs;
2811                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2812 //                      Since set<..> is a "Sorted Associative Container",
2813 //                      we can walk through it in sorted order by walking from
2814 //                      begin() to end().  (and the partial fcns must be
2815 //                      evaluated in this order).
2816                 set<int>::iterator si;
2817                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2818                         if(fcn_ref_cnt[(*si)] > 1){
2819                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2820                         }
2821                         if(is_partial_fcn[(*si)]){
2822                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2823                                 ret += "\t\tif(retval) goto end;\n";
2824                         }
2825                         if(fcn_ref_cnt[(*si)] > 1){
2826                                 if(!is_partial_fcn[(*si)]){
2827                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2828                                 }
2829                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2830                                 ret += "\t}\n";
2831                         }
2832                 }
2833
2834                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2835                                 ") ) goto end;\n";
2836         }
2837   }else{
2838           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2839   }
2840
2841         ret += "\n//    Do the join\n\n";
2842         for(p=0;p<fs->hash_eq.size();++p)
2843                 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2844
2845
2846 //                      Passed the cheap pred, now test the join with S.
2847         if(fs->use_bloom){
2848                 for(i=0;i<3;i++){
2849                         ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2850                         for(p=0;p<fs->hash_eq.size();++p){
2851                                 ret +=
2852 "       bucket"+int_to_string(i)+
2853         " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2854         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2855         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2856                         }
2857                                 ret +=
2858 "       bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2859                 }
2860                 ret += "\tfound = 0;\n";
2861                 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2862                 ret +=
2863 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2864 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2865 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
2866 "\t\t\tfound=1;\n"
2867 "\t}\n"
2868 ;
2869                 ret +=
2870 "       if(!found)\n"
2871 "               goto end;\n"
2872 ;
2873         }else{
2874                 ret += "\tfound = 0;\n";
2875                 ret += "\t\tbucket=0;\n";
2876                 for(p=0;p<fs->hash_eq.size();++p){
2877                         ret +=
2878 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2879         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2880         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2881                 }
2882                 ret +=
2883 "               bucket &= "+int_to_string(bf_mask)+";\n"
2884 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2885 ;
2886 //                      Try the first bucket
2887                 ret += "\t\tif(";
2888                 for(p=0;p<fs->hash_eq.size();++p){
2889                         if(p>0) ret += " && ";
2890 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2891 //                                      " == r_equijoin_"+int_to_string(p);
2892                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2893                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2894                         string rhs_op = "s_equijoin_"+int_to_string(p);
2895                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2896                 }
2897                 if(p>0) ret += " && ";
2898                 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2899                 ret += "){\n\t\t\tfound = 1;\n";
2900                 ret += "\t\t}else {if(";
2901                 for(p=0;p<fs->hash_eq.size();++p){
2902                         if(p>0) ret += " && ";
2903 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2904 //                                      " == r_equijoin_"+int_to_string(p);
2905                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2906                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2907                         string rhs_op = "s_equijoin_"+int_to_string(p);
2908                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2909                 }
2910                 if(p>0) ret += " && ";
2911                 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2912                 ret +=  ")\n\t\t\tfound=1;\n";
2913                 ret+="\t\t}\n";
2914                 ret +=
2915 "       if(!found)\n"
2916 "               goto end;\n"
2917 ;
2918         }
2919
2920
2921 //              Test the expensive filters on R.
2922   if(cheap_rpos < r_filt.size()){
2923
2924 //              Now generate the predicates.
2925         for(w=cheap_rpos;w<r_filt.size();++w){
2926                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2927                 ret += tmpstr;
2928
2929 //                      Find partial fcns ref'd in this cnf element
2930                 set<int> pfcn_refs;
2931                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2932 //                      Since set<..> is a "Sorted Associative Container",
2933 //                      we can walk through it in sorted order by walking from
2934 //                      begin() to end().  (and the partial fcns must be
2935 //                      evaluated in this order).
2936                 set<int>::iterator si;
2937                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2938                         if(fcn_ref_cnt[(*si)] > 1){
2939                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2940                         }
2941                         if(is_partial_fcn[(*si)]){
2942                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2943                                 ret += "\t\tif(retval) goto end;\n";
2944                         }
2945                         if(fcn_ref_cnt[(*si)] > 1){
2946                                 if(!is_partial_fcn[(*si)]){
2947                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2948                                 }
2949                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2950                                 ret += "\t}\n";
2951                         }
2952                 }
2953
2954                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2955                                 ") ) goto end;\n";
2956         }
2957   }else{
2958           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2959   }
2960
2961
2962
2963 ///////////////                 post the tuple
2964
2965 //                      test passed : create the tuple, then assign to it.
2966           ret += "/*\t\tCreate and post the tuple\t*/\n";
2967
2968 //              Unpack the fields referenced by the select list
2969         for(s=0;s<sl_list.size();++s){
2970                 col_id_set this_se_cids;
2971                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2972                 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2973                         if(unpacked_cids.count( (*csi) ) == 0){
2974                                 int tblref = (*csi).tblvar_ref;
2975                                 int schref = (*csi).schema_ref;
2976                                 string field = (*csi).field;
2977                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2978                                 unpacked_cids.insert( (*csi) );
2979                         }
2980                 }
2981         }
2982
2983
2984 //                      Unpack partial fcns ref'd by the select clause.
2985 //                      It's a kind of WHERE clause: a failed partial fcn jumps to the end label.
2986   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2987         if(fcn_ref_cnt[p] > 1){
2988                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2989         }
2990         if(is_partial_fcn[p]){
2991                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2992                 ret += "\tif(retval) goto end;\n";
2993         }
2994         if(fcn_ref_cnt[p] > 1){
2995                 if(!is_partial_fcn[p]){
2996                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2997                 }
2998                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2999                 ret += "\t}\n";
3000         }
3001   }
3002
3003   // increment the counter of accepted tuples
3004   ret += "\n\t#ifdef LFTA_STATS\n";
3005   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3006   ret += "\t#endif\n\n";
3007
3008 //                      First, compute the size of the tuple.
3009
3010 //                      Unpack any BUFFER type selections into temporaries
3011 //                      so that I can compute their size and not have
3012 //                      to recompute their value during tuple packing.
3013 //                      I can use regular assignment here because
3014 //                      these temporaries are non-persistent.
3015
3016           for(s=0;s<sl_list.size();s++){
3017                 data_type *sdt = sl_list[s]->get_data_type();
3018                 if(sdt->is_buffer_type()){
3019                         sprintf(tmpstr,"\tselvar_%d = ",s);
3020                         ret += tmpstr;
3021                         ret += generate_se_code(sl_list[s],schema);
3022                         ret += ";\n";
3023                 }
3024           }
3025
3026
3027 //              The size of the tuple is the size of the tuple struct plus the
3028 //              size of the buffers to be copied in.
3029
3030           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3031           for(s=0;s<sl_list.size();s++){
3032                 data_type *sdt = sl_list[s]->get_data_type();
3033                 if(sdt->is_buffer_type()){
3034                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3035                         ret += tmpstr;
3036                 }
3037           }
3038           ret += ";\n";
3039
3040
3041           ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
3042           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3043
3044 //                      Test passed, make assignments to the tuple.
3045
3046           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3047
3048 //                      Mark tuple as REGULAR_TUPLE
3049           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3050
3051
3052           for(s=0;s<sl_list.size();s++){
3053                 data_type *sdt = sl_list[s]->get_data_type();
3054                 if(sdt->is_buffer_type()){
3055                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3056                         ret += tmpstr;
3057                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3058                         ret += tmpstr;
3059                 }else{
3060                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3061                         ret += tmpstr;
3062 //                      if(sdt->needs_hn_translation())
3063 //                              ret += sdt->hton_translation() +"( ";
3064                         ret += generate_se_code(sl_list[s],schema);
3065 //                      if(sdt->needs_hn_translation())
3066 //                              ret += ") ";
3067                         ret += ";\n";
3068                 }
3069           }
3070
3071 //              Generate output.
3072
3073           ret += "\tpost_tuple(tuple);\n";
3074
3075 //              Increment the counter of posted tuples
3076   ret += "\n\t#ifdef LFTA_STATS\n";
3077   ret += "\n\tt->out_tuple_cnt++;\n\n";
3078   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3079   ret += "\t#endif\n\n";
3080
3081
3082         return ret;
3083 }
3084
3085
3086 string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
3087
3088 int p,s,w;
3089 string ret;
3090
3091
3092         string wl_schema = fs->from[1]->get_schema_name();
3093         string wl_elem_str = generate_watchlist_element_name(wl_schema);
3094         string wl_node_str = generate_watchlist_struct_name(wl_schema);
3095         string tgt = generate_watchlist_name(wl_schema);
3096
3097         ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
3098
3099
3100
3101
3102
3103 //      ------------------------------------------------------------
3104 //              Determine if the R record should be processed.
3105
3106
3107         ret +=
3108 "//             R (main stream) cheap predicate\n"
3109 "\n"
3110 ;
3111
3112 //              Unpack r_filt fields
3113         vector<cnf_elem *> r_filt = fs->pred_t0;
3114         for(w=0;w<r_filt.size();++w){
3115                 col_id_set this_pred_cids;
3116                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
3117                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3118                         if(unpacked_cids.count( (*csi) ) == 0){
3119                                 int tblref = (*csi).tblvar_ref;
3120                                 int schref = (*csi).schema_ref;
3121                                 string field = (*csi).field;
3122                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3123                                 unpacked_cids.insert( (*csi) );
3124                         }
3125                 }
3126         }
3127
3128 // Sort R preds based on cost.
3129
3130         vector<cnf_elem *> tmp_wh;
3131         for(w=0;w<r_filt.size();++w){
3132                 compute_cnf_cost(r_filt[w],Ext_fcns);
3133                 tmp_wh.push_back(r_filt[w]);
3134         }
3135         r_filt = tmp_wh;
3136
3137         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
3138
3139 //              WARNING! the constant 20 below is a wild-ass guess.
3140         int cheap_rpos;
3141         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
3142
3143 //              Test the cheap filters on R.
3144   if(cheap_rpos >0){
3145
3146 //              Now generate the predicates.
3147         for(w=0;w<cheap_rpos;++w){
3148                 sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3149                 ret += tmpstr;
3150
3151 //                      Find partial fcns ref'd in this cnf element
3152                 set<int> pfcn_refs;
3153                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3154 //                      Since set<..> is a "Sorted Associative Container",
3155 //                      we can walk through it in sorted order by walking from
3156 //                      begin() to end().  (and the partial fcns must be
3157 //                      evaluated in this order).
3158                 set<int>::iterator si;
3159                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3160                         if(fcn_ref_cnt[(*si)] > 1){
3161                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3162                         }
3163                         if(is_partial_fcn[(*si)]){
3164                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3165                                 ret += "\t\tif(retval) goto end;\n";
3166                         }
3167                         if(fcn_ref_cnt[(*si)] > 1){
3168                                 if(!is_partial_fcn[(*si)]){
3169                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3170                                 }
3171                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3172                                 ret += "\t}\n";
3173                         }
3174                 }
3175
3176                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3177                                 ") ) goto end;\n";
3178         }
3179   }else{
3180           ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
3181   }
3182
3183         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3184         map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
3185         vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
3186         for(w=0;w<kflds.size();++w){
3187                 string kfld = kflds[w];
3188                 col_id_set this_pred_cids;
3189                 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
3190                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3191                         if(unpacked_cids.count( (*csi) ) == 0){
3192                                 int tblref = (*csi).tblvar_ref;
3193                                 int schref = (*csi).schema_ref;
3194                                 string field = (*csi).field;
3195                                 if(tblref==0) // LHS from packet, don't unpack the RHS
3196                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3197                                 unpacked_cids.insert( (*csi) );
3198                         }
3199                 }
3200         }
3201
3202
3203         ret += "\n//    Do the join\n\n";
3204         ret += "\n//            (ensure that the watchtable is fresh)\n";
3205         ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
3206         ret += "\t\treload_watchlist__"+wl_schema+"();\n";
3207         ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
3208         ret += "\t}\n\n";
3209
3210
3211         for(p=0;p<fs->key_flds.size();++p){
3212                 string kfld = fs->key_flds[p];
3213                 ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
3214         }
3215
3216
3217 //                      Passed the cheap pred, now test the join with S.
3218         ret += "\tbucket=0;\n";
3219         ret += "\thash=0;\n";
3220         for(p=0;p<fs->key_flds.size();++p){
3221                 string kfld = fs->key_flds[p];
3222                 ret +=
3223 "               hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
3224         fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
3225         +"_to_hash(r_equijoin_"+kfld+")));\n";
3226         }
3227         ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
3228
3229         ret += "\t\trec = "+tgt+".ht[bucket];\n";
3230         ret += "\t\twhile(rec!=NULL){\n";
3231         ret += "\t\t\tif(hash==rec->hashval){\n";
3232         ret += "\t\t\t\tif(";
3233         for(p=0;p<fs->key_flds.size();++p){
3234                 string kfld = fs->key_flds[p];
3235                 if(p>0) ret += " && ";
3236                 data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
3237                 string lhs_op = "r_equijoin_"+kfld;
3238                 string rhs_op = "rec->"+kfld;
3239                 ret += generate_equality_test(lhs_op,rhs_op,hdt);
3240         }
3241         ret += ")\n";
3242         ret += "\t\t\t\t\tbreak;\n";
3243         ret += "\t\t\t}\n";
3244         ret += "\t\t\trec=rec->next;\n";
3245         ret += "\t\t}\n";
3246         ret += "\t\tif(rec==NULL)\n";
3247         ret += "\t\t\tgoto end;\n";
3248                 
3249         ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
3250         for(w=0;w<where.size();++w){
3251                 col_id_set this_pred_cids;
3252                 gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
3253                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3254                         if(unpacked_cids.count( (*csi) ) == 0){
3255                                 int tblref = (*csi).tblvar_ref;
3256                                 int schref = (*csi).schema_ref;
3257                                 string field = (*csi).field;
3258                                 if(tblref==0) // LHS from packet
3259                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3260                                 else    // RHS from hash bucket
3261                                         ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3262                                 unpacked_cids.insert( (*csi) );
3263                         }
3264                 }
3265         }
3266
3267
3268 //              Test the expensive filters on R.
3269 //                      TODO Should merge this with other predicates and eval in order
3270 //                              of cost - see the fj code.
3271 //                      TODO join and postfilter predicates haven't been costed yet.
3272   if(cheap_rpos < r_filt.size()){
3273
3274 //              Now generate the predicates.
3275         for(w=cheap_rpos;w<r_filt.size();++w){
3276                 sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3277                 ret += tmpstr;
3278
3279 //                      Find partial fcns ref'd in this cnf element
3280                 set<int> pfcn_refs;
3281                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3282 //                      Since set<..> is a "Sorted Associative Container",
3283 //                      we can walk through it in sorted order by walking from
3284 //                      begin() to end().  (and the partial fcns must be
3285 //                      evaluated in this order).
3286                 set<int>::iterator si;
3287                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3288                         if(fcn_ref_cnt[(*si)] > 1){
3289                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3290                         }
3291                         if(is_partial_fcn[(*si)]){
3292                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3293                                 ret += "\t\tif(retval) goto end;\n";
3294                         }
3295                         if(fcn_ref_cnt[(*si)] > 1){
3296                                 if(!is_partial_fcn[(*si)]){
3297                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3298                                 }
3299                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3300                                 ret += "\t}\n";
3301                         }
3302                 }
3303
3304                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3305                                 ") ) goto end;\n";
3306         }
3307   }else{
3308           ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
3309   }
3310
3311 //              TODO sort the additional predicates by cost
3312
3313 //              S-only
3314         for(w=0;w<fs->pred_t1.size();++w){
3315                 sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
3316                 ret += tmpstr;
3317
3318 //                      Find partial fcns ref'd in this cnf element
3319                 set<int> pfcn_refs;
3320                 collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
3321 //                      Since set<..> is a "Sorted Associative Container",
3322 //                      we can walk through it in sorted order by walking from
3323 //                      begin() to end().  (and the partial fcns must be
3324 //                      evaluated in this order).
3325                 set<int>::iterator si;
3326                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3327                         if(fcn_ref_cnt[(*si)] > 1){
3328                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3329                         }
3330                         if(is_partial_fcn[(*si)]){
3331                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3332                                 ret += "\t\tif(retval) goto end;\n";
3333                         }
3334                         if(fcn_ref_cnt[(*si)] > 1){
3335                                 if(!is_partial_fcn[(*si)]){
3336                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3337                                 }
3338                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3339                                 ret += "\t}\n";
3340                         }
3341                 }
3342
3343                 ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
3344                                 ") ) goto end;\n";
3345         }
3346
3347 //              non hash-eq join 
3348         for(w=0;w<fs->join_filter.size();++w){
3349                 sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
3350                 ret += tmpstr;
3351
3352 //                      Find partial fcns ref'd in this cnf element
3353                 set<int> pfcn_refs;
3354                 collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
3355 //                      Since set<..> is a "Sorted Associative Container",
3356 //                      we can walk through it in sorted order by walking from
3357 //                      begin() to end().  (and the partial fcns must be
3358 //                      evaluated in this order).
3359                 set<int>::iterator si;
3360                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3361                         if(fcn_ref_cnt[(*si)] > 1){
3362                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3363                         }
3364                         if(is_partial_fcn[(*si)]){
3365                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3366                                 ret += "\t\tif(retval) goto end;\n";
3367                         }
3368                         if(fcn_ref_cnt[(*si)] > 1){
3369                                 if(!is_partial_fcn[(*si)]){
3370                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3371                                 }
3372                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3373                                 ret += "\t}\n";
3374                         }
3375                 }
3376
3377                 ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
3378                                 ") ) goto end;\n";
3379         }
3380
3381 //              postfilter
3382         for(w=0;w<fs->postfilter.size();++w){
3383                 sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
3384                 ret += tmpstr;
3385
3386 //                      Find partial fcns ref'd in this cnf element
3387                 set<int> pfcn_refs;
3388                 collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
3389 //                      Since set<..> is a "Sorted Associative Container",
3390 //                      we can walk through it in sorted order by walking from
3391 //                      begin() to end().  (and the partial fcns must be
3392 //                      evaluated in this order).
3393                 set<int>::iterator si;
3394                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3395                         if(fcn_ref_cnt[(*si)] > 1){
3396                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3397                         }
3398                         if(is_partial_fcn[(*si)]){
3399                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3400                                 ret += "\t\tif(retval) goto end;\n";
3401                         }
3402                         if(fcn_ref_cnt[(*si)] > 1){
3403                                 if(!is_partial_fcn[(*si)]){
3404                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3405                                 }
3406                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3407                                 ret += "\t}\n";
3408                         }
3409                 }
3410
3411                 ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
3412                                 ") ) goto end;\n";
3413         }
3414
3415
3416
3417 ///////////////                 post the tuple
3418
3419 //                      test passed : create the tuple, then assign to it.
3420           ret += "/*\t\tCreate and post the tuple\t*/\n";
3421
3422 //              Unpack R fields
3423         for(s=0;s<sl_list.size();++s){
3424                 col_id_set this_se_cids;
3425                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
3426                 for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
3427                         if(unpacked_cids.count( (*csi) ) == 0){
3428                                 int tblref = (*csi).tblvar_ref;
3429                                 int schref = (*csi).schema_ref;
3430                                 string field = (*csi).field;
3431                                 if(tblref==0) // LHS from packet
3432                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3433                                 else    // RHS from hash bucket
3434                                         ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3435                                 unpacked_cids.insert( (*csi) );
3436                         }
3437                 }
3438         }
3439
3440
3441 //                      Unpack partial fcns ref'd by the select clause.
3442 //                      Its a kind of a WHERE clause ...
3443   for(p=sl_fcns_start;p<sl_fcns_end;p++){
3444         if(fcn_ref_cnt[p] > 1){
3445                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3446         }
3447         if(is_partial_fcn[p]){
3448                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3449                 ret += "\tif(retval) goto end;\n";
3450         }
3451         if(fcn_ref_cnt[p] > 1){
3452                 if(!is_partial_fcn[p]){
3453                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3454                 }
3455                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3456                 ret += "\t}\n";
3457         }
3458   }
3459
3460   // increment the counter of accepted tuples
3461   ret += "\n\t#ifdef LFTA_STATS\n";
3462   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3463   ret += "\t#endif\n\n";
3464
3465 //                      First, compute the size of the tuple.
3466
3467 //                      Unpack any BUFFER type selections into temporaries
3468 //                      so that I can compute their size and not have
3469 //                      to recompute their value during tuple packing.
3470 //                      I can use regular assignment here because
3471 //                      these temporaries are non-persistent.
3472
3473           for(s=0;s<sl_list.size();s++){
3474                 data_type *sdt = sl_list[s]->get_data_type();
3475                 if(sdt->is_buffer_type()){
3476                         sprintf(tmpstr,"\tselvar_%d = ",s);
3477                         ret += tmpstr;
3478                         ret += generate_se_code(sl_list[s],schema);
3479                         ret += ";\n";
3480                 }
3481           }
3482
3483
3484 //              The size of the tuple is the size of the tuple struct plus the
3485 //              size of the buffers to be copied in.
3486
3487           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3488           for(s=0;s<sl_list.size();s++){
3489                 data_type *sdt = sl_list[s]->get_data_type();
3490                 if(sdt->is_buffer_type()){
3491                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3492                         ret += tmpstr;
3493                 }
3494           }
3495           ret += ";\n";
3496
3497
3498           ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
3499           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3500
3501 //                      Test passed, make assignments to the tuple.
3502
3503           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3504
3505 //                      Mark tuple as REGULAR_TUPLE
3506           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3507
3508
3509           for(s=0;s<sl_list.size();s++){
3510                 data_type *sdt = sl_list[s]->get_data_type();
3511                 if(sdt->is_buffer_type()){
3512                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3513                         ret += tmpstr;
3514                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3515                         ret += tmpstr;
3516                 }else{
3517                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3518                         ret += tmpstr;
3519 //                      if(sdt->needs_hn_translation())
3520 //                              ret += sdt->hton_translation() +"( ";
3521                         ret += generate_se_code(sl_list[s],schema);
3522 //                      if(sdt->needs_hn_translation())
3523 //                              ret += ") ";
3524                         ret += ";\n";
3525                 }
3526           }
3527
3528 //              Generate output.
3529
3530           ret += "\tpost_tuple(tuple);\n";
3531
3532 //              Increment the counter of posted tuples
3533   ret += "\n\t#ifdef LFTA_STATS\n";
3534   ret += "\n\tt->out_tuple_cnt++;\n\n";
3535   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3536   ret += "\t#endif\n\n";
3537
3538
3539         return ret;
3540 }
3541
3542 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
3543         string ret;
3544         int a,p,g;
3545
3546 //////////////          Processing for aggregtion query
3547
3548 //              First, search for a match.  Start by unpacking the group-by attributes.
3549
3550 //                      One complication : if a real-time aggregate flush occurs,
3551 //                      the GB attr has already been calculated.  So don't compute
3552 //                      it again if 1) its temporal and 2) it will be computed in the
3553 //                      agggregate flush code.
3554
3555 //              Unpack the partial fcns ref'd by the gb's and the aggr defs.
3556   for(p=gb_fcns_start;p<gb_fcns_end;p++){
3557     if(is_partial_fcn[p]){
3558                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3559                 ret += "\tif(retval) goto end;\n";
3560         }
3561   }
3562   for(p=ag_fcns_start;p<ag_fcns_end;p++){
3563     if(is_partial_fcn[p]){
3564                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3565                 ret += "\tif(retval) goto end;\n";
3566         }
3567   }
3568
3569   // increment the counter of accepted tuples
3570   ret += "\n\t#ifdef LFTA_STATS\n";
3571   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3572   ret += "\t#endif\n\n";
3573
3574   ret += "/*\t\tTest if the group is in the hash table \t*/\n";
3575 //                      Compute the values of the group-by variables.
3576   for(g=0;g<gb_tbl->size();g++){
3577           data_type *gdt = gb_tbl->get_data_type(g);
3578           if((! gdt->is_temporal()) || temporal_flush == ""){
3579
3580                   if(gdt->is_buffer_type()){
3581         //                              NOTE : if the SE defining the gb is anything
3582         //                              other than a ref to a variable, this will generate
3583         //                              illegal code.  To be resolved with Spatch.
3584                         sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
3585                                 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
3586                         ret += tmpstr;
3587                         sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
3588                                 gdt->get_buffer_assign_copy().c_str(), g, g);
3589                   }else{
3590                         sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
3591                   }
3592                   ret += tmpstr;
3593           }
3594   }
3595   ret += "\n";
3596
3597 //                      A quick aside : if any of the GB attrs are temporal,
3598 //                      test for change and flush if any change occurred.
3599 //                      We've already computed the flush code,
3600 //                      Put it here if this is not a real time query.
3601 //                      We've already unpacked all column refs, so no need to
3602 //                      do it again here.
3603
3604         string rt_level = fs->get_val_of_def("real_time");
3605         if(rt_level == "" && temporal_flush != ""){
3606                 ret += temporal_flush;
3607         }
3608
3609 //                      Compute the hash bucket
3610         if(gb_tbl->size() > 0){
3611                 ret += "\thashval = ";\
3612                 for(g=0;g<gb_tbl->size();g++){
3613                   if(g>0) ret += " ^ ";
3614                   data_type *gdt = gb_tbl->get_data_type(g);
3615                   if(gdt->is_buffer_type()){
3616                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3617                                 gdt->get_type_str().c_str(), g);
3618                   }else{
3619                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3620                                 gdt->get_type_str().c_str(), g);
3621                   }
3622                   ret += tmpstr;
3623                 }
3624                 ret += ";\n";
3625                 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
3626         ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
3627         }else{
3628                 ret+="\tprobe = 0;\n";
3629                 ret+="\thash2 = 0;\n\n";
3630         }
3631
3632 //              Does the lfta reference a udaf?
3633           bool has_udaf = false;
3634           for(a=0;a<aggr_tbl->size();a++){
3635                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
3636           }
3637
3638 //              Scan for a match, or alternatively the best slot.
3639 //              Currently, hardcode 5 tests.
3640         ret +=
3641 "       gen_val = t->generation & SLOT_GEN_BITS;\n"
3642 "       match_found = 0;\n"
3643 "       best_slot = probe;\n"
3644 "       for(i=0;i<5 && match_found == 0;i++){\n"
3645 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
3646 ;
3647         if(gb_tbl->size()>0){
3648                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
3649                 ret+="\t\tif(";
3650                 string rhs_op, lhs_op;
3651                 for(g=0;g<gb_tbl->size();g++){
3652                   if(g>0) ret += " && ";
3653                   ret += "(";
3654                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
3655                   sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
3656                   ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
3657                   ret += ")";
3658                 }
3659          }
3660          ret += "){\n"
3661 "                       match_found = 1;\n"
3662 "                       best_slot = probe;\n"
3663 "               }\n"
3664 "       }\n"
3665 "//             Rate slots in case no match found: prefer empty, then full but old slots\n"
3666 "       if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3667 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3668 "                       best_slot = probe;\n"
3669 "               }else{\n"
3670 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3671 "                               best_slot = probe;\n"
3672 "                       }\n"
3673 "               }\n"
3674 "               probe++;\n"
3675 "               if(probe >= t->max_aggrs)\n"
3676 "                       probe=0;\n"
3677 "       }\n"
3678 "       if(match_found){\n"
3679 ;
3680         ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3681         ret +=
3682 "       }else{\n"
3683 "               if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3684 ;
3685 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3686         if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3687                 ret +=
3688 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3689 "                               if((";
3690                 bool first_g = true;
3691                 for(int g=0;g<gb_tbl->size();g++){
3692                         data_type *gdt = gb_tbl->get_data_type(g);
3693                         if(gdt->is_temporal()){
3694                                 if(first_g) first_g = false; else ret+=" + ";
3695                                 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3696                         }
3697                 }
3698                 ret += ") == 0 ){\n";
3699
3700                 ret +=
3701 "                                       fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3702 "                               }\n"
3703 "                       }\n"
3704 ;
3705         }
3706
3707         ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3708         ret +=
3709 "\t\t\t#ifdef LFTA_STATS\n"
3710 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3711 "\t\t\t\tt->collision_cnt++;\n\n"
3712 "\t\t\t#endif\n\n"
3713 "\t\t}\n"
3714 ;
3715         ret += generate_init_group(schema,"best_slot");
3716
3717
3718           ret += "\t}\n";
3719
3720         return ret;
3721 }
3722
3723
3724
3725 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
3726
3727         string ret="static gs_retval_t accept_packet_"+node_name+
3728                 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3729     ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3730
3731   int a;
3732
3733 //                      Define all of the variables needed by this
3734 //                      procedure.
3735
3736
3737 //                      Gather all column references, need to define unpacking variables.
3738   int w,s;
3739   col_id_set cid_set;
3740   col_id_set::iterator csi;
3741
3742 //              If its a filter join, rebind all colrefs
3743 //              to the first range var, to avoid double unpacking.
3744
3745   if(is_fj){
3746     for(w=0;w<where.size();++w)
3747                 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3748     for(s=0;s<sl_list.size();s++)
3749                 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3750   }
3751
3752   for(w=0;w<where.size();++w){
3753         if(is_wj || is_fj || s_pids.count(w) == 0)
3754                 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3755   }
3756   for(s=0;s<sl_list.size();s++){
3757         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3758   }
3759
3760   int g;
3761   if(gb_tbl != NULL){
3762         for(g=0;g<gb_tbl->size();g++)
3763           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3764   }
3765
3766   //                    Variables for unpacking attributes.
3767   ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3768   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3769     int schref = (*csi).schema_ref;
3770         int tblref = (*csi).tblvar_ref;
3771     string field = (*csi).field;
3772     data_type dt(schema->get_type_name(schref,field));
3773     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3774         field.c_str(), tblref);
3775     ret += tmpstr;
3776   }
3777
3778   ret += "\n\n";
3779
3780 //                      Variables that are always needed
3781   ret += "/*\t\tVariables which are always needed\t*/\n";
3782   ret += "\tgs_retval_t retval;\n";
3783   ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3784   ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3785
3786   ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3787
3788
3789 //                      Variables needed for aggregation queries.
3790   if(is_aggr_query){
3791           ret += "\n/*\t\tVariables for aggregation\t*/\n";
3792           ret+="\tunsigned int i, probe;\n";
3793           ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3794           ret+="\tgs_uint64_t hashval, hash2;\n";
3795 //                      Variables for storing group-by attribute values.
3796           if(gb_tbl->size() > 0)
3797                 ret += "/*\t\tGroup-by attributes\t*/\n";
3798           for(g=0;g<gb_tbl->size();g++){
3799                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3800                 ret += tmpstr;
3801                 data_type *gdt = gb_tbl->get_data_type(g);
3802                 if(gdt->is_buffer_type()){
3803                   sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3804                   ret += tmpstr;
3805                 }
3806           }
3807           ret += "\n";
3808 //                      Temporaries for min/max
3809           string aggr_tmp_str = "";
3810           for(a=0;a<aggr_tbl->size();a++){
3811                 string aggr_op = aggr_tbl->get_op(a);
3812                 if(aggr_op == "MIN" || aggr_op == "MAX"){
3813                         sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3814                         aggr_tmp_str.append(tmpstr);
3815                 }
3816           }
3817           if(aggr_tmp_str != ""){
3818                 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3819                 ret += aggr_tmp_str;
3820                 ret += "\n";
3821           }
3822 //              Variables for udaf output temporaries
3823         bool no_udaf = true;
3824         for(a=0;a<aggr_tbl->size();a++){
3825                 if(! aggr_tbl->is_builtin(a)){
3826                         if(no_udaf){
3827                                 ret+="/*\t\tUDAF output vars.\t*/\n";
3828                                 no_udaf = false;
3829                         }
3830                         int afcn_id = aggr_tbl->get_fcn_id(a);
3831                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3832                         sprintf(tmpstr,"udaf_ret%d", a);
3833                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3834                 }
3835         }
3836   }
3837
3838 //                      Variables needed for a filter join query
3839   if(fs->node_type() == "filter_join"){
3840         filter_join_qpn *fjq = (filter_join_qpn *)fs;
3841         bool uses_bloom = fjq->use_bloom;
3842         ret += "/*\t\tJoin fields\t*/\n";
3843         for(g=0;g<fjq->hash_eq.size();g++){
3844                 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3845                 ret += tmpstr;
3846                 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3847                 ret += tmpstr;
3848           }
3849         if(uses_bloom){
3850                 ret +=
3851 "  /*           Variables for fj bloom filter   */ \n"
3852 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3853 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3854 "\tlong long int curr_fj_ts;\n"
3855 "\tlong long int curr_bin, the_bin;\n"
3856 "\n"
3857 ;
3858         }else{
3859                 ret +=
3860 "  /*           Variables for fj join table     */ \n"
3861 "\tunsigned int i, bucket, found; \n"
3862 "\tunsigned int bucket1, the_bucket;\n"
3863 "       long long int curr_fj_ts;\n"
3864 "\n"
3865 ;
3866         }
3867   }
3868
3869
3870   if(fs->node_type() == "watch_join"){
3871         watch_join_qpn *wlq = (watch_join_qpn *)fs;
3872         ret += "/*\t\tJoin fields\t*/\n";
3873         for(int k=0;k<wlq->key_flds.size(); ++k){
3874                 string kfld = wlq->key_flds[k];
3875                 ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
3876                 ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
3877           }
3878         ret +=
3879 "  /*           Variables for wl join table     */ \n"
3880 "\tunsigned int i, bucket;\n"
3881 "\tunsigned long long int hash; \n";
3882         string wl_schema = wlq->from[1]->get_schema_name();
3883         string wl_elem_str = generate_watchlist_element_name(wl_schema);
3884         ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n";
3885 "\n"
3886 ;
3887   }
3888
3889
3890 //              Variables needed to store selected attributes of BUFFER type
3891 //              temporarily, in order to compute their size for storage
3892 //              in an output tuple.
3893
3894   string select_var_defs = "";
3895   for(int s=0;s<sl_list.size();s++){
3896         data_type *sdt = sl_list[s]->get_data_type();
3897         if(sdt->is_buffer_type()){
3898           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3899           select_var_defs.append(tmpstr);
3900         }
3901   }
3902   if(select_var_defs != ""){
3903         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3904     ret += select_var_defs;
3905   }
3906
3907 //              Variables to store results of partial functions.
3908   int p;
3909   if(partial_fcns.size()>0){
3910           ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3911           for(p=0;p<partial_fcns.size();++p){
3912                 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3913                   sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3914                     partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3915                   ret += tmpstr;
3916                   if(!is_aggr_query && fcn_ref_cnt[p] >1){
3917                         ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3918                   }
3919                 }
3920           }
3921
3922           if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3923           ret += "\n";
3924   }
3925
3926 //              variable to hold packet struct  //
3927         if(packed_return){
3928                 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3929         }
3930
3931
3932   ret += "\t#ifdef LFTA_STATS\n";
3933 // variable to store counter of cpu cycles spend in accept_tuple
3934         ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3935 // increment counter of received tuples
3936         ret += "\tt->in_tuple_cnt++;\n";
3937   ret += "\t#endif\n";
3938
3939
3940 //      -------------------------------------------------
3941 //              If the packet is "packet", test if its for this lfta,
3942 //              and if so load it into its struct
3943
3944         if(packed_return){
3945                 ret+="\n/*  packed tuple : test and load. \t*/\n";
3946                 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3947                 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3948                 ret+="\t\tgoto end;\n\n";
3949         }
3950
3951
3952
3953   col_id_set unpacked_cids;     //      Keep track of the cols that have been unpacked.
3954
3955   string temporal_flush;
3956   if(is_aggr_query)
3957         ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3958   else {        // non-aggregation operators
3959
3960 // Unpack all the temporal attributes referenced in select clause
3961 // and update the last value of the attribute
3962         col_id_set temp_cids;           //      col ids of temp attributes in select clause
3963
3964         for(s=0;s<sl_list.size();s++){
3965                 data_type *sdt = sl_list[s]->get_data_type();
3966                 if (sdt->is_temporal()) {
3967                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3968                 }
3969         }
3970 //                      If this is a filter join,
3971 //                      ensure that the temporal range field is unpacked.
3972         if(is_fj){
3973                 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3974                 if(temp_cids.count(window_var_cid)==0)
3975                         temp_cids.insert(window_var_cid);
3976         }
3977
3978         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3979                 if(unpacked_cids.count((*csi)) == 0){
3980                         int tblref = (*csi).tblvar_ref;
3981                         int schref = (*csi).schema_ref;
3982                         string field = (*csi).field;
3983                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3984                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3985                         ret += tmpstr;
3986
3987                         unpacked_cids.insert( (*csi) );
3988                 }
3989         }
3990
3991   }
3992
3993   vector<cnf_elem *> filter = fs->get_filter_clause();
3994 //              Test the filter predicate (some query types have additional preds).
3995   if(filter.size() > 0 && !is_wj){      // watchlist join does specialized processing
3996
3997 //              Sort by evaluation cost.
3998 //              First, estimate evaluation costs
3999 //              Eliminate predicates covered by the prefilter (those in s_pids).
4000 //              I need to do it before the sort becuase the indices refer
4001 //              to the position in the unsorted list./
4002         vector<cnf_elem *> tmp_wh;
4003         for(w=0;w<filter.size();++w){
4004                 if(s_pids.count(w) == 0){
4005                         compute_cnf_cost(filter[w],Ext_fcns);
4006                         tmp_wh.push_back(filter[w]);
4007                 }
4008         }
4009         filter = tmp_wh;
4010
4011         sort(filter.begin(), filter.end(), compare_cnf_cost());
4012
4013 //              Now generate the predicates.
4014         for(w=0;w<filter.size();++w){
4015                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
4016                 ret += tmpstr;
4017 //                      Find the set of variables accessed in this CNF elem,
4018 //                      but in no previous element.
4019                 col_id_set this_pred_cids;
4020                 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
4021                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4022                         if(unpacked_cids.count( (*csi) ) == 0){
4023                         int tblref = (*csi).tblvar_ref;
4024                         int schref = (*csi).schema_ref;
4025                         string field = (*csi).field;
4026                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4027                                 unpacked_cids.insert( (*csi) );
4028                         }
4029                 }
4030 //                      Find partial fcns ref'd in this cnf element
4031                 set<int> pfcn_refs;
4032                 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
4033 //                      Since set<..> is a "Sorted Associative Container",
4034 //                      we can walk through it in sorted order by walking from
4035 //                      begin() to end().  (and the partial fcns must be
4036 //                      evaluated in this order).
4037                 set<int>::iterator si;
4038                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
4039                         if(fcn_ref_cnt[(*si)] > 1){
4040                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
4041                         }
4042                         if(is_partial_fcn[(*si)]){
4043                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
4044                                 ret += "\t\tif(retval) goto end;\n";
4045                         }
4046                         if(fcn_ref_cnt[(*si)] > 1){
4047                                 if(!is_partial_fcn[(*si)]){
4048                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
4049                                 }
4050                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
4051                                 ret += "\t}\n";
4052                         }
4053                 }
4054
4055                 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
4056                                 ") ) goto end;\n";
4057         }
4058   }else{
4059           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
4060   }
4061
4062
4063 //                      We've passed the WHERE clause,
4064 //                      unpack the remainder of the accessed fields.
4065   if(is_fj){
4066         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
4067         vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
4068         for(w=0;w<h_eq.size();++w){
4069                 col_id_set this_pred_cids;
4070                 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
4071                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4072                         if(unpacked_cids.count( (*csi) ) == 0){
4073                                 int tblref = (*csi).tblvar_ref;
4074                                 int schref = (*csi).schema_ref;
4075                                 string field = (*csi).field;
4076                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4077                                 unpacked_cids.insert( (*csi) );
4078                         }
4079                 }
4080         }
4081   }else if(is_wj){              // STOPPED HERE move this to wj main body
4082 /*
4083         ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
4084         map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
4085         vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
4086         for(w=0;w<kflds.size();++w){
4087                 string kfld = kflds[w];
4088                 col_id_set this_pred_cids;
4089                 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
4090                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4091                         if(unpacked_cids.count( (*csi) ) == 0){
4092                                 int tblref = (*csi).tblvar_ref;
4093                                 int schref = (*csi).schema_ref;
4094                                 string field = (*csi).field;
4095                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4096                                 unpacked_cids.insert( (*csi) );
4097                         }
4098                 }
4099         }
4100 */
4101   }else{
4102           ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
4103
4104           for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4105                 if(unpacked_cids.count( (*csi) ) == 0){
4106                         int schref = (*csi).schema_ref;
4107                         int tblref = (*csi).tblvar_ref;
4108                         string field = (*csi).field;
4109                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4110                         unpacked_cids.insert( (*csi) );
4111                 }
4112           }
4113   }
4114
4115
4116 //////////////////
4117 //////////////////      After this, the query types
4118 //////////////////      are processed differently.
4119
4120   if(!is_aggr_query && !is_fj & !is_wj)
4121         ret += generate_sel_accept_body(fs, node_name, schema);
4122   else if(is_aggr_query)
4123         ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
4124   else{
4125         if(is_fj)
4126                 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4127         else
4128                 ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4129   }
4130         
4131
4132
4133 //              Finish up.
4134
4135    ret += "\n\tend:\n";
4136   ret += "\t#ifdef LFTA_STATS\n";
4137         ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
4138   ret += "\t#endif\n";
4139    ret += "\n\treturn 1;\n}\n\n";
4140
4141         return(ret);
4142 }
4143
4144
4145 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
4146         int g, cl;
4147
4148         string ret = "struct FTA * "+generate_alloc_name(node_name) +
4149            "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command,  gs_int32_t sz, void *value){\n";
4150
4151         ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
4152         ret+="\tint i;\n";
4153         ret += "\n";
4154         ret+="\tif((f=(struct "+generate_fta_name(node_name)+" *)fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
4155
4156 //                              assign a streamid to fta instance
4157         ret+="\t/* assign a streamid */\n";
4158         ret+="\tf->f.ftaid = ftaid;\n";
4159         ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
4160         ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
4161
4162         if(is_aggr_query){
4163                 ret += "\tf->n_aggrs = 0;\n";
4164
4165                 ret += "\tf->max_aggrs = ";
4166
4167 //                              Computing the number of aggregate blocks is a little
4168 //                              tricky.  If there are no GB attrs, or if all GB attrs
4169 //                              are temporal, then use a single aggregate block, else
4170 //                              use a default value (10).  A user specification overrides
4171 //                              this logic.
4172                 bool single_group = true;
4173                 for(g=0;g<gb_tbl->size();g++){
4174                         data_type *gdt = gb_tbl->get_data_type(g);
4175                         if(! gdt->is_temporal() ){
4176                                 single_group = false;
4177                         }
4178                 }
4179                 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
4180                 int max_aggr_i = atoi(max_aggr_str.c_str());
4181                 if(max_aggr_i <= 0){
4182                         if(single_group)
4183                                 ret += "2";
4184                         else
4185                                 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
4186                 }else{
4187                         unsigned int naggrs = 1;                // make it power of 2
4188                         unsigned int nones = 0;
4189                         while(max_aggr_i){
4190                                 if(max_aggr_i&1)
4191                                         nones++;
4192                                 naggrs = naggrs << 1;
4193                                 max_aggr_i = max_aggr_i >> 1;
4194                         }
4195                         if(nones==1)            // in case it was already a power of 2.
4196                                 naggrs/=2;
4197                         ret += int_to_string(naggrs);
4198                 }
4199                 ret += ";\n";
4200
4201                 ret+="\tif ((f->aggr_table = (struct "+generate_aggr_struct_name(node_name)+" *)sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
4202                 ret+="\t\treturn(0);\n";
4203                 ret+="\t}\n\n";
4204 //              ret+="/* compute how many integers we need to store the hashmap */\n";
4205 //              ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
4206                 ret+="\tif ((f->aggr_table_hashmap = (gs_uint32_t *)sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
4207                 ret+="\t\treturn(0);\n";
4208                 ret+="\t}\n";
4209                 ret+="/*\t\tfill bitmap with zero \t*/\n";
4210                 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
4211                 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
4212                 ret+="\tf->generation=0;\n";
4213                 ret+="\tf->flush_pos = f->max_aggrs;\n";
4214
4215                 ret += "\tf->flush_ctr = 0;\n";
4216
4217         }
4218
4219         if(is_fj){
4220                 if(uses_bloom){
4221                         ret+="\tf->first_exec = 1;\n";
4222                         unsigned int n_bloom = 11;
4223                         string n_bloom_str = fs->get_val_of_def("num_bloom");
4224                         int tmp_n_bloom = atoi(n_bloom_str.c_str());
4225                         if(tmp_n_bloom>0)
4226                                 n_bloom = tmp_n_bloom+1;
4227
4228                         unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
4229                         if(window_len < n_bloom){
4230                                 n_bloom = window_len+1;
4231                         }
4232
4233                         int bf_exp_size = 12;  // base-2 log of number of bits
4234                         string bloom_len_str = fs->get_val_of_def("bloom_size");
4235                         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
4236                         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
4237                                 bf_exp_size = tmp_bf_exp_size;
4238                         }
4239                         int bf_bit_size = 1 << 12;
4240                         int bf_byte_size = bf_bit_size / (8*sizeof(char));
4241
4242                         int bf_tot = n_bloom*bf_byte_size;
4243                         ret+="\tif ((f->bf_table = (unsigned char *)sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
4244                         ret+="\t\treturn(0);\n";
4245                         ret+="\t}\n";
4246                         ret +=
4247 "       for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
4248 "               f->bf_table[i] = 0;\n"
4249 ;
4250                 }else{
4251                         unsigned int ht_size = 4096;
4252                         string ht_size_s = fs->get_val_of_def("aggregate_slots");
4253                         int tmp_ht_size = atoi(ht_size_s.c_str());
4254                         if(tmp_ht_size > 1024){
4255                                 unsigned int hs = 1;            // make it power of 2
4256                                 while(tmp_ht_size){
4257                                         hs =hs << 1;
4258                                         tmp_ht_size = tmp_ht_size >> 1;
4259                                 }
4260                                 ht_size = hs;
4261                         }
4262                         ret+="\tif ((f->join_table = (struct "+generate_fj_struct_name(node_name)+" *) sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
4263                         ret+="\t\treturn(0);\n";
4264                         ret+="\t}\n\n";
4265                         ret +=
4266 "       for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
4267 "               f->join_table[i].ts = 0;\n"
4268 ;
4269                 }
4270         }
4271
4272 //                      Initialize the complex literals (which might be handles).
4273
4274         for(cl=0;cl<complex_literals->size();cl++){
4275                 literal_t *l = complex_literals->get_literal(cl);
4276 //              sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
4277 //              ret += tmpstr + l->to_C_code() + ";\n";
4278                 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
4279                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4280         }
4281
4282         ret += "\n";
4283
4284 //                      Initialize the last seen values of temporal attributes to min(max) value of
4285 //                      their respective type
4286 //                      Create places to hold the last values of temporal attributes referenced in select clause
4287
4288
4289         col_id_set temp_cids;           //      col ids of temp attributes in select clause
4290
4291         int s;
4292         col_id_set::iterator csi;
4293
4294         for(s=0;s<sl_list.size();s++){
4295                 data_type *sdt = sl_list[s]->get_data_type();
4296                 if (sdt->is_temporal()) {
4297                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
4298                 }
4299         }
4300
4301         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
4302                 int tblref = (*csi).tblvar_ref;
4303                 int schref = (*csi).schema_ref;
4304                 string field = (*csi).field;
4305                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
4306                 if (dt.is_increasing()) {
4307                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
4308                         ret += tmpstr;
4309                 } else if (dt.is_decreasing()) {
4310                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
4311                         ret += tmpstr;
4312                 }
4313         }
4314
4315 //      initialize last seen values of temporal groubpy variables
4316         if(is_aggr_query){
4317                 for(g=0;g<gb_tbl->size();g++){
4318                         data_type *dt = gb_tbl->get_data_type(g);
4319                         if(dt->is_temporal()){
4320 /*
4321                                 fprintf(stderr,"group by attribute %s is temporal, ",
4322                                                 gb_tbl->get_name(g).c_str());
4323 */
4324                                 if(dt->is_increasing()){
4325                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
4326                                 }else{
4327                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
4328                                 }
4329                                 ret += tmpstr;
4330                         }
4331                 }
4332         }
4333
4334         ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
4335         ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
4336         ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
4337         ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
4338         ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
4339
4340 //                      Initialize runtime stats
4341         ret+="\tf->in_tuple_cnt = 0;\n";
4342         ret+="\tf->out_tuple_cnt = 0;\n";
4343         ret+="\tf->out_tuple_sz = 0;\n";
4344         ret+="\tf->accepted_tuple_cnt = 0;\n";
4345         ret+="\tf->cycle_cnt = 0;\n";
4346         ret+="\tf->collision_cnt = 0;\n";
4347         ret+="\tf->eviction_cnt = 0;\n";
4348         ret+="\tf->sampling_rate = 1.0;\n";
4349
4350         ret+="\tf->trace_id = 0;\n\n";
4351     if(param_tbl->size() > 0){
4352         ret+=
4353 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
4354 "#ifndef LFTA_IN_NIC\n"
4355 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
4356 "#else\n"
4357 "\t\t}\n"
4358 "#endif\n"
4359 "\t\t\treturn 0;\n"
4360 "\t\t}\n";
4361         }
4362
4363 //                      Register the pass-by-handle parameters
4364     int ph;
4365     for(ph=0;ph<param_handle_table.size();++ph){
4366                 data_type pdt(param_handle_table[ph]->type_name);
4367                 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
4368                 switch(param_handle_table[ph]->val_type){
4369                 case cplx_lit_e:
4370                         ret += tmpstr;
4371                         if(pdt.is_buffer_type()) ret += "&(";
4372                         sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
4373                         ret += tmpstr ;
4374                         if(pdt.is_buffer_type()) ret += ")";
4375                         ret +=  ");\n";
4376                         break;
4377                 case litval_e:
4378 //                              not complex, no constructor
4379                         ret += tmpstr;
4380                         ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
4381                         break;
4382                 case param_e:
4383 //                              query parameter handles are regstered/deregistered in the
4384 //                              load_params function.
4385 //                      ret += "t->param_"+param_handle_table[ph]->param_name;
4386                         break;
4387                 default:
4388                         fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
4389                         exit(1);
4390                 }
4391         }
4392
4393         ret += "\treturn (struct FTA *) f;\n";
4394         ret += "}\n\n";
4395
4396         return(ret);
4397 }
4398
4399
4400
4401
4402 //////////////////////////////////////////////////////////////////
4403
4404 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
4405 //              map<string,string> &int_fcn_defs,
4406                 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
4407         bool is_aggr_query;
4408         int s,p,g;
4409         string retval;
4410
4411 /////////////////////////////////////////////////////////////
4412 ///             Do operator-generic processing, such as
4413 ///             gathering the set of referenced columns,
4414 ///             generating structures, etc.
4415
4416 //              Initialize globals to empty.
4417         gb_tbl = NULL; aggr_tbl = NULL;
4418         global_id = -1; nicprop = NULL;
4419         param_tbl = fs->get_param_tbl();
4420         sl_list.clear(); where.clear();
4421         partial_fcns.clear();
4422         fcn_ref_cnt.clear(); is_partial_fcn.clear();
4423         pred_class.clear(); pred_pos.clear();
4424         sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
4425         gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
4426
4427
4428 //              Does the lfta read packed results from the NIC?
4429         nicprop = nicp;                 // load into global
4430         global_id = gid;
4431     packed_return = false;
4432         if(nicp && nicp->option_exists("Return")){
4433                 if(nicp->option_value("Return") == "Packed"){
4434                         packed_return = true;
4435                 }else{
4436                         fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
4437                 }
4438         }
4439
4440
4441 //                      Extract data which defines the query.
4442 //                              complex literals gathered now.
4443         complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
4444         param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
4445         string node_name = fs->get_node_name();
4446     bool is_fj = false, uses_bloom = false;
4447         bool is_wj = false;
4448         bool is_watch_tbl = false;
4449
4450
4451         if(fs->node_type() == "spx_qpn"){
4452                 is_aggr_query = false;
4453                 spx_qpn *spx_node = (spx_qpn *)fs;
4454                 sl_list = spx_node->get_select_se_list();
4455                 where = spx_node->get_where_clause();
4456                 gb_tbl = NULL;
4457                 aggr_tbl = NULL;
4458         } else
4459         if(fs->node_type() == "sgah_qpn"){
4460                 is_aggr_query = true;
4461                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4462                 sl_list = sgah_node->get_select_se_list();
4463                 where = sgah_node->get_where_clause();
4464                 gb_tbl = sgah_node->get_gb_tbl();
4465                 aggr_tbl = sgah_node->get_aggr_tbl();
4466
4467                 if((sgah_node->get_having_clause()).size() > 0){
4468                         fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
4469                 }
4470         } else
4471         if(fs->node_type() == "filter_join"){
4472                 is_aggr_query = false;
4473         is_fj = true;
4474                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4475                 sl_list = fj_node->get_select_se_list();
4476                 where = fj_node->get_where_clause();
4477                 uses_bloom = fj_node->use_bloom;
4478                 gb_tbl = NULL;
4479                 aggr_tbl = NULL;
4480         }else
4481         if(fs->node_type() == "watch_join"){
4482                 is_aggr_query = false;
4483         is_wj = true;
4484                 watch_join_qpn *wl_node = (watch_join_qpn *)fs;
4485                 sl_list = wl_node->get_select_se_list();
4486                 where = wl_node->get_where_clause();
4487                 gb_tbl = NULL;
4488                 aggr_tbl = NULL;
4489         }else
4490         if(fs->node_type() == "watch_tbl_qpn"){
4491                 is_aggr_query = false;
4492         is_watch_tbl = true;
4493                 vector<scalarexp_t *> empty_sl_list;
4494                 vector<cnf_elem *> empty_where;
4495                 sl_list = empty_sl_list;
4496                 where = empty_where;
4497                 gb_tbl = NULL;
4498                 aggr_tbl = NULL;
4499         } else {
4500                 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
4501                 exit(1);
4502         }
4503
4504 //                      Build list of "partial functions", by clause.
4505 //                      NOTE : partial fcns are not handled well.
4506 //                      The act of searching for them associates the fcn call
4507 //                      in the SE with an index to an array.  Refs to the
4508 //                      fcn value are replaced with refs to the variable they are
4509 //                      unpacked into.  A more general tagging mechanism would be better.
4510
4511         int i;
4512         vector<bool> *pfunc_ptr = NULL;
4513         vector<int> *ref_cnt_ptr = NULL;
4514         if(!is_aggr_query){             // don't collect cacheable fcns on aggr query.
4515                 ref_cnt_ptr = &fcn_ref_cnt;
4516                 pfunc_ptr = &is_partial_fcn;
4517         }
4518
4519         sl_fcns_start = 0;
4520         for(i=0;i<sl_list.size();i++){
4521                 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4522         }
4523         wh_fcns_start = sl_fcns_end = partial_fcns.size();
4524         for(i=0;i<where.size();i++){
4525                 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4526         }
4527         gb_fcns_start = wh_fcns_end = partial_fcns.size();
4528         if(gb_tbl != NULL){
4529                 for(i=0;i<gb_tbl->size();i++){
4530                         find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
4531                 }
4532         }
4533         ag_fcns_start = gb_fcns_end = partial_fcns.size();
4534         if(aggr_tbl != NULL){
4535                 for(i=0;i<aggr_tbl->size();i++){
4536                         find_partial_fcns(aggr_tbl->get_aggr_se(i), &partial_fcns, NULL, &is_partial_fcn, Ext_fcns);
4537                 }
4538         }
4539         ag_fcns_end = partial_fcns.size();
4540
4541 //              Fill up the is_partial_fcn and fcn_ref_cnt arrays.
4542         if(is_aggr_query){
4543                 for(i=0; i<partial_fcns.size();i++){
4544                         fcn_ref_cnt.push_back(1);
4545                         is_partial_fcn.push_back(true);
4546                 }
4547         }
4548
4549 //              Unmark non-partial expensive functions referenced only once.
4550         for(i=0; i<partial_fcns.size();i++){
4551                 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
4552                         partial_fcns[i]->set_partial_ref(-1);
4553                 }
4554         }
4555
4556         node_name = normalize_name(node_name);
4557
4558         retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
4559
4560         if(packed_return){              // generate unpack struct
4561                 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
4562                 int schref = input_tbls[0]->get_schema_ref();
4563                 vector<string> refd_cols;
4564                 for(s=0;s<sl_list.size();++s){
4565                         gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
4566                 }
4567                 for(p=0;p<where.size();++p){
4568 //                              I'm not disabling these preds ...
4569                         gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
4570                 }
4571                 if(gb_tbl){
4572                         for(g=0;g<gb_tbl->size();++g){
4573                           gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
4574                         }
4575                 }
4576                 sort(refd_cols.begin(), refd_cols.end());
4577                 retval += "struct "+node_name+"_input_struct{\n";
4578                 retval += "\tint __lfta_id_fm_nic__;\n";
4579                 int vsi;
4580                 for(vsi=0;vsi<refd_cols.size();++vsi){
4581                 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
4582                         retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
4583                 }
4584                 retval+="};\n\n";
4585         }
4586
4587
4588 /////////////////////////////////////////////////////
4589 //                      Common stuff unpacked, do some generation
4590
4591
4592         if(is_aggr_query)
4593           retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
4594         if(is_fj)
4595                 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
4596         if(is_watch_tbl){
4597                 retval += "\n\n// watchtable code here \n\n";
4598                 watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
4599                 retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
4600                 retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
4601         }
4602         
4603         if(! is_watch_tbl){
4604                 retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
4605                 retval += generate_tuple_struct(node_name, sl_list) ;
4606
4607                 if(is_aggr_query)
4608                         retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
4609                 if(param_tbl->size() > 0)
4610                         retval += generate_fta_load_params(node_name) ;
4611                 retval += generate_fta_free(node_name, is_aggr_query) ;
4612                 retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;
4613                 retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;
4614
4615         /* extract the value of Time_Correlation from interface definition */
4616                 int e,v;
4617                 string es;
4618                 unsigned time_corr;
4619                 vector<tablevar_t *> tvec =  fs->get_input_tbls();
4620                 vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
4621                 if (time_corr_vec.empty())
4622                         time_corr = DEFAULT_TIME_CORR;
4623                 else
4624                         time_corr = atoi(time_corr_vec[0].c_str());
4625
4626                 retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
4627                 retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
4628         }
4629
4630   return(retval);
4631 }
4632
4633
4634
4635 int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
4636
4637 //              Initialize global vars
4638         gb_tbl = NULL;
4639         sl_list.clear(); where.clear();
4640
4641         
4642         if(fs->node_type() == "watch_tbl_qpn"){
4643                 return -1;
4644         }
4645
4646         if(fs->node_type() == "spx_qpn"){
4647                 spx_qpn *spx_node = (spx_qpn *)fs;
4648                 sl_list = spx_node->get_select_se_list();
4649                 where = spx_node->get_where_clause();
4650         }
4651         else if(fs->node_type() == "sgah_qpn"){
4652                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4653                 sl_list = sgah_node->get_select_se_list();
4654                 where = sgah_node->get_where_clause();
4655                 gb_tbl = sgah_node->get_gb_tbl();
4656         }
4657         else if(fs->node_type() == "filter_join"){
4658                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4659                 sl_list = fj_node->get_select_se_list();
4660                 where = fj_node->get_where_clause();
4661         }
4662         else if(fs->node_type() == "watch_join"){
4663                 watch_join_qpn *fj_node = (watch_join_qpn *)fs;
4664                 sl_list = fj_node->get_select_se_list();
4665                 where = fj_node->get_where_clause();
4666         } else{
4667                 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
4668                 exit(1);
4669         }
4670
4671 //                      Gather all column references, need to define unpacking variables.
4672   int w,s;
4673   col_id_set cid_set;
4674   col_id_set::iterator csi;
4675
4676   for(w=0;w<where.size();++w)
4677         gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
4678   for(s=0;s<sl_list.size();s++){
4679         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
4680   }
4681
4682   int g;
4683   if(gb_tbl != NULL){
4684         for(g=0;g<gb_tbl->size();g++)
4685           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
4686   }
4687
4688   //                    compute snap length
4689   int snap_len = -1;
4690   int n_snap=0;
4691   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4692     int schref = (*csi).schema_ref;
4693         int tblref = (*csi).tblvar_ref;
4694     string field = (*csi).field;
4695
4696         if(snap_type == "index"){
4697                 int pos = schema->get_field_idx(schref, field);
4698                 if(pos>snap_len) snap_len = pos;
4699                 n_snap++;
4700         }else{
4701                 param_list *field_params = schema->get_modifier_list(schref, field);
4702                 if(field_params->contains_key("snap_len")){
4703                         string fld_snap_str = field_params->val_of("snap_len");
4704                         int fld_snap;
4705                         if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
4706                                 if(fld_snap > snap_len) snap_len = fld_snap;
4707                                 n_snap++;
4708                         }else{
4709                                 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
4710                         }
4711                 }
4712         }
4713   }
4714
4715   if(n_snap == cid_set.size()){
4716         return (snap_len);
4717   }else{
4718         return -1;
4719   }
4720
4721
4722 }
4723
4724 //              Function which computes an optimal
4725 //              set of unpacking functions.
4726
4727 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
4728         map<string, int> pfcn_count;
4729         map<string, int>::iterator msii;
4730         col_id_set::iterator cisi;
4731         set<string>::iterator ssi;
4732         string best_fcn;
4733
4734         while(ucol_fcn_map.size() < upref_cids.size()){
4735
4736 //                      Gather unpack functions referenced by unaccounted-for
4737 //                      columns, and increment their reference count.
4738                 pfcn_count.clear();
4739                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4740                         if(ucol_fcn_map.count((*cisi)) == 0){
4741                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4742                                 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
4743                                         pfcn_count[(*ssi)]++;
4744                         }
4745                 }
4746
4747 //              Get the lowest cost per field function.
4748                 float min_cost = 0.0;
4749                 string best_fcn = "";
4750                 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
4751                         int fcost = Schema->get_ufcn_cost((*msii).first);
4752                         if(fcost < 0){
4753                                 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
4754                                 exit(1);
4755                         }
4756                         float this_cost = (1.0*fcost)/(*msii).second;
4757                         if(msii == pfcn_count.begin() || this_cost < min_cost){
4758                                 min_cost = this_cost;
4759                                 best_fcn = (*msii).first;
4760                         }
4761                 }
4762                 if(best_fcn == ""){
4763                         fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4764                         exit(1);
4765                 }
4766
4767 //              Assign this function to the unassigned fcns which use it.
4768                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4769                         if(ucol_fcn_map.count((*cisi)) == 0){
4770                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4771                                 if(ufcns.count(best_fcn)>0)
4772                                         ucol_fcn_map[(*cisi)] = best_fcn;
4773                         }
4774                 }
4775         }
4776 }
4777
4778
4779
4780 //              Generate an initial test test for the lfta
4781 //              Assume that the predicate references no external functions,
4782 //              and especially no partial functions,
4783 //              aggregates, internal functions.
4784 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4785                 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4786                 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4787                 vector<int> &lfta_snap_lens, string iface){
4788   col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4789   col_id_set::iterator csi;
4790         int o,p,q;
4791         string ret;
4792
4793 //              Gather complex literals in the prefilter.
4794         cplx_lit_table *complex_literals = new cplx_lit_table();
4795         for(p=0;p<pred_list.size();++p){
4796                 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4797         }
4798
4799
4800 //              Find the combinable predicates
4801         vector<predicate_t *> pr_list;
4802         for(p=0;p<pred_list.size();++p){
4803         find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4804         }
4805
4806 //              Analyze the combinable predicates to find the predicate classes.
4807         pred_class.clear();             // idx to equiv pred in equiv_list
4808         pred_pos.clear();               // idx to returned bitmask.
4809         vector<predicate_t *> equiv_list;
4810         vector<int> num_equiv;
4811
4812
4813         for(p=0;p<pr_list.size();++p){
4814                 for(q=0;q<equiv_list.size();++q){
4815                         if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4816                                 break;
4817                 }
4818                 if(q == equiv_list.size()){             // no equiv : create new
4819                         pred_class.push_back(equiv_list.size());
4820                         equiv_list.push_back(pr_list[p]);
4821                         pred_pos.push_back(0);
4822                         num_equiv.push_back(1);
4823
4824                 }else{                  // pr_list[p] is equivalent to pred q
4825                         pred_class.push_back(q);
4826                         pred_pos.push_back(num_equiv[q]);
4827                         num_equiv[q]++;
4828                 }
4829         }
4830
4831 //              Generate the variables which hold the common pred handles
4832         ret += "/*\t\tprefilter global vars.\t*/\n";
4833         for(q=0;q<equiv_list.size();++q){
4834                 for(p=0;p<=(num_equiv[q]/32);++p){
4835                         ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4836                 }
4837         }
4838
4839 //              Struct to hold prefilter complex literals
4840         ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4841         if(complex_literals->size() == 0)
4842                 ret += "\tint no_variable;\n";
4843         int cl;
4844         for(cl=0;cl<complex_literals->size();cl++){
4845                 literal_t *l = complex_literals->get_literal(cl);
4846                 data_type *dtl = new data_type( l->get_type() );
4847                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4848                 ret += tmpstr;
4849         }
4850         ret += "} prefilter_complex_lits_"+iface+";\n\n";
4851
4852
4853 //              Generate the prefilter initialziation code
4854         ret += "void init_lfta_prefilter_"+iface+"(){\n";
4855
4856 //              First initialize complex literals, if any.
4857         ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4858         for(cl=0;cl<complex_literals->size();cl++){
4859                 literal_t *l = complex_literals->get_literal(cl);
4860                 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4861                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4862         }
4863
4864
4865         set<int> epred_seen;
4866         for(p=0;p<pr_list.size();++p){
4867                 int q = pred_class[p];
4868 //printf("\tq=%d\n",q);
4869                 if(epred_seen.count(q)>0){
4870                         ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4871                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4872                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4873                         for(o=0;o<op_list.size();++o){
4874                                 if(! cl_op[o]){
4875                                         ret += generate_se_code(op_list[o],Schema)+", ";
4876                                 }
4877                         }
4878                         ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4879                         epred_seen.insert(q);
4880                 }else{
4881                         ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4882                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4883                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4884                         for(o=0;o<op_list.size();++o){
4885                                 if(! cl_op[o]){
4886                                         ret += generate_se_code(op_list[o],Schema)+", ";
4887                                 }
4888                         }
4889                         ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4890                         epred_seen.insert(q);
4891                 }
4892         }
4893         ret += "}\n\n";
4894
4895
4896
4897 //              Start on main body code generation
4898   ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4899
4900
4901 ///--------------------------------------------------------------
4902 ///             Generate and store the prefilter body,
4903 ///             reuse it for the snap length calculator
4904 ///-------------------------------------------------------------
4905         string body;
4906
4907     body += "\tstruct packet *p = (struct packet *)pkt;\n";
4908
4909
4910
4911 //              Gather the colids to store unpacked variables.
4912         for(p=0;p<pred_list.size();++p){
4913         gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4914         }
4915
4916 //              make the col_ids refer to the base tables, and
4917 //              grab the col_ids with at least one unpacking function.
4918         for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4919                 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4920                 col_id tmp_col_id;
4921                 tmp_col_id.field = (*csi).field;
4922                 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4923                 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4924                 cid_set.insert(tmp_col_id);
4925                 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4926                 if(fe->get_unpack_fcns().size()>0)
4927                         upref_cids.insert(tmp_col_id);
4928
4929
4930         }
4931
4932 //              Find the set of unpacking programs needed for the
4933 //              prefilter fields.
4934         map<col_id, string,lt_col_id>  ucol_fcn_map;
4935         find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4936         set<string> pref_ufcns;
4937         map<col_id, string,lt_col_id>::iterator mcis;
4938         for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4939                 pref_ufcns.insert((*mcis).second);
4940         }
4941
4942
4943
4944 //                      Variables for unpacking attributes.
4945     body += "/*\t\tVariables for unpacking attributes\t*/\n";
4946     for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4947       int schref = (*csi).schema_ref;
4948           int tblref = (*csi).tblvar_ref;
4949       string field = (*csi).field;
4950       data_type dt(Schema->get_type_name(schref,field));
4951       sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4952         field.c_str(), tblref);
4953       body += tmpstr;
4954       sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4955       body += tmpstr;
4956     }
4957 //                      Variables for unpacking temporal attributes.
4958     body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4959     for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4960           if (cid_set.count(*csi) == 0) {
4961         int schref = (*csi).schema_ref;
4962                 int tblref = (*csi).tblvar_ref;
4963         string field = (*csi).field;
4964         data_type dt(Schema->get_type_name(schref,field));
4965         sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4966           field.c_str(), tblref);
4967         body += tmpstr;
4968
4969           }
4970     }
4971     body += "\n\n";
4972
4973 //              Variables for combinable predicate evaluation
4974         body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4975         for(q=0;q<equiv_list.size();++q){
4976                 for(p=0;p<=(num_equiv[q]/32);++p){
4977                         body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4978                 }
4979         }
4980
4981
4982 //                      Variables that are always needed
4983     body += "/*\t\tVariables which are always needed\t*/\n";
4984         body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4985         body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4986
4987 //              Call the unpacking functions for the prefilter fields
4988         if(pref_ufcns.size() > 0)
4989                 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4990         set<string>::iterator ssi;
4991         for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4992                 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4993         }
4994
4995
4996 //              Unpack the accessed attributes
4997         body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4998     for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4999           int tblref = (*csi).tblvar_ref;
5000       int schref = (*csi).schema_ref;
5001           string field = (*csi).field;
5002           sprintf(tmpstr,"\tret_%s_%d =  (%s(p, &unpack_var_%s_%d) == 0);\n",
5003                 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5004           body += tmpstr;
5005     }
5006
5007 //              next unpack the temporal attributes and ignore the errors
5008 //              We are assuming here that failed unpack of temporal attributes
5009 //              is not going to overwrite the last stored value
5010 //              Failed upacks are ignored
5011     for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
5012           int tblref = (*csi).tblvar_ref;
5013       int schref = (*csi).schema_ref;
5014           string field = (*csi).field;
5015           sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
5016                  Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5017           body += tmpstr;
5018     }
5019
5020 //              Evaluate the combinable predicates
5021         if(equiv_list.size()>0)
5022                 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
5023         for(q=0;q<equiv_list.size();++q){
5024                 for(p=0;p<=(num_equiv[q]/32);++p){
5025
5026 //              Only call the common eval fcn if all ref'd fields present.
5027                         col_id_set pred_cids;
5028                         col_id_set::iterator cpi;
5029                         gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
5030                         if(pred_cids.size()>0){
5031                         body += "\tif(";
5032                                 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5033                                         if(cpi != pred_cids.begin())
5034                                                 body += " && ";
5035                                 string field = (*cpi).field;
5036                                         int tblref = (*cpi).tblvar_ref;
5037                                         body += "ret_"+field+"_"+int_to_string(tblref);
5038                                 }
5039                                 body+=")\n";
5040                         }
5041
5042                         body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
5043                         vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
5044                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
5045                         for(o=0;o<op_list.size();++o){
5046                                 if(cl_op[o]){
5047                                         body += ","+generate_se_code(op_list[o],Schema);
5048                                 }
5049                         }
5050                         body += ");\n";
5051                 }
5052         }
5053
5054
5055         for(p=0;p<pred_list.size();++p){
5056                 col_id_set pred_cids;
5057                 col_id_set::iterator cpi;
5058                 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
5059                 if(pred_cids.size()>0){
5060                         body += "\tif(";
5061                         for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5062                                 if(cpi != pred_cids.begin())
5063                                         body += " && ";
5064                         string field = (*cpi).field;
5065                                 int tblref = (*cpi).tblvar_ref;
5066                                 body += "ret_"+field+"_"+int_to_string(tblref);
5067                         }
5068                         body+=")\n";
5069                 }
5070         body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
5071                 body+="\tbitpos = bitpos << 1;\n";
5072         }
5073
5074 // ---------------------------------------------------------------
5075 //              Finished with the body of the prefilter
5076 // --------------------------------------------------------------
5077
5078         ret += body;
5079
5080 //                      Collect fields referenced by an lfta but not
5081 //                      already unpacked for the prefilter.
5082
5083 //printf("upref_cids is:\n");
5084 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
5085 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5086 //printf("pref_ufcns is:\n");
5087 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
5088 //printf("\t%s\n",(*ssi).c_str());
5089
5090         int l;
5091         for(l=0;l<lfta_cols.size();++l){
5092                 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
5093                         string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
5094                         col_id tmp_col_id;
5095                         tmp_col_id.field = (*csi).field;
5096                         tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
5097                         tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
5098                         field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
5099                         set<string> fld_ufcns = fe->get_unpack_fcns();
5100 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(),  upref_cids.count(tmp_col_id));
5101                         if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
5102 //              Ensure that this field not already unpacked.
5103                                 bool found = false;
5104                                 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
5105 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
5106                                         if(pref_ufcns.count((*ssi))){
5107 //printf("Field already unpacked.\n");
5108                                                 found = true;;
5109                                         }
5110                                 }
5111                                 if(! found){
5112 //printf("\tadding to unpack list\n");
5113                                         upall_cids.insert(tmp_col_id);
5114                                 }
5115                         }
5116                 }
5117         }
5118
5119 //printf("upall_cids is:\n");
5120 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
5121 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5122
5123 //              Get the set of unpacking programs for these.
5124         map<col_id, string,lt_col_id>  uall_fcn_map;
5125         find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
5126         set<string> pall_ufcns;
5127         for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
5128 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
5129                 pall_ufcns.insert((*mcis).second);
5130         }
5131
5132 //              Iterate through the remaining set of unpacking function
5133         if(pall_ufcns.size() > 0)
5134                 ret += "//\t\tcall all remaining field unpacking functions.\n";
5135         for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
5136 //              gather the set of columns unpacked by this ufcn
5137                 col_id_set fcol_set;
5138                 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
5139                         if(uall_fcn_map[(*csi)] == (*ssi))
5140                                 fcol_set.insert((*csi));
5141                 }
5142
5143 //              gather the set of lftas which access a field unpacked by the fcn
5144                 set<long long int> clfta;
5145                 for(l=0;l<lfta_cols.size();l++){
5146                         for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
5147                                 if(lfta_cols[l].count((*csi)) > 0)
5148                                         break;
5149                         }
5150                         if(csi != fcol_set.end())
5151                                 clfta.insert(lfta_sigs[l]);
5152                 }
5153
5154 //              generate the unpacking code
5155                 ret += "\tif(";
5156                 set<long long int>::iterator sii;
5157                 for(sii=clfta.begin();sii!=clfta.end();++sii){
5158                         if(sii!=clfta.begin())
5159                                 ret += " || ";
5160                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
5161                         ret += tmpstr;
5162                 }
5163                 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5164         }
5165
5166
5167     ret += "\treturn(retval);\n\n";
5168   ret += "}\n\n";
5169
5170
5171 // --------------------------------------------------------
5172 //              reuse prefilter body for snaplen calculator
5173 //
5174 //      This is dummy code, so I'm commenting it out.
5175
5176 /*
5177   ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
5178
5179         ret += body;
5180
5181         int i;
5182         vector<int> s_snaps = lfta_snap_lens;
5183         sort(s_snaps.begin(), s_snaps.end());
5184
5185         if(s_snaps[0] == -1){
5186                 set<unsigned long long int> sigset;
5187                 for(i=0;i<lfta_snap_lens.size();++i){
5188                         if(lfta_snap_lens[i] == -1){
5189                                 sigset.insert(lfta_sigs[i]);
5190                         }
5191                 }
5192                 ret += "\tif( ";
5193                 set<unsigned long long int>::iterator sulli;
5194                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5195                         if(sulli!=sigset.begin())
5196                                 ret += " || ";
5197                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5198                         ret += tmpstr;
5199                 }
5200                 ret += ") return -1;\n";
5201         }
5202
5203         int nextpos = lfta_snap_lens.size()-1;
5204         int nextval = lfta_snap_lens[nextpos];
5205         while(nextval >= 0){
5206                 set<unsigned long long int> sigset;
5207                 for(i=0;i<lfta_snap_lens.size();++i){
5208                         if(lfta_snap_lens[i] == nextval){
5209                                 sigset.insert(lfta_sigs[i]);
5210                         }
5211                 }
5212                 ret += "\tif( ";
5213                 set<unsigned long long int>::iterator sulli;
5214                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5215                         if(sulli!=sigset.begin())
5216                                 ret += " || ";
5217                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5218                         ret += tmpstr;
5219                 }
5220                 ret += ") return "+int_to_string(nextval)+";\n";
5221
5222                 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
5223                 if(nextpos>0)
5224                         nextval = lfta_snap_lens[nextpos];
5225                 else
5226                         nextval = -1;
5227         }
5228         ret += "\treturn 0;\n";
5229         ret += "}\n\n";
5230 */
5231
5232
5233   return(ret);
5234 }
5235
5236
5237
5238
5239 //              Generate the struct which will store the the values of
5240 //              temporal attributesunpacked by prefilter
5241 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
5242
5243   col_id_set::iterator csi;
5244
5245 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
5246
5247   string ret="struct prefilter_unpacked_temp_vars {\n";
5248   ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
5249
5250   string init_code;
5251
5252   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
5253     int schref = (*csi).schema_ref;
5254     int tblref = (*csi).tblvar_ref;
5255     string field = (*csi).field;
5256         data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
5257     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
5258         field.c_str(), tblref);
5259     ret += tmpstr;
5260
5261         if (init_code != "")
5262                 init_code += ", ";
5263         if (dt.is_increasing())
5264                 init_code += dt.get_min_literal();
5265         else
5266                 init_code += dt.get_max_literal();
5267
5268   }
5269   ret += "};\n\n";
5270
5271   ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";
5272
5273   return(ret);
5274 }