Improvements to aggregation code and function library
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include <string>
17 #include <stdio.h>
18 #include <stdlib.h>
19 //#include <algo.h>
20 #include<algorithm>
21
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
29
30 using namespace std;
31
// Declared extern: the definition lives in another translation unit.
// NOTE(review): presumably the default size of generated LFTA hash tables
// (inferred from the name) -- confirm at the definition.
extern int DEFAULT_LFTA_HASH_TABLE_SIZE;

// default value for correlation between the interface card and
// the system clock
#define DEFAULT_TIME_CORR 16


//      For fast hashing.
//      hash_nums holds the source text of 64-bit multiplier literals (the
//      commented-out initializer below shows their "...ull" form).  They are
//      spliced into generated code, e.g. generate_watchlist_load emits
//      "hash ^= ((" + hash_nums[f%NRANDS] + " * ...))".
//      The array is defined in another translation unit.  The local NRANDS
//      definition is commented out, so NRANDS must come from an included header.
//#define NRANDS 100
extern string hash_nums[NRANDS];
42 /*
43 = {
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
94 };
95 */
96
97
// ----------------------------------------------
//              Data extracted from the query plan node
//              for use by code generation.
//              These are file-scope variables shared by the generator
//              functions in this file.

static cplx_lit_table *complex_literals;  //Table of literals with constructors.
static vector<handle_param_tbl_entry *> param_handle_table;	// NOTE(review): presumably the pass-by-handle parameters -- confirm.
static param_table *param_tbl;          // Table of all referenced parameters.

static vector<scalarexp_t *> sl_list;	// Select-list expressions; generate_fta_struct scans them for temporal attributes.
static vector<cnf_elem *> where;	// NOTE(review): presumably the WHERE predicates in CNF form -- confirm.

static gb_table *gb_tbl;                        // Table of all group-by attributes.
static aggregate_table *aggr_tbl;       // Table of all referenced aggregates.

static bool packed_return;              // unpack using structure, not fcns
						// (when true, generate_unpack_code reads fields
						// from <node>_input_struct_var instead of calling
						// the schema's unpack function).
static nic_property *nicprop;   // nic properties for this interface.
static int global_id;


//              The partial_fcns vector can now refer to
//              partial functions, or expensive functions
//              which can be cached (if there are multiple refs).  The
//              parallel vectors fcn_ref_cnt (an int per entry) and
//              is_partial_fcn (a bool per entry) distinguish the cases.
static vector<scalarexp_t *> partial_fcns;
static vector<int> fcn_ref_cnt;
static vector<bool> is_partial_fcn;
//              Not static, so these have external linkage.
//              NOTE(review): presumably [start, end) index ranges into
//              partial_fcns for the select, where, group-by and aggregate
//              clauses respectively -- confirm against their users.
int sl_fcns_start = 0, sl_fcns_end = 0;
int wh_fcns_start = 0, wh_fcns_end = 0;
int gb_fcns_start = 0, gb_fcns_end = 0;
int ag_fcns_start = 0, ag_fcns_end = 0;


//              These vectors are for combinable predicates.
static vector<int> pred_class;  //      identifies the group
static vector<int> pred_pos;    //  position in the group.



//              Shared scratch buffer for sprintf-based formatting in this
//              file.  Its contents are overwritten by the next sprintf into
//              it, and output longer than 999 characters would overflow it.
static char tmpstr[1000];
135
136 static char tmpstr[1000];
137
138 //////////////////////////////////////////////////////////////////////
139 ///                     Various utilities
140
141 string generate_fta_name(string node_name){
142         string ret = normalize_name(node_name);
143         if(ret == ""){
144                 ret = "default";
145         }
146         ret += "_fta";
147
148         return(ret);
149 }
150
151
152 string generate_aggr_struct_name(string node_name){
153         string ret = normalize_name(node_name);
154         if(ret == ""){
155                 ret = "default";
156         }
157         ret += "_aggr_struct";
158
159         return(ret);
160 }
161
162 string generate_fj_struct_name(string node_name){
163         string ret = normalize_name(node_name);
164         if(ret == ""){
165                 ret = "default";
166         }
167         ret += "_fj_struct";
168
169         return(ret);
170 }
171
172 string generate_watchlist_element_name(string node_name){
173         string ret = normalize_name(node_name);
174         if(ret == ""){
175                 ret = "default";
176         }
177         ret += "__wl_elem";
178
179         return(ret);
180 }
181
182 string generate_watchlist_struct_name(string node_name){
183         string ret = normalize_name(node_name);
184         if(ret == ""){
185                 ret = "default";
186         }
187         ret += "__wl_struct";
188
189         return(ret);
190 }
191
192 string generate_watchlist_name(string node_name){
193         string ret = normalize_name(node_name);
194         if(ret == ""){
195                 ret = "default";
196         }
197         ret += "__wl";
198
199         return(ret);
200 }
201
202 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
203         string ret;
204         if(! packed_return){
205                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
206                         schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
207         ret += tmpstr;
208         if(!schema->get_modifier_list(schref,field)->contains_key("required"))
209                 ret += "\tif(retval) goto "+end_goto+";\n";
210
211         }else{
212 //                      TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
213                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
214                 if(dt.is_buffer_type()){
215                         if(dt.get_type() != v_str_t){
216                           ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
217                           ret += "\t\tgoto "+end_goto+";\n";
218                           ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
219                           ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
220                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
221                         }else{
222                                 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
223                                 exit(1);
224                         }
225                 }else{
226                         ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
227                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
228                 }
229         }
230         return ret;
231 }
232
233 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
234   string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
235
236   int g;
237   for(g=0;g<gb_tbl->size();g++){
238           sprintf(tmpstr,"gb_var%d",g);
239           ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
240   }
241
242   int a;
243   for(a=0;a<aggr_tbl->size();a++){
244           ret += "\t";
245           sprintf(tmpstr,"aggr_var%d",a);
246           if(aggr_tbl->is_builtin(a))
247                 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
248           else
249                 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
250   }
251
252 /*
253   ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
254 */
255
256   ret += "};\n\n";
257
258   return(ret);
259 }
260
261
262 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
263   string ret;
264
265   if(fs->use_bloom == false){   // uses hash table instead
266         ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
267         int k;
268         for(k=0;k<fs->hash_eq.size();++k){
269                 sprintf(tmpstr,"key_var%d",k);
270                 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
271         }
272         ret += "\tlong long int ts;\n";
273     ret += "};\n\n";
274   }
275
276   return(ret);
277 }
278
279 string generate_watchlist_structs(string node_name, table_def *tbl,
280                 std::string filename, int refresh_interval){
281         string ret;
282
283         ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
284         vector<field_entry *> fields = tbl->get_fields();
285         for(int f=0;f<fields.size();++f){
286                 data_type dt(fields[f]->get_type());
287                 ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
288         }
289         ret += "\tgs_uint64_t hashval;\n";
290         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
291         ret += "};\n\n";
292
293         ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
294         ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
295         ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
296         ret += "\tgs_uint32_t ht_size;\n";
297         ret += "\tgs_uint32_t n_elem;\n";
298         ret += "\tgs_uint32_t refresh_interval;\n";
299         ret += "\ttime_t next_refresh;\n";
300         ret += "\ttime_t last_mtime;\n";        
301         ret += "\tchar *filename;\n";
302         ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
303
304         return ret;
305 }
306
307 string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
308         string ret;
309         string tgt = generate_watchlist_name(node_name);
310         vector<field_entry *> fields = tbl->get_fields();
311
312         ret += "void reload_watchlist__"+node_name+"(){\n";
313         ret += "\tgs_uint32_t i;\n";
314         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
315         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
316         ret += "\tFILE *fl;\n";
317         ret += "\tchar buf[10000];\n";
318         ret += "\tgs_uint32_t buflen = 10000;\n";
319         ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
320         ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
321         ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
322         ret += "\tgs_uint64_t hash, bucket;\n";
323         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
324
325         ret += "//  make sure watchlist file has changed since the last time we loaded it\n";
326     ret += "\tstruct stat file_stat;\n";
327     ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
328     ret += "\tif (err) {\n";
329     ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
330     ret += "\t\treturn;\n";
331     ret += "\t}\n";
332         ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime)     // watchlist file hasn't changed since last time\n";
333         ret += "\t\treturn;\n";
334         ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
335
336         ret += "//  Delete old entries.\n";
337         ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
338         ret += "\t\tptr="+tgt+".ht[i];\n";
339         ret += "\t\twhile(ptr!=NULL){\n";
340         for(int f=0;f<fields.size();++f){
341                 data_type dt(fields[f]->get_type());
342                 if(dt.is_buffer_type()){
343                         ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
344                 }
345         }
346         ret += "\t\t\tnext = ptr->next;\n";
347         ret += "\t\t\tfree(ptr);\n";
348         ret += "\t\t\tptr = next;\n";
349         ret += "\t\t}\n";
350         ret += "\t}\n";
351         ret += "\n// prepare new table. \n";
352         ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
353         ret += "\t\tif("+tgt+".ht)\n";
354         ret += "\t\t\tfree("+tgt+".ht);\n";
355         ret += "\t\tif("+tgt+".ht_size == 0)\n";
356         ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
357         ret += "\t\telse\n";    
358         ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
359         ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
360         ret += "\t}\n";
361         ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
362         ret += "\t\t"+tgt+".ht[i] = NULL;\n";
363         ret += "\t}\n";
364         ret += "\n// load new table\n";
365         ret += "\t"+tgt+".n_elem = 0;\n";
366         ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
367         ret += "\tif(fl==NULL){\n";
368         ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
369         ret += "\t\treturn;\n";
370         ret += "\t}\n";
371         ret += "\tmalformed = 0;\n";
372         ret += "\tshort_lines = 0;\n";
373         ret += "\ttoolong_lines = 0;\n";
374         ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
375         ret += "\t\tlinelen = strlen(buf);\n";
376         ret += "\t\tbuf[linelen-1]='\\0';       // strip off trailing newline\n";
377         ret += "\t\tlinelen--;\n";
378
379         ret += "\t\tpos=0;\n";
380         ret += "\t\tmalformed=0;\n";
381         ret += "\t\tok=1;\n";
382         ret += "\t\tflds[0] = buf;\n";
383         ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
384         ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
385         ret += "\t\t\tif(pos >= linelen){\n";
386         ret += "\t\t\t\tmalformed = 1;\n";
387         ret += "\t\t\t\tbreak;\n";
388         ret += "\t\t\t}\n";
389         ret += "\t\t\tbuf[pos]='\\0';\n";
390         ret += "\t\t\tpos++;\n";
391         ret += "\t\t\tflds[f]=buf+pos;\n";
392         ret += "\t\t}\n";
393         ret += "\t\tif(malformed){\n";
394         ret += "\t\t\tok=0;\n";
395         ret += "\t\t\tn_malformed++;\n";
396         ret += "\t\t}\n";
397         ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
398         ret += "\t\t\tok=0;\n";
399         ret += "\t\t\tshort_lines++;\n";
400         ret += "\t\t}\n";
401         ret += "\t\tif(pos && (pos<linelen)){\n";
402         ret += "\t\t\tok=0;\n";
403         ret += "\t\t\ttoolong_lines++;\n";
404         ret += "\t\t}\n";
405         ret += "\t\tif(f>="+std::to_string(fields.size())+"){\n";
406         ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
407 //              Extract fields
408         for(int f=0;f<fields.size();++f){
409                 data_type dt(fields[f]->get_type());
410                 ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
411         }
412 //              Compute the hash value
413         ret += "\t\t\thash=0;\n";
414         for(int k=0;k<keys.size();++k){
415                 string key_fld = keys[k];
416                 int f;
417                 for(f=0;f<fields.size();++f){
418                         if(fields[f]->get_name() == key_fld)
419                                 break;
420                 }
421                 data_type dt(fields[f]->get_type());
422
423                 ret +=
424 "\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
425                         dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
426                 }
427         ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
428         ret += "\t\t\trec->hashval = hash;\n";
429         ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
430         ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
431         ret += "\t\t\t"+tgt+".n_elem++;\n";
432
433         ret += "\t\t}\n";
434         ret += "\t}\n";
435         ret += "\tif(n_malformed+toolong_lines > 0){\n";
436         ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n";
437         ret += "\t}\n";
438         ret += "}\n\n";
439
440         return ret;
441 }
442
443
444
445                 
446 string generate_fta_struct(string node_name, gb_table *gb_tbl,
447                 aggregate_table *aggr_tbl, param_table *param_tbl,
448                 cplx_lit_table *complex_literals,
449                 vector<handle_param_tbl_entry *> &param_handle_table,
450                 bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
451                 table_list *schema){
452
453         string ret = "struct " + generate_fta_name(node_name) + "{\n";
454         ret += "\tstruct FTA f;\n";
455
456 //-------------------------------------------------------------
457 //              Aggregate-specific fields
458
459         if(is_aggr_query){
460 /*
461                 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
462 */
463                 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
464                 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
465 //              ret+="\tint bitmap_size;\n";
466                 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
467                 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
468                 ret += "\tint max_windows; // max number of open windows.\n";
469                 ret += "\tunsigned int generation; // initially zero, increment on\n";
470                 ret += "\t     // every hash table flush - whether regular or induced.\n";
471                 ret += "\t     // Old groups are identified by a generation mismatch.\n";
472                 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
473                 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
474
475
476
477                 int g;
478                 bool uses_temporal_flush = false;
479                 for(g=0;g<gb_tbl->size();g++){
480                         data_type *dt = gb_tbl->get_data_type(g);
481                         if(dt->is_temporal()){
482 /*
483                                 fprintf(stderr,"group by attribute %s is temporal, ",
484                                                 gb_tbl->get_name(g).c_str());
485                                 if(dt->is_increasing()){
486                                         fprintf(stderr,"increasing.\n");
487                                 }else{
488                                         fprintf(stderr,"decreasing.\n");
489                                 }
490 */
491                                 data_type *gdt = gb_tbl->get_data_type(g);
492                                 if(gdt->is_buffer_type()){
493                                         fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
494                                 }else{
495                                         sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
496                                         ret += tmpstr;
497                                         sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
498                                         ret += tmpstr;
499                                         sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
500                                         ret += tmpstr;
501                                         uses_temporal_flush = true;
502                                 }
503
504                         }
505
506                 }
507                 if(! uses_temporal_flush){
508                         fprintf(stderr,"Warning: no temporal flush.\n");
509                 }
510         }
511
512 // ---------------------------------------------------------
513 //                      Filter-join specific fields
514
515         if(is_fj){
516                 if(uses_bloom){
517                         ret +=
518 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
519 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
520 "\tint first_exec;\n"
521 "\tlong long int last_bin;\n"
522 "\tint last_bloom_pos;\n"
523 "\n"
524 ;
525                 }else{          // limited hash table
526                         ret +=
527 "  struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
528 "\n"
529 ;
530                 }
531
532         }
533
534 // --------------------------------------------
535 //              watchlist-join specific
536         if(is_wj){
537                 ret += "\ttime_t ux_time;\n";
538         }
539
540 //--------------------------------------------------------
541 //                      Common fields
542
543 //                      Create places to hold the parameters.
544         int p;
545         vector<string> param_vec = param_tbl->get_param_names();
546         for(p=0;p<param_vec.size();p++){
547                 data_type *dt = param_tbl->get_data_type(param_vec[p]);
548                 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
549                                 param_vec[p].c_str());
550                 ret += tmpstr;
551                 if(param_tbl->handle_access(param_vec[p])){
552                         ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
553                 }
554         }
555
556 //                      Create places to hold complex literals.
557         int cl;
558         for(cl=0;cl<complex_literals->size();cl++){
559                 literal_t *l = complex_literals->get_literal(cl);
560                 data_type *dtl = new data_type( l->get_type() );
561                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
562                 ret += tmpstr;
563         }
564
565 //                      Create places to hold the pass-by-handle parameters.
566         for(p=0;p<param_handle_table.size();++p){
567                 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
568                 ret += tmpstr;
569         }
570
571 //                      Create places to hold the last values of temporal
572 //                      attributes referenced in select clause
573 //                      we also need to store values of the temoral attributed
574 //                      of last flushed tuple in aggr queries
575 //                      to make sure we generate the cirrect temporal tuple
576 //                      in the presense of slow flushes
577
578
579         col_id_set temp_cids;           //      col ids of temp attributes in select clause
580
581         int s;
582         col_id_set::iterator csi;
583
584         for(s=0;s<sl_list.size();s++){
585                 data_type *sdt = sl_list[s]->get_data_type();
586                 if (sdt->is_temporal()) {
587                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
588                 }
589         }
590
591         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
592                 int tblref = (*csi).tblvar_ref;
593                 int schref = (*csi).schema_ref;
594                 string field = (*csi).field;
595                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
596                 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
597                 ret += tmpstr;
598         }
599
600         ret += "\tgs_uint64_t trace_id;\n\n";
601
602 //      Fields to store the runtime stats
603
604         ret += "\tgs_uint32_t in_tuple_cnt;\n";
605         ret += "\tgs_uint32_t out_tuple_cnt;\n";
606         ret += "\tgs_uint32_t out_tuple_sz;\n";
607         ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
608         ret += "\tgs_uint64_t cycle_cnt;\n";
609         ret += "\tgs_uint32_t collision_cnt;\n";
610         ret += "\tgs_uint32_t eviction_cnt;\n";
611         ret += "\tgs_float_t sampling_rate;\n";
612
613
614
615         ret += "};\n\n";
616
617         return(ret);
618 }
619
620 //------------------------------------------------------------
621 //              Set colref tblvars to 0..
622 //              (special processing for join-like operators in an lfta).
623
624 void reset_se_col_ids_tblvars(scalarexp_t *se,  gb_table *gtbl){
625         vector<scalarexp_t *> operands;
626         int o;
627
628         if(! se)
629                 return;
630
631         switch(se->get_operator_type()){
632         case SE_LITERAL:
633         case SE_PARAM:
634         case SE_IFACE_PARAM:
635                 return;
636         case SE_UNARY_OP:
637                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
638                 return;
639         case SE_BINARY_OP:
640                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
641                 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
642                 return;
643         case SE_COLREF:
644                 if(! se->is_gb() ){
645                         se->get_colref()->set_tablevar_ref(0);
646                 }else{
647                         if(gtbl==NULL){
648                                 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
649                                 exit(1);
650                         }
651                         reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
652                 }
653                 return;
654         case SE_AGGR_STAR:
655                 return;
656         case SE_AGGR_SE:
657                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
658                 return;
659         case SE_FUNC:
660                 operands = se->get_operands();
661                 for(o=0;o<operands.size();o++){
662                         reset_se_col_ids_tblvars(operands[o], gtbl);
663                 }
664                 return;
665         default:
666                 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
667                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
668                 exit(1);
669         }
670 }
671
672
673 //              reset  column tblvars accessed in this pr.
674
675 void reset_pr_col_ids_tblvars(predicate_t *pr,  gb_table *gtbl){
676         vector<scalarexp_t *> op_list;
677         int o;
678
679         switch(pr->get_operator_type()){
680         case PRED_IN:
681                 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
682                 return;
683         case PRED_COMPARE:
684                 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
685                 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
686                 return;
687         case PRED_UNARY_OP:
688                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
689                 return;
690         case PRED_BINARY_OP:
691                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
692                 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
693                 return;
694         case PRED_FUNC:
695                 op_list = pr->get_op_list();
696                 for(o=0;o<op_list.size();++o){
697                         reset_se_col_ids_tblvars(op_list[o],gtbl) ;
698                 }
699                 return;
700         default:
701                 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
702                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
703         }
704 }
705
706
707
708
709 //                      Generate code that makes reference
710 //                      to the tuple, and not to any aggregates.
711 static string generate_se_code(scalarexp_t *se,table_list *schema){
712         string ret;
713     data_type *ldt, *rdt;
714         int o;
715         vector<scalarexp_t *> operands;
716
717
718         switch(se->get_operator_type()){
719         case SE_LITERAL:
720                 if(se->is_handle_ref()){
721                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
722                         ret = tmpstr;
723                         return(ret);
724                 }
725                 if(se->get_literal()->is_cpx_lit()){
726                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
727                         ret = tmpstr;
728                         return(ret);
729                 }
730                 return(se->get_literal()->to_C_code("")); // not complex, no constructor
731         case SE_PARAM:
732                 if(se->is_handle_ref()){
733                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
734                         ret = tmpstr;
735                         return(ret);
736                 }
737                 ret += "t->param_";
738                 ret += se->get_param_name();
739                 return(ret);
740         case SE_UNARY_OP:
741         ldt = se->get_left_se()->get_data_type();
742         if(ldt->complex_operator(se->get_op()) ){
743                         ret +=  ldt->get_complex_operator(se->get_op());
744                         ret += "(";
745                         ret += generate_se_code(se->get_left_se(),schema);
746             ret += ")";
747                 }else{
748                         ret += "(";
749                         ret += se->get_op();
750                         ret += generate_se_code(se->get_left_se(),schema);
751                         ret += ")";
752                 }
753                 return(ret);
754         case SE_BINARY_OP:
755         ldt = se->get_left_se()->get_data_type();
756         rdt = se->get_right_se()->get_data_type();
757
758         if(ldt->complex_operator(rdt, se->get_op()) ){
759                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
760                         ret += "(";
761                         ret += generate_se_code(se->get_left_se(),schema);
762                         ret += ", ";
763                         ret += generate_se_code(se->get_right_se(),schema);
764                         ret += ")";
765                 }else{
766                         ret += "(";
767                         ret += generate_se_code(se->get_left_se(),schema);
768                         ret += se->get_op();
769                         ret += generate_se_code(se->get_right_se(),schema);
770                         ret += ")";
771                 }
772                 return(ret);
773         case SE_COLREF:
774                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet unpacked ...
775                                                         // so return the defining code.
776                         ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
777
778                 }else{
779                 sprintf(tmpstr,"unpack_var_%s_%d",
780                   se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
781                 ret = tmpstr;
782                 }
783                 return(ret);
784         case SE_FUNC:
785 //                              Should not be ref'ing any aggr here.
786                 if(se->get_aggr_ref() >= 0){
787                         fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
788                         return("ERROR in generate_se_code");
789                 }
790
791                 if(se->is_partial()){
792                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
793                         ret = tmpstr;
794                 }else{
795                         ret += se->op + "(";
796                         operands = se->get_operands();
797                         for(o=0;o<operands.size();o++){
798                                 if(o>0) ret += ", ";
799                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
800                                         ret += "&";
801                                 ret += generate_se_code(operands[o], schema);
802                         }
803                         ret += ")";
804                 }
805                 return(ret);
806         default:
807                 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
808                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
809                 return("ERROR in generate_se_code");
810         }
811 }
812
813 //              generate code that refers only to aggregate data and constants.
814 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
815
816         string ret;
817     data_type *ldt, *rdt;
818         int o;
819         vector<scalarexp_t *> operands;
820
821
822         switch(se->get_operator_type()){
823         case SE_LITERAL:
824                 if(se->is_handle_ref()){
825                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
826                         ret = tmpstr;
827                         return(ret);
828                 }
829                 if(se->get_literal()->is_cpx_lit()){
830                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
831                         ret = tmpstr;
832                         return(ret);
833                 }
834                 return(se->get_literal()->to_C_code("")); // not complex no constructor
835         case SE_PARAM:
836                 if(se->is_handle_ref()){
837                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
838                         ret = tmpstr;
839                         return(ret);
840                 }
841                 ret += "t->param_";
842                 ret += se->get_param_name();
843                 return(ret);
844         case SE_UNARY_OP:
845         ldt = se->get_left_se()->get_data_type();
846         if(ldt->complex_operator(se->get_op()) ){
847                         ret +=  ldt->get_complex_operator(se->get_op());
848                         ret += "(";
849                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
850             ret += ")";
851                 }else{
852                         ret += "(";
853                         ret += se->get_op();
854                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
855                         ret += ")";
856                 }
857                 return(ret);
858         case SE_BINARY_OP:
859         ldt = se->get_left_se()->get_data_type();
860         rdt = se->get_right_se()->get_data_type();
861
862         if(ldt->complex_operator(rdt, se->get_op()) ){
863                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
864                         ret += "(";
865                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
866                         ret += ", ";
867                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
868                         ret += ")";
869                 }else{
870                         ret += "(";
871                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
872                         ret += se->get_op();
873                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
874                         ret += ")";
875                 }
876                 return(ret);
877         case SE_COLREF:
878                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet
879                                                         // unpacked ... so return the defining code.
880                         sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
881                         ret = tmpstr;
882
883                 }else{
884                 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
885                                 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
886                                 se->get_lineno(), se->get_charno());
887                 ret = tmpstr;
888                 }
889                 return(ret);
890         case SE_AGGR_STAR:
891         case SE_AGGR_SE:
892                 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
893                 ret = tmpstr;
894                 return(ret);
895         case SE_FUNC:
896 //                              Is it a UDAF?
897                 if(se->get_aggr_ref() >= 0){
898                         sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
899                         ret = tmpstr;
900                         return(ret);
901                 }
902
903                 if(se->is_partial()){
904                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
905                         ret = tmpstr;
906                 }else{
907                         ret += se->op + "(";
908                         operands = se->get_operands();
909                         for(o=0;o<operands.size();o++){
910                                 if(o>0) ret += ", ";
911                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
912                                         ret += "&";
913                                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
914                         }
915                         ret += ")";
916                 }
917                 return(ret);
918         default:
919                 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
920                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
921                 return("ERROR in generate_se_code");
922         }
923
924 }
925
926
927 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
928         string ret;
929         int o;
930         vector<scalarexp_t *> operands;
931
932
933         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
934                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
935                                 se->get_lineno(), se->get_charno());
936                 return("ERROR in generate_se_code");
937         }
938
939         ret = "\tretval = " + se->get_op() + "( ";
940         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
941         ret += tmpstr;
942
943         operands = se->get_operands();
944         for(o=0;o<operands.size();o++){
945                 ret += ", ";
946                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
947                         ret += "&";
948                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
949         }
950         ret += ");\n";
951
952         return(ret);
953 }
954
955 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
956         string ret;
957         int o;
958         vector<scalarexp_t *> operands;
959
960         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
961                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
962                                 se->get_lineno(), se->get_charno());
963                 return("ERROR in generate_se_code");
964         }
965
966         ret = se->get_op() + "( ";
967
968         operands = se->get_operands();
969         for(o=0;o<operands.size();o++){
970                 if(o) ret += ", ";
971                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
972                         ret += "&";
973                 ret += generate_se_code(operands[o], schema);
974         }
975         ret += ");\n";
976
977         return(ret);
978 }
979
980
981
982 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
983         string ret;
984         int o;
985         vector<scalarexp_t *> operands;
986
987
988         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
989                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
990                                 se->get_lineno(), se->get_charno());
991                 return("ERROR in generate_se_code");
992         }
993
994         ret = "\tretval = " + se->get_op() + "( ",
995         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
996         ret += tmpstr;
997
998         operands = se->get_operands();
999         for(o=0;o<operands.size();o++){
1000                 ret += ", ";
1001                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
1002                         ret += "&";
1003                 ret += generate_se_code(operands[o], schema);
1004         }
1005         ret += ");\n";
1006
1007         return(ret);
1008 }
1009
1010
1011
1012
1013
//	Translate an SQL comparison operator into its C spelling:
//	"=" becomes "==" and "<>" becomes "!="; any other operator is returned
//	unchanged.
static string generate_C_comparison_op(string op){
	if(op == "<>")
		return "!=";
	return (op == "=") ? string("==") : op;
}
1019
//	Translate an SQL boolean operator into its C spelling.  Only the upper,
//	title, and lower case spellings of AND, OR and NOT are recognized
//	(-> "&&", "||", "!").  Anything else is reported on stderr as an
//	internal error, and an error marker string naming the operator is returned.
static string generate_C_boolean_op(string op){
	struct bool_op_spelling {
		const char *upper, *title, *lower, *c_op;
	};
	static const bool_op_spelling known_ops[] = {
		{"AND", "And", "and", "&&"},
		{"OR",  "Or",  "or",  "||"},
		{"NOT", "Not", "not", "!"}
	};
	const size_t n_ops = sizeof(known_ops) / sizeof(known_ops[0]);

	for(size_t k = 0; k < n_ops; ++k){
		const bool_op_spelling &entry = known_ops[k];
		if(op == entry.upper || op == entry.title || op == entry.lower)
			return entry.c_op;
	}

	fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
	return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
}
1034
1035
1036 static string generate_predicate_code(predicate_t *pr,table_list *schema){
1037         string ret;
1038         vector<literal_t *>  litv;
1039         int i;
1040     data_type *ldt, *rdt;
1041         vector<scalarexp_t *> op_list;
1042         int o,cref,ppos;
1043         unsigned int bitmask;
1044
1045         switch(pr->get_operator_type()){
1046         case PRED_IN:
1047         ldt = pr->get_left_se()->get_data_type();
1048
1049                 ret += "( ";
1050                 litv = pr->get_lit_vec();
1051                 for(i=0;i<litv.size();i++){
1052                         if(i>0) ret += " || ";
1053                         ret += "( ";
1054
1055                 if(ldt->complex_comparison(ldt) ){
1056                                 ret +=  ldt->get_equals_fcn(ldt) ;
1057                                 ret += "( ";
1058                                 if(ldt->is_buffer_type() ) ret += "&";
1059                                 ret += generate_se_code(pr->get_left_se(), schema);
1060                                 ret += ", ";
1061                                 if(ldt->is_buffer_type() ) ret += "&";
1062                                 if(litv[i]->is_cpx_lit()){
1063                                         sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
1064                                         ret += tmpstr;
1065                                 }else{
1066                                         ret += litv[i]->to_C_code("");
1067                                 }
1068                                 ret += ") == 0";
1069                         }else{
1070                                 ret += generate_se_code(pr->get_left_se(), schema);
1071                                 ret += " == ";
1072                                 ret += litv[i]->to_C_code("");
1073                         }
1074
1075                         ret += " )";
1076                 }
1077                 ret += " )";
1078                 return(ret);
1079
1080         case PRED_COMPARE:
1081         ldt = pr->get_left_se()->get_data_type();
1082         rdt = pr->get_right_se()->get_data_type();
1083
1084                 ret += "( ";
1085         if(ldt->complex_comparison(rdt) ){
1086 // TODO can use get_equals_fcn if op is "=" ?
1087                         ret += ldt->get_comparison_fcn(rdt);
1088                         ret += "(";
1089                         if(ldt->is_buffer_type() ) ret += "&";
1090                         ret += generate_se_code(pr->get_left_se(),schema);
1091                         ret += ", ";
1092                         if(rdt->is_buffer_type() ) ret += "&";
1093                         ret += generate_se_code(pr->get_right_se(),schema);
1094                         ret += ") ";
1095                         ret +=  generate_C_comparison_op(pr->get_op());
1096                         ret += "0";
1097                 }else{
1098                         ret += generate_se_code(pr->get_left_se(),schema);
1099                         ret +=  generate_C_comparison_op(pr->get_op());
1100                         ret += generate_se_code(pr->get_right_se(),schema);
1101                 }
1102                 ret += " )";
1103                 return(ret);
1104         case PRED_UNARY_OP:
1105                 ret += "( ";
1106                 ret +=  generate_C_boolean_op(pr->get_op());
1107                 ret += generate_predicate_code(pr->get_left_pr(),schema);
1108                 ret += " )";
1109                 return(ret);
1110         case PRED_BINARY_OP:
1111                 ret += "( ";
1112                 ret += generate_predicate_code(pr->get_left_pr(),schema);
1113                 ret +=  generate_C_boolean_op(pr->get_op());
1114                 ret += generate_predicate_code(pr->get_right_pr(),schema);
1115                 ret += " )";
1116                 return(ret);
1117         case PRED_FUNC:
1118                 op_list = pr->get_op_list();
1119                 cref = pr->get_combinable_ref();
1120                 if(cref >= 0){  // predicate is a combinable pred reference
1121                         //              Trust, but verify
1122                         if(pred_class.size() >= cref && pred_class[cref] >= 0){
1123                                 ppos = pred_pos[cref];
1124                                 bitmask = 1 << ppos % 32;
1125                                 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
1126                                 ret = tmpstr;
1127                                 return ret;
1128                         }
1129                 }
1130
1131                 ret =  pr->get_op() + "(";
1132                 if (pr->is_sampling_fcn) {
1133                         ret += "t->sampling_rate";
1134                         if (!op_list.empty())
1135                                 ret += ", ";
1136                 }
1137                 for(o=0;o<op_list.size();++o){
1138                         if(o>0) ret += ", ";
1139                         if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
1140                                         ret += "&";
1141                         ret += generate_se_code(op_list[o],schema);
1142                 }
1143                 ret += " )";
1144                 return(ret);
1145         default:
1146                 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
1147                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
1148                 return("ERROR in generate_predicate_code");
1149         }
1150 }
1151
1152
1153 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
1154         string ret;
1155
1156     if(dt->complex_comparison(dt) ){
1157                 ret += dt->get_equals_fcn(dt);
1158                 ret += "(";
1159                         if(dt->is_buffer_type() ) ret += "&";
1160                 ret += lhs_op;
1161                 ret += ", ";
1162                         if(dt->is_buffer_type() ) ret += "&";
1163                 ret += rhs_op;
1164                 ret += ") == 0";
1165         }else{
1166                 ret += lhs_op;
1167                 ret += " == ";
1168                 ret += rhs_op;
1169         }
1170
1171         return(ret);
1172 }
1173
1174 //static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
1175 //      string ret;
1176 //
1177  //   if(dt->complex_comparison(dt) ){
1178 //              ret += dt->get_equals_fcn(dt);
1179 //              ret += "(";
1180 //                      if(dt->is_buffer_type() ) ret += "&";
1181 //              ret += lhs_op;
1182 //              ret += ", ";
1183 //                      if(dt->is_buffer_type() ) ret += "&";
1184 //              ret += rhs_op;
1185 //              ret += ") == 0";
1186 //      }else{
1187 //              ret += lhs_op;
1188 //              ret += " == ";
1189 //              ret += rhs_op;
1190 //      }
1191 //
1192 //      return(ret);
1193 //}
1194
1195 //              Here I assume that only MIN and MAX aggregates can be computed
1196 //              over BUFFER data types.
1197
1198 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
1199         string retval = "\t\t";
1200         string op = atbl->get_op(aidx);
1201
1202 //              Is it a UDAF
1203         if(! atbl->is_builtin(aidx)) {
1204                 int o;
1205                 retval += op+"_LFTA_AGGR_UPDATE_(";
1206                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1207                 retval+="("+var+")";
1208                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1209                 for(o=0;o<opl.size();++o){
1210                         retval += ",";
1211                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1212                                         retval.append("&");
1213                         retval += generate_se_code(opl[o], schema);
1214                 }
1215                 retval += ");\n";
1216
1217                 return retval;
1218         }
1219
1220 //              Built-in aggregate processing.
1221
1222         data_type *dt = atbl->get_data_type(aidx);
1223
1224         if(op == "COUNT"){
1225                 retval.append(var);
1226                 retval.append("++;\n");
1227                 return(retval);
1228         }
1229         if(op == "SUM"){
1230                 retval.append(var);
1231                 retval.append(" += ");
1232                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1233                 retval.append(";\n");
1234                 return(retval);
1235         }
1236         if(op == "MIN"){
1237                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1238                 retval.append(tmpstr);
1239                 if(dt->complex_comparison(dt)){
1240                         if(dt->is_buffer_type())
1241                           sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1242                         else
1243                           sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1244                 }else{
1245                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1246                 }
1247                 retval.append(tmpstr);
1248                 if(dt->is_buffer_type()){
1249                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1250                 }else{
1251                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1252                 }
1253                 retval.append(tmpstr);
1254
1255                 return(retval);
1256         }
1257         if(op == "MAX"){
1258                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1259                 retval.append(tmpstr);
1260                 if(dt->complex_comparison(dt)){
1261                         if(dt->is_buffer_type())
1262                          sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1263                         else
1264                          sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1265                 }else{
1266                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1267                 }
1268                 retval.append(tmpstr);
1269                 if(dt->is_buffer_type()){
1270                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1271                 }else{
1272                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1273                 }
1274                 retval.append(tmpstr);
1275
1276                 return(retval);
1277
1278         }
1279         if(op == "AND_AGGR"){
1280                 retval.append(var);
1281                 retval.append(" &= ");
1282                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1283                 retval.append(";\n");
1284                 return(retval);
1285         }
1286         if(op == "OR_AGGR"){
1287                 retval.append(var);
1288                 retval.append(" |= ");
1289                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1290                 retval.append(";\n");
1291                 return(retval);
1292         }
1293         if(op == "XOR_AGGR"){
1294                 retval.append(var);
1295                 retval.append(" ^= ");
1296                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1297                 retval.append(";\n");
1298                 return(retval);
1299         }
1300         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1301         return("ERROR: aggregate not recognized: "+op);
1302
1303 }
1304
1305
1306
1307 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1308         string retval;
1309         string op = atbl->get_op(aidx);
1310
1311 //              Is it a UDAF
1312         if(! atbl->is_builtin(aidx)) {
1313                 int o;
1314                 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1315                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1316                 retval+="("+var+"));\n";
1317 //                      Add 1st tupl
1318                 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1319                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1320                 retval+="("+var+")";
1321                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1322                 for(o=0;o<opl.size();++o){
1323                         retval += ",";
1324                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1325                                         retval.append("&");
1326                         retval += generate_se_code(opl[o],schema);
1327                 }
1328                 retval += ");\n";
1329                 return(retval);
1330         }
1331
1332 //              Built-in aggregate processing.
1333
1334
1335         data_type *dt = atbl->get_data_type(aidx);
1336
1337         if(op == "COUNT"){
1338                 retval = "\t\t"+var;
1339                 retval.append(" = 1;\n");
1340                 return(retval);
1341         }
1342
1343         if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1344                                                                         op == "OR_AGGR" || op == "XOR_AGGR"){
1345                 if(dt->is_buffer_type()){
1346                         sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1347                         retval.append(tmpstr);
1348                         sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1349                         retval.append(tmpstr);
1350                 }else{
1351                         retval = "\t\t"+var;
1352                         retval += " = ";
1353                         retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1354                         retval.append(";\n");
1355                 }
1356                 return(retval);
1357         }
1358
1359         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1360         return("ERROR: aggregate not recognized: "+op);
1361 }
1362
1363
1364 ////////////////////////////////////////////////////////////
1365
1366
1367 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1368         std::string &node_name, std::string &schema_embed_str){
1369 //                      Include these only once, not once per lfta
1370 //      string ret = "#include \"rts.h\"\n";
1371 //      ret +=  "#include \"fta.h\"\n\n");
1372
1373         string ret = "#ifndef LFTA_IN_NIC\n";
1374         ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1375         ret += "#include<stdio.h>\n";
1376         ret += "#include <limits.h>\n";
1377         ret += "#include <float.h>\n";
1378         ret += "#include <sys/stat.h>\n";
1379         ret += "#include \"rdtsc.h\"\n";
1380         ret += "#endif\n";
1381
1382
1383
1384         return(ret);
1385 }
1386
1387
1388 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1389         int a,p,s;
1390 //                       need to create and output the tuple.
1391         string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1392 //                      Check for any UDAFs with LFTA_BAILOUT
1393         ret += "\tlfta_bailout = 0;\n";
1394         for(a=0;a<aggr_tbl->size();a++){
1395                 if(aggr_tbl->has_bailout(a)){
1396                         ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
1397                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1398                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1399                 }
1400         }
1401         ret += "\tif(! lfta_bailout){\n";
1402
1403 //                      First, compute the size of the tuple.
1404
1405 //                      Unpack UDAF return values
1406         for(a=0;a<aggr_tbl->size();a++){
1407                 if(! aggr_tbl->is_builtin(a)){
1408                         ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1409                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1410                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1411
1412                 }
1413         }
1414
1415
1416 //                      Unpack partial fcns ref'd by the select clause.
1417   if(sl_fcns_start != sl_fcns_end){
1418             ret += "\t\tunpack_failed = 0;\n";
1419     for(p=sl_fcns_start;p<sl_fcns_end;p++){
1420           if(is_partial_fcn[p]){
1421                 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1422                          "t->aggr_table["+idx+"].",schema);
1423                 ret += "\t\tif(retval) unpack_failed = 1;\n";
1424           }
1425     }
1426                                                                 // BEGIN don't allocate tuple if
1427         ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1428   }
1429
1430 //                      Unpack any BUFFER type selections into temporaries
1431 //                      so that I can compute their size and not have
1432 //                      to recompute their value during tuple packing.
1433 //                      I can use regular assignment here because
1434 //                      these temporaries are non-persistent.
1435
1436           for(s=0;s<sl_list.size();s++){
1437                 data_type *sdt = sl_list[s]->get_data_type();
1438                 if(sdt->is_buffer_type()){
1439                         sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1440                         ret += tmpstr;
1441                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1442                         ret += ";\n";
1443                 }
1444           }
1445
1446
1447 //              The size of the tuple is the size of the tuple struct plus the
1448 //              size of the buffers to be copied in.
1449
1450           ret += "\t\t\ttuple_size = sizeof( struct ";
1451           ret +=  generate_tuple_name(node_name);
1452           ret += ")";
1453           for(s=0;s<sl_list.size();s++){
1454                 data_type *sdt = sl_list[s]->get_data_type();
1455                 if(sdt->is_buffer_type()){
1456                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1457                         ret += tmpstr;
1458                 }
1459           }
1460           ret += ";\n";
1461
1462
1463           ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1464           ret += "\t\t\tif( tuple != NULL){\n";
1465
1466
1467 //                      Test passed, make assignments to the tuple.
1468
1469           ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1470           ret +=  generate_tuple_name(node_name) ;
1471           ret += ");\n";
1472
1473 //                      Mark tuple as REGULAR_TUPLE
1474           ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1475
1476           for(s=0;s<sl_list.size();s++){
1477                 data_type *sdt = sl_list[s]->get_data_type();
1478                 if(sdt->is_buffer_type()){
1479                         sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1480                         ret += tmpstr;
1481                         sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1482                         ret += tmpstr;
1483                 }else{
1484                         sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1485                         ret += tmpstr;
1486 //                      if(sdt->needs_hn_translation())
1487 //                              ret += sdt->hton_translation() +"( ";
1488                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1489 //                      if(sdt->needs_hn_translation())
1490 //                              ret += ") ";
1491                         ret += ";\n";
1492                 }
1493           }
1494
1495 //              Generate output.
1496           ret += "\t\t\t\tpost_tuple(tuple);\n";
1497           ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1498           ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1499           ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1500           ret += "\t\t\t\t#endif\n\n";
1501           ret += "\t\t\t}\n";
1502
1503           if(sl_fcns_start != sl_fcns_end)      // END don't allocate tuple if
1504                 ret += "\t\t}\n";                               // unpack failed.
1505           ret += "\t}\n";
1506
1507 //                      Need to release memory held by BUFFER types.
1508                 int g;
1509
1510           for(g=0;g<gb_tbl->size();g++){
1511                 data_type *gdt = gb_tbl->get_data_type(g);
1512                 if(gdt->is_buffer_type()){
1513                         sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1514                         ret += tmpstr;
1515                 }
1516           }
1517           for(a=0;a<aggr_tbl->size();a++){
1518                 if(aggr_tbl->is_builtin(a)){
1519                         data_type *adt = aggr_tbl->get_data_type(a);
1520                         if(adt->is_buffer_type()){
1521                                 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
1522                                 ret += tmpstr;
1523                         }
1524                 }else{
1525                         ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1526                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1527                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1528                 }
1529           }
1530
1531         ret += "\t\tt->n_aggrs--;\n";
1532
1533         return(ret);
1534
1535 }
1536
1537 string generate_gb_match_test(string idx){
1538         int g;
1539         string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") &&  IS_NEW(t->aggr_table_bitmap,"+idx+")";
1540         if(gb_tbl->size()>0){
1541                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1542                 ret+="\t\t";
1543
1544 //                      Next, scan list for a match on the group-by attributes.
1545           string rhs_op, lhs_op;
1546           for(g=0;g<gb_tbl->size();g++){
1547                   ret += " && ";
1548                   ret += "(";
1549                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1550                   sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1551                   ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
1552                   ret += ")";
1553           }
1554          }
1555
1556           ret += "){\n";
1557
1558         return ret;
1559 }
1560
1561 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1562         int g;
1563         string ret;
1564
1565           ret += "/*\t\tMatch found : update in place.\t*/\n";
1566           int a;
1567           has_udaf = false;
1568           for(a=0;a<aggr_tbl->size();a++){
1569                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1570                   ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1571                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1572           }
1573
1574 //                      garbage collect copied buffer type gb attrs.
1575   for(g=0;g<gb_tbl->size();g++){
1576           data_type *gdt = gb_tbl->get_data_type(g);
1577           if(gdt->is_buffer_type()){
1578                 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
1579                 ret+=tmpstr;
1580           }
1581         }
1582
1583
1584
1585           bool first_udaf = true;
1586           if(has_udaf){
1587                 ret += "\t\tif(";
1588                 for(a=0;a<aggr_tbl->size();a++){
1589                         if(! aggr_tbl->is_builtin(a)){
1590                                 if(! first_udaf)ret += " || ";
1591                                 else first_udaf = false;
1592                                 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1593                                 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1594                                 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1595                         }
1596                 }
1597                 ret+="){\n";
1598                 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1599                 ret += generate_tuple_from_aggr(node_name,schema,idx);
1600                 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
1601                 ret+="\t\t}\n";
1602         }
1603         return ret;
1604 }
1605
1606
1607 string generate_init_group( table_list *schema, string idx){
1608           int g,a;
1609           string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1610 //                      Fill up the aggregate block.
1611           for(g=0;g<gb_tbl->size();g++){
1612                   sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1613                   ret += tmpstr;
1614           }
1615           for(a=0;a<aggr_tbl->size();a++){
1616                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1617                   ret += generate_aggr_init(tmpstr, aggr_tbl,a,  schema);
1618           }
1619           ret+="\t\tt->n_aggrs++;\n";
1620         return ret;
1621 }
1622
1623
1624 string generate_fta_flush(string node_name, table_list *schema,
1625                 ext_fcn_list *Ext_fcns){
1626
1627    string ret;
1628    string select_var_defs ;
1629         int s, p;
1630
1631 //              Flush from previous epoch
1632
1633         ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1634
1635         ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1636     ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1637         ret += "\tint i, lfta_bailout;\n";
1638         ret += "\tunsigned int gen_val;\n";
1639
1640         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1641         ret += generate_fta_name(node_name)+" *) f;\n";
1642
1643         ret += "\n";
1644
1645
1646 //              Variables needed to store selected attributes of BUFFER type
1647 //              temporarily, in order to compute their size for storage
1648 //              in an output tuple.
1649
1650   select_var_defs = "";
1651   for(s=0;s<sl_list.size();s++){
1652         data_type *sdt = sl_list[s]->get_data_type();
1653         if(sdt->is_buffer_type()){
1654           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1655           select_var_defs.append(tmpstr);
1656         }
1657   }
1658   if(select_var_defs != ""){
1659         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1660     ret += select_var_defs;
1661   }
1662
1663
1664 //              Variables to store results of partial functions.
1665   if(sl_fcns_start != sl_fcns_end){
1666         ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1667         for(p=sl_fcns_start;p<sl_fcns_end;p++){
1668                 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1669                         partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
1670                 ret += tmpstr;
1671         }
1672         ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1673   }
1674
1675 //              Variables for udaf output temporaries
1676         bool no_udaf = true;
1677         int a;
1678         for(a=0;a<aggr_tbl->size();a++){
1679                 if(! aggr_tbl->is_builtin(a)){
1680                         if(no_udaf){
1681                                 ret+="/*\t\tUDAF output vars.\t*/\n";
1682                                 no_udaf = false;
1683                         }
1684                         int afcn_id = aggr_tbl->get_fcn_id(a);
1685                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1686                         sprintf(tmpstr,"udaf_ret%d", a);
1687                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1688                 }
1689         }
1690
1691
1692 //  ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1693   ret+="\n";
1694   ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1695   ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1696   ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1697                 bool first_g=true;
1698                 int g;
1699                 for(g=0;g<gb_tbl->size();g++){
1700                   data_type *gdt = gb_tbl->get_data_type(g);
1701                   if(gdt->is_temporal()){
1702                         if(first_g) first_g=false; else ret+=" || ";
1703                         ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1704                   }
1705                 }
1706   ret += "))) {\n";
1707   ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1708   ret+=
1709 "#ifdef LFTA_STATS\n"
1710 "\t\t\tt->eviction_cnt++;\n"
1711 "#endif\n"
1712 ;
1713
1714
1715   ret+=generate_tuple_from_aggr(node_name,schema,"i");
1716
1717 //  ret+="\t\t\tt->n_aggrs--;\n";  // done in generate_tuple_from_aggr
1718   ret+="\t\t\tnflush--;\n";
1719   ret+="\t\t}\n";
1720   ret+="\t}\n";
1721   ret+="\tt->flush_pos=i;\n";
1722   ret+="\tif(t->n_aggrs == 0) {\n";
1723   ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1724   ret += "\t}\n\n";
1725
1726   ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1727
1728   for(int g=0;g<gb_tbl->size();g++){
1729         data_type *dt = gb_tbl->get_data_type(g);
1730         if(dt->is_temporal()){
1731                 data_type *gdt = gb_tbl->get_data_type(g);
1732                 if(!gdt->is_buffer_type()){
1733                         sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1734                         ret += tmpstr;
1735                 }
1736         }
1737   }
1738   ret += "\t}\n}\n\n";
1739
1740   return(ret);
1741 }
1742
1743 // TODO Remove sprintf to perform string catenation
1744 string generate_fta_load_params(string node_name){
1745         int p;
1746         vector<string> param_names = param_tbl->get_param_names();
1747
1748         string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1749                 ret += " *t, int sz, void *value, int initial_call){\n";
1750     ret += "\tint pos=0;\n";
1751     ret += "\tint data_pos;\n";
1752
1753         for(p=0;p<param_names.size();p++){
1754                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1755                 if(dt->is_buffer_type()){
1756                         sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1757                         ret += tmpstr;
1758                         sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
1759                         ret += tmpstr;
1760                 }
1761         }
1762
1763
1764
1765         ret += "\n\tdata_pos = ";
1766         for(p=0;p<param_names.size();p++){
1767                 if(p>0) ret += " + ";
1768                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1769                 ret += "sizeof( ";
1770                 ret +=  dt->get_tuple_cvar_type();
1771                 ret += " )";
1772         }
1773         ret += ";\n";
1774         ret += "\tif(data_pos > sz) return 1;\n\n";
1775
1776
1777         for(p=0;p<param_names.size();p++){
1778                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1779                 if(dt->is_buffer_type()){
1780                         sprintf(tmpstr,"\taccess_var_%s =  *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1781                         ret += tmpstr;
1782                         switch( dt->get_type() ){
1783                         case v_str_t:
1784 //                              ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n";               // ntoh conversion
1785 //                              ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n";   // ntoh conversion
1786                                 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1787                                 ret += tmpstr;
1788                                 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1789                                 ret += tmpstr;
1790                                 sprintf(tmpstr,"\ttmp_var_%s.length =  access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1791                                 ret += tmpstr;
1792                         break;
1793                         default:
1794                                 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1795                                 exit(1);
1796                         break;
1797                         }
1798 //                                      First, destroy the old
1799                         ret += "\tif(! initial_call)\n";
1800                         sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1801                         ret += tmpstr;
1802 //                                      Next, create the new.
1803                         sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1804                         ret += tmpstr;
1805                 }else{
1806 //                      if(dt->needs_hn_translation()){
1807 //                              sprintf(tmpstr,"\tt->param_%s =  %s( *( (%s *)( (char *)value+pos) ) );\n",
1808 //                                param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1809 //                      }else{
1810                                 sprintf(tmpstr,"\tt->param_%s =  *( (%s *)( (char *)value+pos) );\n",
1811                                   param_names[p].c_str(), dt->get_cvar_type().c_str() );
1812 //                      }
1813                         ret += tmpstr;
1814                 }
1815                 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1816                 ret += tmpstr;
1817         }
1818
1819 //                      Register the pass-by-handle parameters
1820
1821         ret += "/* register and de-register the pass-by-handle parameters */\n";
1822
1823     int ph;
1824     for(ph=0;ph<param_handle_table.size();++ph){
1825                 data_type pdt(param_handle_table[ph]->type_name);
1826                 switch(param_handle_table[ph]->val_type){
1827                 case cplx_lit_e:
1828                         break;
1829                 case litval_e:
1830                         break;
1831                 case param_e:
1832                         ret += "\tif(! initial_call)\n";
1833                         sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1834                                 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1835                         ret += tmpstr;
1836                         sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
1837                         ret += tmpstr;
1838
1839                         if(pdt.is_buffer_type()) ret += "&(";
1840                         ret += "t->param_"+param_handle_table[ph]->param_name;
1841                         if(pdt.is_buffer_type()) ret += ")";
1842                         ret += ");\n";
1843                         break;
1844                 default:
1845                         sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1846                         fprintf(stderr,"%s\n",tmpstr);
1847                         ret+=tmpstr;
1848                 }
1849         }
1850
1851         ret+="\treturn 0;\n";
1852         ret += "}\n\n";
1853
1854         return(ret);
1855 }
1856
1857
1858
1859
1860 string generate_fta_free(string node_name, bool is_aggr_query){
1861
1862         string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1863         ret+= "\tstruct "+generate_fta_name(node_name)+
1864                 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1865         ret += "\tint i;\n";
1866
1867         if(is_aggr_query){
1868                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1869                 ret+="\t/* \t\tmark all groups as old */\n";
1870                 ret+="\tt->generation++;\n";
1871                 ret+="\tt->flush_pos = 0;\n";
1872                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1873         }
1874
1875 //                      Deregister the pass-by-handle parameters
1876         ret += "/* de-register the pass-by-handle parameters */\n";
1877     int ph;
1878     for(ph=0;ph<param_handle_table.size();++ph){
1879                 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1880                         param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1881                 ret += tmpstr;
1882         }
1883
1884
1885         ret += "\treturn 0;\n}\n\n";
1886         return(ret);
1887 }
1888
1889
1890 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1891         string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f,  gs_int32_t command, gs_int32_t sz, void *value){\n";
1892         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1893                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1894         ret+="\tint i;\n";
1895
1896
1897         ret += "\t/* temp status tuple */\n";
1898         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1899         ret += "\tgs_int32_t tuple_size;\n";
1900
1901
1902         if(is_aggr_query){
1903                 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1904
1905                 ret+="\t\tif (!t->n_aggrs) {\n";
1906                 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1907                 ret+="\t\t\tif( tuple != NULL)\n";
1908                 ret+="\t\t\t\tpost_tuple(tuple);\n";
1909
1910                 ret+="\t\t}else{\n";
1911
1912                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1913                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1914                 ret +="\t\tt->generation++;\n";
1915                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1916                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1917                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1918                 ret+="\t\t\tt->flush_pos = 0;\n";
1919                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1920                 ret+="\t\t}\n";
1921
1922                 ret+="\t}\n";
1923         }
1924         if(param_tbl->size() > 0){
1925                 ret+=
1926 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1927 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1928 "#ifndef LFTA_IN_NIC\n"
1929 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1930 "#else\n"
1931 "\t\t{}\n"
1932 "#endif\n"
1933 "\t}\n";
1934         }
1935         ret+=
1936 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1937 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1938 "\t}\n\n";
1939
1940
1941         ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1942
1943         if(is_aggr_query){
1944                 ret+="\t\tif (t->n_aggrs) {\n";
1945                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1946                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1947                 ret +="\t\tt->generation++;\n";
1948                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1949                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1950                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1951                 ret+="\t\t\tt->flush_pos = 0;\n";
1952                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1953                 ret+="\t\t}\n";
1954         }
1955
1956         ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1957         ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1958         ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1959
1960         /* mark tuple as EOF_TUPLE */
1961         ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1962         ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1963         ret += "\t\tpost_tuple(tuple);\n";
1964         ret += "\t}\n";
1965
1966         ret += "\treturn 0;\n}\n\n";
1967
1968         return(ret);
1969 }
1970
1971 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
1972         string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1973         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1974                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1975
1976         ret += "\t/* Create a temp status tuple */\n";
1977         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1978         ret += "\tgs_int32_t tuple_size;\n";
1979         ret += "\tunsigned int i;\n";
1980         ret += "\ttime_t cur_time;\n";
1981         ret += "\tint time_advanced;\n";
1982         ret += "\tstruct fta_stat stats;\n";
1983
1984
1985
1986         /* copy the last seen values of temporal attributes */
1987         col_id_set temp_cids;           //      col ids of temp attributes in select clause
1988
1989
1990         /* HACK: in order to reuse the SE generation code, we need to copy
1991          * the last values of the temp attributes into new variables
1992          * which have names unpack_var_XXX_XXX
1993          */
1994
1995         int s, g;
1996         col_id_set::iterator csi;
1997
1998         for(s=0;s<sl_list.size();s++){
1999                 data_type *sdt = sl_list[s]->get_data_type();
2000                 if (sdt->is_temporal()) {
2001                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2002                 }
2003         }
2004
2005         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2006                 int tblref = (*csi).tblvar_ref;
2007                 int schref = (*csi).schema_ref;
2008                 string field = (*csi).field;
2009                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2010                 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
2011                 ret += tmpstr;
2012         }
2013
2014         if (is_aggr_query) {
2015                 for(g=0;g<gb_tbl->size();g++){
2016                         data_type *gdt = gb_tbl->get_data_type(g);
2017                         if(gdt->is_temporal()){
2018                                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2019                                 ret += tmpstr;
2020                                 data_type *gdt = gb_tbl->get_data_type(g);
2021                                 if(gdt->is_buffer_type()){
2022                                 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2023                                 ret += tmpstr;
2024                                 }
2025                         }
2026                 }
2027         }
2028         ret += "\n";
2029
2030         ret += "\ttime_advanced = 0;\n";
2031
2032         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2033                 int tblref = (*csi).tblvar_ref;
2034                 int schref = (*csi).schema_ref;
2035                 string field = (*csi).field;
2036                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2037
2038                 // update last seen value with the value seen
2039                 ret += "\t#ifdef PREFILTER_DEFINED\n";
2040                 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
2041                         field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
2042                 ret += tmpstr;
2043                 ret += "\t\ttime_advanced = 1;\n\t}\n";
2044                 ret += "\t#endif\n";
2045
2046                 // we need to pay special attention to time fields
2047                 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
2048                         ret += "\tcur_time = time(&cur_time);\n";
2049
2050                         if (field == "time") {
2051                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
2052                                         tblref, time_corr);
2053                                 ret += tmpstr;
2054                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
2055                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2056                         } else if (field == "timestamp_ms") {
2057                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
2058                                         tblref, time_corr);
2059                                 ret += tmpstr;
2060                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
2061                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2062                         }else{
2063                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
2064                                         field.c_str(), tblref, time_corr);
2065                                 ret += tmpstr;
2066                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
2067                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);
2068                         }
2069                         ret += tmpstr;
2070
2071                         ret += "\t\ttime_advanced = 1;\n";
2072                         ret += "\t}\n";
2073
2074                         sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
2075                         field.c_str(), tblref, field.c_str(), tblref);
2076                         ret += tmpstr;
2077                 } else {
2078                         sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
2079                                 field.c_str(), tblref, field.c_str(), tblref);
2080                         ret += tmpstr;
2081                 }
2082         }
2083
2084
2085         if(advance_uxtime){
2086                 ret += "\tt->ux_time = time(&(t->ux_time));\n";
2087         }
2088
2089         // for aggregation lftas we need to check if the time was advanced beyond the current epoch
2090         if (is_aggr_query) {
2091
2092                 string change_test;
2093                 bool first_one = true;
2094                 for(g=0;g<gb_tbl->size();g++){
2095                   data_type *gdt = gb_tbl->get_data_type(g);
2096                   if(gdt->is_temporal()){
2097 //                                      To perform the test, first need to compute the value
2098 //                                      of the temporal gb attrs.
2099                           if(gdt->is_buffer_type()){
2100         //                              NOTE : if the SE defining the gb is anything
2101         //                              other than a ref to a variable, this will generate
2102         //                              illegal code.  To be resolved with Spatch.
2103                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2104                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2105                                 ret+=tmpstr;
2106                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2107                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2108                           }else{
2109                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2110                           }
2111                           ret += tmpstr;
2112
2113                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2114                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2115                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2116                           change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
2117                   }
2118                 }
2119
2120                 ret += "\n\tif( time_advanced && !( (";
2121                 ret += change_test;
2122                 ret += ") ) ){\n";
2123
2124                 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2125                 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
2126                 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2127
2128                 ret += "\t\t/* \t\tmark all groups as old */\n";
2129                 ret +="\t\tt->generation++;\n";
2130                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
2131                 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
2132                 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
2133                 ret += "\t\tt->flush_pos = 0;\n";
2134
2135                 for(g=0;g<gb_tbl->size();g++){
2136                      data_type *gdt = gb_tbl->get_data_type(g);
2137                      if(gdt->is_temporal()){
2138                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);                 ret += tmpstr;
2139                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
2140                      }
2141                 }
2142                 ret += "\t}\n\n";
2143
2144         }
2145
2146         ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
2147         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2148         ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
2149
2150
2151         for(s=0;s<sl_list.size();s++){
2152                 data_type *sdt = sl_list[s]->get_data_type();
2153                 if(sdt->is_temporal()){
2154
2155                         if (sl_list[s]->is_gb()) {
2156                                 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
2157                                 ret += tmpstr;
2158                         }
2159
2160                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2161                         ret += tmpstr;
2162 //                      if(sdt->needs_hn_translation())
2163 //                              ret += sdt->hton_translation() +"( ";
2164                         if (sl_list[s]->is_gb()) {
2165                                 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
2166                                 ret += tmpstr;
2167                         } else{
2168                                 ret += generate_se_code(sl_list[s],schema);
2169                         }
2170 //                      if(sdt->needs_hn_translation())
2171 //                              ret += " )";
2172                         ret += ";\n";
2173                 }
2174         }
2175
2176         /* mark tuple as temporal */
2177         ret += "\n\t/* Mark tuple as temporal */\n";
2178         ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
2179
2180         ret += "\n\t/* Copy trace id */\n";
2181         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
2182
2183         ret += "\n\t/* Populate runtime stats */\n";
2184         ret += "\tstats.ftaid = f->ftaid;\n";
2185         ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
2186         ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
2187         ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
2188         ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
2189         ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
2190         ret += "\tstats.collision_cnt = t->collision_cnt;\n";
2191         ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
2192         ret += "\tstats.sampling_rate    = t->sampling_rate;\n";
2193
2194         ret += "\n#ifdef LFTA_PROFILE\n";
2195         ret += "\n\t/* Print stats */\n";
2196         ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
2197         ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
2198         ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
2199         ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
2200         ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
2201         ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
2202         ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
2203         ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
2204         ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
2205         ret += "\n#endif\n";
2206
2207
2208         ret += "\n\t/* Copy stats */\n";
2209         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2210         ret+="\tpost_tuple(tuple);\n";
2211
2212         ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2213         ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
2214
2215         ret += "\n\t/* Reset runtime stats */\n";
2216         ret += "\tt->in_tuple_cnt = 0;\n";
2217         ret += "\tt->out_tuple_cnt = 0;\n";
2218         ret += "\tt->out_tuple_sz = 0;\n";
2219         ret += "\tt->accepted_tuple_cnt = 0;\n";
2220         ret += "\tt->cycle_cnt = 0;\n";
2221         ret += "\tt->collision_cnt = 0;\n";
2222         ret += "\tt->eviction_cnt = 0;\n";
2223
2224         ret += "\treturn 0;\n}\n\n";
2225
2226         return(ret);
2227 }
2228
2229
2230 //              accept processing before the where clause,
2231 //              do flush processwing.
2232 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema,   col_id_set &unpacked_cids, string &temporal_flush){
2233         int s;
2234
2235 //              Slow flush
2236   string ret="\n/*\tslow flush\t*/\n";
2237   string slow_flush_str = fs->get_val_of_def("slow_flush");
2238   int n_slow_flush = atoi(slow_flush_str.c_str());
2239   if(n_slow_flush <= 0) n_slow_flush = 2;
2240   if(n_slow_flush > 1){
2241         ret += "\tt->flush_ctr++;\n";
2242         ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2243         ret += "\t\tt->flush_ctr = 0;\n";
2244     ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
2245         ret += "\t}\n\n";
2246   }else{
2247     ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2248   }
2249
2250
2251         string change_test;
2252         bool first_one = true;
2253         int g;
2254     col_id_set flush_cids;              //      col ids accessed when computing flush variables.
2255                                                         //  unpack them at temporal flush test time.
2256     temporal_flush = "";
2257
2258
2259         for(g=0;g<gb_tbl->size();g++){
2260                   data_type *gdt = gb_tbl->get_data_type(g);
2261                   if(gdt->is_temporal()){
2262                           gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2263
2264 //                                      To perform the test, first need to compute the value
2265 //                                      of the temporal gb attrs.
2266                           if(gdt->is_buffer_type()){
2267         //                              NOTE : if the SE defining the gb is anything
2268         //                              other than a ref to a variable, this will generate
2269         //                              illegal code.  To be resolved with Spatch.
2270                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2271                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2272                                 temporal_flush += tmpstr;
2273                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2274                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2275                           }else{
2276                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2277                           }
2278                           temporal_flush += tmpstr;
2279 //                                      END computing the value of the temporal GB attr.
2280
2281
2282                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2283                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2284                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2285                           change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2286                   }
2287         }
2288         if(!first_one){         // will be false iff. there is a temporal GB attribute
2289                   temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2290                   temporal_flush += "\tif( !( (";
2291                   temporal_flush += change_test;
2292                   temporal_flush += ") ) ){\n";
2293
2294 //                temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2295                   temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2296                   temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2297                   temporal_flush+="\t\t}\n";
2298                   temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2299                   temporal_flush+="\t\tt->generation++;\n";
2300                   temporal_flush+="\t\tt->flush_pos = 0;\n";
2301
2302
2303 //                              Now set the saved temporal value of the gb to the
2304 //                              current value of the gb.  Only for simple types,
2305 //                              not for buffer types -- but the strings are not
2306 //                              temporal in any case.
2307
2308                         for(g=0;g<gb_tbl->size();g++){
2309                           data_type *gdt = gb_tbl->get_data_type(g);
2310                           if(gdt->is_temporal()){
2311                                   if(gdt->is_buffer_type()){
2312
2313                                         fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2314                                   }else{
2315                                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2316                                         temporal_flush += tmpstr;
2317                                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2318                                         temporal_flush += tmpstr;
2319                                   }
2320                                 }
2321                         }
2322                   temporal_flush += "\t}\n\n";
2323         }
2324
2325 // Unpack all the temporal attributes referenced in select clause
2326 // and update the last value of the attribute
2327         col_id_set temp_cids;           //      col ids of temp attributes in select clause
2328         col_id_set::iterator csi;
2329
2330         for(s=0;s<sl_list.size();s++){
2331                 data_type *sdt = sl_list[s]->get_data_type();
2332                 if (sdt->is_temporal()) {
2333                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2334                 }
2335         }
2336
2337         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2338                 if(unpacked_cids.count((*csi)) == 0){
2339                         int tblref = (*csi).tblvar_ref;
2340                         int schref = (*csi).schema_ref;
2341                         string field = (*csi).field;
2342                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2343 /*
2344                         data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2345                         sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2346                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2347                         ret += tmpstr;
2348                         ret += "\tif(retval) return 1;\n";
2349 */
2350                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2351                         ret += tmpstr;
2352
2353                         unpacked_cids.insert( (*csi) );
2354                 }
2355         }
2356
2357
2358 //                              Do the flush here if this is a real_time query
2359         string rt_level = fs->get_val_of_def("real_time");
2360         if(rt_level != "" && temporal_flush != ""){
2361                 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2362                         if(unpacked_cids.count((*csi)) == 0){
2363                         int tblref = (*csi).tblvar_ref;
2364                         int schref = (*csi).schema_ref;
2365                         string field = (*csi).field;
2366                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2367 /*
2368                                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2369                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2370                         ret += tmpstr;
2371                         ret += "\tif(retval) return 1;\n";
2372 */
2373                                 unpacked_cids.insert( (*csi) );
2374                         }
2375                 }
2376                 ret += temporal_flush;
2377         }
2378
2379         return ret;
2380  }
2381
2382 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2383
2384 int p,s;
2385 string ret;
2386
2387 ///////////////                 Processing for filter-only query
2388
2389 //                      test passed : create the tuple, then assign to it.
2390           ret += "/*\t\tCreate and post the tuple\t*/\n";
2391
2392 //                      Unpack partial fcns ref'd by the select clause.
2393 //                      Its a kind of a WHERE clause ...
2394   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2395         if(fcn_ref_cnt[p] > 1){
2396                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2397         }
2398         if(is_partial_fcn[p]){
2399                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2400                 ret += "\tif(retval) goto end;\n";
2401         }
2402         if(fcn_ref_cnt[p] > 1){
2403                 if(!is_partial_fcn[p]){
2404                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2405                 }
2406                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2407                 ret += "\t}\n";
2408         }
2409   }
2410
2411   // increment the counter of accepted tuples
2412   ret += "\n\t#ifdef LFTA_STATS\n";
2413   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2414   ret += "\t#endif\n\n";
2415
2416 //                      First, compute the size of the tuple.
2417
2418 //                      Unpack any BUFFER type selections into temporaries
2419 //                      so that I can compute their size and not have
2420 //                      to recompute their value during tuple packing.
2421 //                      I can use regular assignment here because
2422 //                      these temporaries are non-persistent.
2423
2424           for(s=0;s<sl_list.size();s++){
2425                 data_type *sdt = sl_list[s]->get_data_type();
2426                 if(sdt->is_buffer_type()){
2427                         sprintf(tmpstr,"\tselvar_%d = ",s);
2428                         ret += tmpstr;
2429                         ret += generate_se_code(sl_list[s],schema);
2430                         ret += ";\n";
2431                 }
2432           }
2433
2434
2435 //              The size of the tuple is the size of the tuple struct plus the
2436 //              size of the buffers to be copied in.
2437
2438           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2439           for(s=0;s<sl_list.size();s++){
2440                 data_type *sdt = sl_list[s]->get_data_type();
2441                 if(sdt->is_buffer_type()){
2442                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2443                         ret += tmpstr;
2444                 }
2445           }
2446           ret += ";\n";
2447
2448
2449           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2450           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2451
2452 //                      Test passed, make assignments to the tuple.
2453
2454           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2455
2456 //                      Mark tuple as REGULAR_TUPLE
2457           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2458
2459
2460           for(s=0;s<sl_list.size();s++){
2461                 data_type *sdt = sl_list[s]->get_data_type();
2462                 if(sdt->is_buffer_type()){
2463                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2464                         ret += tmpstr;
2465                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2466                         ret += tmpstr;
2467                 }else{
2468                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2469                         ret += tmpstr;
2470 //                      if(sdt->needs_hn_translation())
2471 //                              ret += sdt->hton_translation() +"( ";
2472                         ret += generate_se_code(sl_list[s],schema);
2473 //                      if(sdt->needs_hn_translation())
2474 //                              ret += ") ";
2475                         ret += ";\n";
2476                 }
2477           }
2478
2479 //              Generate output.
2480
2481           ret += "\tpost_tuple(tuple);\n";
2482
2483 //              Increment the counter of posted tuples
2484   ret += "\n\t#ifdef LFTA_STATS\n";
2485   ret += "\tt->out_tuple_cnt++;\n";
2486   ret+="\tt->out_tuple_sz+=tuple_size;\n";
2487   ret += "\t#endif\n\n";
2488
2489
2490
2491         return ret;
2492 }
2493
2494 //      TODO Ensure that postfilter predicates are being generated
2495 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
2496
2497 int p,s,w;
2498 string ret;
2499
2500 //                      Get parameters
2501         unsigned int window_len = fs->temporal_range;
2502         unsigned int n_bloom = 11;
2503         string n_bloom_str = fs->get_val_of_def("num_bloom");
2504         int tmp_n_bloom = atoi(n_bloom_str.c_str());
2505         if(tmp_n_bloom>0)
2506                 n_bloom = tmp_n_bloom+1;
2507         float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2508         sprintf(tmpstr,"%f",bloom_width);
2509         string bloom_width_str = tmpstr;
2510
2511         if(window_len < n_bloom){
2512                 n_bloom = window_len+1;
2513                 bloom_width_str = "1";
2514         }
2515
2516
2517 //              Grab the current window time
2518         scalarexp_t winvar(fs->temporal_var);
2519         ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2520
2521         int bf_exp_size = 12;  // base-2 log of number of bits
2522         string bloom_len_str = fs->get_val_of_def("bloom_size");
2523         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2524         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2525                 bf_exp_size = tmp_bf_exp_size;
2526         }
2527         int bf_bit_size = 1 << bf_exp_size;
2528         int bf_byte_size = bf_bit_size / (8*sizeof(char));
2529
2530         unsigned int ht_size = 4096;
2531         string ht_size_s = fs->get_val_of_def("aggregate_slots");
2532         int tmp_ht_size = atoi(ht_size_s.c_str());
2533         if(tmp_ht_size > 1024){
2534                 unsigned int hs = 1;            // make it power of 2
2535                 while(tmp_ht_size){
2536                         hs =hs << 1;
2537                         tmp_ht_size = tmp_ht_size >> 1;
2538                 }
2539                 ht_size = hs;
2540         }
2541
2542         int i, bf_mask = 0;
2543         if(fs->use_bloom){
2544                 for(i=0;i<bf_exp_size;i++)
2545                         bf_mask = (bf_mask << 1) | 1;
2546         }else{
2547                 for(i=ht_size;i>1;i=i>>1)
2548                         bf_mask = (bf_mask << 1) | 1;
2549         }
2550
2551 /*
2552 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2553         n_bloom,
2554         window_len,
2555         bloom_width_str.c_str(),
2556         bf_exp_size,
2557         bf_bit_size,
2558         bf_byte_size,
2559         ht_size,
2560         ht_size_s.c_str(),
2561         bf_mask);
2562 */
2563
2564
2565
2566
2567 //              If this is a bloom-filter fj, first test if the
2568 //              bloom filter needs to be advanced.
2569 //              SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2570 //              t->bf_size : number of bits in bloom filter
2571 //              TODO: vectorize?
2572 //              TODO: Don't iterate more than n_bloom times!
2573 //                      As written, its possible to wrap around many times.
2574         if(fs->use_bloom){
2575                 ret +=
2576 "//                     Clean out old bloom filters if needed.\n"
2577 "//                     TODO vectorize this ? \n"
2578 "       if(t->first_exec){\n"
2579 "               t->first_exec = 0;\n"
2580 "               t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2581 "               t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2582 "       }else{\n"
2583 "               curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2584 "               if(curr_bin != t->last_bin){\n"
2585 "                       for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2586 "                               t->last_bloom_pos++;\n"
2587 "                               if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2588 "                                       t->last_bloom_pos = 0;\n"
2589 "                               tmp_i = t->last_bloom_pos;\n"
2590 "                               for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2591 "                                       SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2592 "                               }\n"
2593 "                       }\n"
2594 "               }\n"
2595 "               t->last_bin = curr_bin;\n"
2596 "       }\n"
2597 ;
2598         }
2599
2600
2601 //-----------------------------------------------------------------
2602 //              First, determine whether to do S (filter stream) processing.
2603
2604         ret +=
2605 "//             S (filtering stream) predicate, should it be processed?\n"
2606 "\n"
2607 ;
2608 // Sort S preds based on cost.
2609         vector<cnf_elem *> s_filt = fs->pred_t1;
2610         col_id_set::iterator csi;
2611   if(s_filt.size() > 0){
2612
2613 //                      Unpack fields ref'd in the S pred
2614         for(w=0;w<s_filt.size();++w){
2615                 col_id_set this_pred_cids;
2616                 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2617                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2618                         if(unpacked_cids.count( (*csi) ) == 0){
2619                                 int tblref = (*csi).tblvar_ref;
2620                                 int schref = (*csi).schema_ref;
2621                                 string field = (*csi).field;
2622                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2623                                 unpacked_cids.insert( (*csi) );
2624                         }
2625                 }
2626         }
2627
2628
2629 //              Sort by evaluation cost.
2630 //              First, estimate evaluation costs
2631 //              Eliminate predicates covered by the prefilter (those in s_pids).
2632 //              I need to do it before the sort becuase the indices refer
2633 //              to the position in the unsorted list.
2634         vector<cnf_elem *> tmp_wh;
2635         for(w=0;w<s_filt.size();++w){
2636                 compute_cnf_cost(s_filt[w],Ext_fcns);
2637                 tmp_wh.push_back(s_filt[w]);
2638         }
2639         s_filt = tmp_wh;
2640
2641         sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2642
2643 //              Now generate the predicates.
2644         for(w=0;w<s_filt.size();++w){
2645                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2646                 ret += tmpstr;
2647
2648 //                      Find partial fcns ref'd in this cnf element
2649                 set<int> pfcn_refs;
2650                 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2651 //                      Since set<..> is a "Sorted Associative Container",
2652 //                      we can walk through it in sorted order by walking from
2653 //                      begin() to end().  (and the partial fcns must be
2654 //                      evaluated in this order).
2655                 set<int>::iterator si;
2656                 string pf_preds;
2657                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2658                         if(fcn_ref_cnt[(*si)] > 1){
2659                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2660                         }
2661                         if(is_partial_fcn[(*si)]){
2662                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2663                                 ret += "\t\tif(retval) goto end_s;\n";
2664                         }
2665                         if(fcn_ref_cnt[(*si)] > 1){
2666                                 if(!is_partial_fcn[(*si)]){
2667                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2668 //              Testing for S is a side branch.
2669 //              I don't want a cacheable partial function to be
2670 //              marked as evaluated.  Therefore I mark the function
2671 //              as evalauted ONLY IF it is not partial.
2672                                         ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2673                                 }
2674                                 ret += "\t}\n";
2675                         }
2676                 }
2677
2678                 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2679                                 ") ) goto end_s;\n";
2680         }
2681   }else{
2682           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2683   }
2684
2685         for(p=0;p<fs->hash_eq.size();++p)
2686                 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2687
2688         if(fs->use_bloom){
2689 //                      First, generate the S scalar expressions in the hash_eq
2690
2691 //                      Iterate over the bloom filters
2692                 for(i=0;i<3;i++){
2693                         ret += "\t\tbucket=0;\n";
2694                         for(p=0;p<fs->hash_eq.size();++p){
2695                                 ret +=
2696 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2697         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2698         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2699                         }
2700 //              SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2701                                 ret +=
2702 "               bucket &= "+int_to_string(bf_mask)+";\n"
2703 "               SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2704 "\n"
2705 ;
2706                 }
2707         }else{
2708                 ret += "// Add the S record to the hash table, choose a position\n";
2709                 ret += "\t\tbucket=0;\n";
2710                 for(p=0;p<fs->hash_eq.size();++p){
2711                         ret +=
2712 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2713         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2714         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2715                 }
2716                 ret +=
2717 "               bucket &= "+int_to_string(bf_mask)+";\n"
2718 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2719 ;
2720 //                      Try the first bucket
2721                 ret += "\t\tif(";
2722                 for(p=0;p<fs->hash_eq.size();++p){
2723                         if(p>0) ret += " && ";
2724 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2725 //                                      " == s_equijoin_"+int_to_string(p);
2726                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2727                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2728                         string rhs_op = "s_equijoin_"+int_to_string(p);
2729                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2730                 }
2731                 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2732                 ret += "\t\t}else{\n\t\t\tif(";
2733                 for(p=0;p<fs->hash_eq.size();++p){
2734                         if(p>0) ret += " && ";
2735 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2736 //                                      " == s_equijoin_"+int_to_string(p);
2737                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2738                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2739                         string rhs_op = "s_equijoin_"+int_to_string(p);
2740                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2741                 }
2742                 ret +=  "){\n\t\t\t\tthe_bucket = bucket1;\n";
2743                 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2744                 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2745                 ret += "\t\t\t}\n\t\t}\n";
2746                 for(p=0;p<fs->hash_eq.size();++p){
2747                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2748                         if(hdt->is_buffer_type()){
2749                                 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2750                                 ret += tmpstr;
2751                         }else{
2752                                 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2753                                         " = s_equijoin_"+int_to_string(p)+";\n";
2754                         }
2755                 }
2756                 ret+="\t\tt->join_table[the_bucket].ts =  curr_fj_ts;\n";
2757         }
2758   ret += "\tend_s:\n";
2759
2760 //      ------------------------------------------------------------
2761 //              Next, determine if the R record should be processed.
2762
2763
2764         ret +=
2765 "//             R (main stream) cheap predicate\n"
2766 "\n"
2767 ;
2768
2769 //              Unpack r_filt fields
2770         vector<cnf_elem *> r_filt = fs->pred_t0;
2771         for(w=0;w<r_filt.size();++w){
2772                 col_id_set this_pred_cids;
2773                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2774                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2775                         if(unpacked_cids.count( (*csi) ) == 0){
2776                                 int tblref = (*csi).tblvar_ref;
2777                                 int schref = (*csi).schema_ref;
2778                                 string field = (*csi).field;
2779                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2780                                 unpacked_cids.insert( (*csi) );
2781                         }
2782                 }
2783         }
2784
2785 // Sort R preds based on cost.
2786
2787         vector<cnf_elem *> tmp_wh;
2788         for(w=0;w<r_filt.size();++w){
2789                 compute_cnf_cost(r_filt[w],Ext_fcns);
2790                 tmp_wh.push_back(r_filt[w]);
2791         }
2792         r_filt = tmp_wh;
2793
2794         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2795
2796 //              WARNING! the constant 20 below is a wild-ass guess.
2797         int cheap_rpos;
2798         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
2799
2800 //              Test the cheap filters on R.
2801   if(cheap_rpos >0){
2802
2803 //              Now generate the predicates.
2804         for(w=0;w<cheap_rpos;++w){
2805                 sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
2806                 ret += tmpstr;
2807
2808 //                      Find partial fcns ref'd in this cnf element
2809                 set<int> pfcn_refs;
2810                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2811 //                      Since set<..> is a "Sorted Associative Container",
2812 //                      we can walk through it in sorted order by walking from
2813 //                      begin() to end().  (and the partial fcns must be
2814 //                      evaluated in this order).
2815                 set<int>::iterator si;
2816                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2817                         if(fcn_ref_cnt[(*si)] > 1){
2818                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2819                         }
2820                         if(is_partial_fcn[(*si)]){
2821                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2822                                 ret += "\t\tif(retval) goto end;\n";
2823                         }
2824                         if(fcn_ref_cnt[(*si)] > 1){
2825                                 if(!is_partial_fcn[(*si)]){
2826                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2827                                 }
2828                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2829                                 ret += "\t}\n";
2830                         }
2831                 }
2832
2833                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2834                                 ") ) goto end;\n";
2835         }
2836   }else{
2837           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2838   }
2839
2840         ret += "\n//    Do the join\n\n";
2841         for(p=0;p<fs->hash_eq.size();++p)
2842                 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2843
2844
2845 //                      Passed the cheap pred, now test the join with S.
2846         if(fs->use_bloom){
2847                 for(i=0;i<3;i++){
2848                         ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2849                         for(p=0;p<fs->hash_eq.size();++p){
2850                                 ret +=
2851 "       bucket"+int_to_string(i)+
2852         " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2853         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2854         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2855                         }
2856                                 ret +=
2857 "       bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2858                 }
2859                 ret += "\tfound = 0;\n";
2860                 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2861                 ret +=
2862 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2863 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2864 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
2865 "\t\t\tfound=1;\n"
2866 "\t}\n"
2867 ;
2868                 ret +=
2869 "       if(!found)\n"
2870 "               goto end;\n"
2871 ;
2872         }else{
2873                 ret += "\tfound = 0;\n";
2874                 ret += "\t\tbucket=0;\n";
2875                 for(p=0;p<fs->hash_eq.size();++p){
2876                         ret +=
2877 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2878         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2879         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2880                 }
2881                 ret +=
2882 "               bucket &= "+int_to_string(bf_mask)+";\n"
2883 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2884 ;
2885 //                      Try the first bucket
2886                 ret += "\t\tif(";
2887                 for(p=0;p<fs->hash_eq.size();++p){
2888                         if(p>0) ret += " && ";
2889 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2890 //                                      " == r_equijoin_"+int_to_string(p);
2891                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2892                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2893                         string rhs_op = "s_equijoin_"+int_to_string(p);
2894                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2895                 }
2896                 if(p>0) ret += " && ";
2897                 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2898                 ret += "){\n\t\t\tfound = 1;\n";
2899                 ret += "\t\t}else {if(";
2900                 for(p=0;p<fs->hash_eq.size();++p){
2901                         if(p>0) ret += " && ";
2902 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2903 //                                      " == r_equijoin_"+int_to_string(p);
2904                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2905                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2906                         string rhs_op = "s_equijoin_"+int_to_string(p);
2907                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2908                 }
2909                 if(p>0) ret += " && ";
2910                 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2911                 ret +=  ")\n\t\t\tfound=1;\n";
2912                 ret+="\t\t}\n";
2913                 ret +=
2914 "       if(!found)\n"
2915 "               goto end;\n"
2916 ;
2917         }
2918
2919
2920 //              Test the expensive filters on R.
2921   if(cheap_rpos < r_filt.size()){
2922
2923 //              Now generate the predicates.
2924         for(w=cheap_rpos;w<r_filt.size();++w){
2925                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2926                 ret += tmpstr;
2927
2928 //                      Find partial fcns ref'd in this cnf element
2929                 set<int> pfcn_refs;
2930                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2931 //                      Since set<..> is a "Sorted Associative Container",
2932 //                      we can walk through it in sorted order by walking from
2933 //                      begin() to end().  (and the partial fcns must be
2934 //                      evaluated in this order).
2935                 set<int>::iterator si;
2936                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2937                         if(fcn_ref_cnt[(*si)] > 1){
2938                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2939                         }
2940                         if(is_partial_fcn[(*si)]){
2941                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2942                                 ret += "\t\tif(retval) goto end;\n";
2943                         }
2944                         if(fcn_ref_cnt[(*si)] > 1){
2945                                 if(!is_partial_fcn[(*si)]){
2946                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2947                                 }
2948                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2949                                 ret += "\t}\n";
2950                         }
2951                 }
2952
2953                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2954                                 ") ) goto end;\n";
2955         }
2956   }else{
2957           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2958   }
2959
2960
2961
2962 ///////////////                 post the tuple
2963
2964 //                      test passed : create the tuple, then assign to it.
2965           ret += "/*\t\tCreate and post the tuple\t*/\n";
2966
2967 //              Unpack r_filt fields
2968         for(s=0;s<sl_list.size();++s){
2969                 col_id_set this_se_cids;
2970                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2971                 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2972                         if(unpacked_cids.count( (*csi) ) == 0){
2973                                 int tblref = (*csi).tblvar_ref;
2974                                 int schref = (*csi).schema_ref;
2975                                 string field = (*csi).field;
2976                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2977                                 unpacked_cids.insert( (*csi) );
2978                         }
2979                 }
2980         }
2981
2982
2983 //                      Unpack partial fcns ref'd by the select clause.
2984 //                      Its a kind of a WHERE clause ...
2985   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2986         if(fcn_ref_cnt[p] > 1){
2987                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2988         }
2989         if(is_partial_fcn[p]){
2990                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2991                 ret += "\tif(retval) goto end;\n";
2992         }
2993         if(fcn_ref_cnt[p] > 1){
2994                 if(!is_partial_fcn[p]){
2995                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2996                 }
2997                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2998                 ret += "\t}\n";
2999         }
3000   }
3001
3002   // increment the counter of accepted tuples
3003   ret += "\n\t#ifdef LFTA_STATS\n";
3004   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3005   ret += "\t#endif\n\n";
3006
3007 //                      First, compute the size of the tuple.
3008
3009 //                      Unpack any BUFFER type selections into temporaries
3010 //                      so that I can compute their size and not have
3011 //                      to recompute their value during tuple packing.
3012 //                      I can use regular assignment here because
3013 //                      these temporaries are non-persistent.
3014
3015           for(s=0;s<sl_list.size();s++){
3016                 data_type *sdt = sl_list[s]->get_data_type();
3017                 if(sdt->is_buffer_type()){
3018                         sprintf(tmpstr,"\tselvar_%d = ",s);
3019                         ret += tmpstr;
3020                         ret += generate_se_code(sl_list[s],schema);
3021                         ret += ";\n";
3022                 }
3023           }
3024
3025
3026 //              The size of the tuple is the size of the tuple struct plus the
3027 //              size of the buffers to be copied in.
3028
3029           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3030           for(s=0;s<sl_list.size();s++){
3031                 data_type *sdt = sl_list[s]->get_data_type();
3032                 if(sdt->is_buffer_type()){
3033                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3034                         ret += tmpstr;
3035                 }
3036           }
3037           ret += ";\n";
3038
3039
3040           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3041           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3042
3043 //                      Test passed, make assignments to the tuple.
3044
3045           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3046
3047 //                      Mark tuple as REGULAR_TUPLE
3048           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3049
3050
3051           for(s=0;s<sl_list.size();s++){
3052                 data_type *sdt = sl_list[s]->get_data_type();
3053                 if(sdt->is_buffer_type()){
3054                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3055                         ret += tmpstr;
3056                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3057                         ret += tmpstr;
3058                 }else{
3059                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3060                         ret += tmpstr;
3061 //                      if(sdt->needs_hn_translation())
3062 //                              ret += sdt->hton_translation() +"( ";
3063                         ret += generate_se_code(sl_list[s],schema);
3064 //                      if(sdt->needs_hn_translation())
3065 //                              ret += ") ";
3066                         ret += ";\n";
3067                 }
3068           }
3069
3070 //              Generate output.
3071
3072           ret += "\tpost_tuple(tuple);\n";
3073
3074 //              Increment the counter of posted tuples
3075   ret += "\n\t#ifdef LFTA_STATS\n";
3076   ret += "\n\tt->out_tuple_cnt++;\n\n";
3077   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3078   ret += "\t#endif\n\n";
3079
3080
3081         return ret;
3082 }
3083
3084
3085 string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
3086
3087 int p,s,w;
3088 string ret;
3089
3090
3091         string wl_schema = fs->from[1]->get_schema_name();
3092         string wl_elem_str = generate_watchlist_element_name(wl_schema);
3093         string wl_node_str = generate_watchlist_struct_name(wl_schema);
3094         string tgt = generate_watchlist_name(wl_schema);
3095
3096         ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
3097
3098
3099
3100
3101
3102 //      ------------------------------------------------------------
3103 //              Determine if the R record should be processed.
3104
3105
3106         ret +=
3107 "//             R (main stream) cheap predicate\n"
3108 "\n"
3109 ;
3110
3111 //              Unpack r_filt fields
3112         vector<cnf_elem *> r_filt = fs->pred_t0;
3113         for(w=0;w<r_filt.size();++w){
3114                 col_id_set this_pred_cids;
3115                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
3116                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3117                         if(unpacked_cids.count( (*csi) ) == 0){
3118                                 int tblref = (*csi).tblvar_ref;
3119                                 int schref = (*csi).schema_ref;
3120                                 string field = (*csi).field;
3121                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3122                                 unpacked_cids.insert( (*csi) );
3123                         }
3124                 }
3125         }
3126
3127 // Sort R preds based on cost.
3128
3129         vector<cnf_elem *> tmp_wh;
3130         for(w=0;w<r_filt.size();++w){
3131                 compute_cnf_cost(r_filt[w],Ext_fcns);
3132                 tmp_wh.push_back(r_filt[w]);
3133         }
3134         r_filt = tmp_wh;
3135
3136         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
3137
3138 //              WARNING! the constant 20 below is a wild-ass guess.
3139         int cheap_rpos;
3140         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
3141
3142 //              Test the cheap filters on R.
3143   if(cheap_rpos >0){
3144
3145 //              Now generate the predicates.
3146         for(w=0;w<cheap_rpos;++w){
3147                 sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3148                 ret += tmpstr;
3149
3150 //                      Find partial fcns ref'd in this cnf element
3151                 set<int> pfcn_refs;
3152                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3153 //                      Since set<..> is a "Sorted Associative Container",
3154 //                      we can walk through it in sorted order by walking from
3155 //                      begin() to end().  (and the partial fcns must be
3156 //                      evaluated in this order).
3157                 set<int>::iterator si;
3158                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3159                         if(fcn_ref_cnt[(*si)] > 1){
3160                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3161                         }
3162                         if(is_partial_fcn[(*si)]){
3163                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3164                                 ret += "\t\tif(retval) goto end;\n";
3165                         }
3166                         if(fcn_ref_cnt[(*si)] > 1){
3167                                 if(!is_partial_fcn[(*si)]){
3168                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3169                                 }
3170                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3171                                 ret += "\t}\n";
3172                         }
3173                 }
3174
3175                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3176                                 ") ) goto end;\n";
3177         }
3178   }else{
3179           ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
3180   }
3181
3182         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3183         map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
3184         vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
3185         for(w=0;w<kflds.size();++w){
3186                 string kfld = kflds[w];
3187                 col_id_set this_pred_cids;
3188                 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
3189                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3190                         if(unpacked_cids.count( (*csi) ) == 0){
3191                                 int tblref = (*csi).tblvar_ref;
3192                                 int schref = (*csi).schema_ref;
3193                                 string field = (*csi).field;
3194                                 if(tblref==0) // LHS from packet, don't unpack the RHS
3195                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3196                                 unpacked_cids.insert( (*csi) );
3197                         }
3198                 }
3199         }
3200
3201
3202         ret += "\n//    Do the join\n\n";
3203         ret += "\n//            (ensure that the watchtable is fresh)\n";
3204         ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
3205         ret += "\t\treload_watchlist__"+wl_schema+"();\n";
3206         ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
3207         ret += "\t}\n\n";
3208
3209
3210         for(p=0;p<fs->key_flds.size();++p){
3211                 string kfld = fs->key_flds[p];
3212                 ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
3213         }
3214
3215
3216 //                      Passed the cheap pred, now test the join with S.
3217         ret += "\tbucket=0;\n";
3218         ret += "\thash=0;\n";
3219         for(p=0;p<fs->key_flds.size();++p){
3220                 string kfld = fs->key_flds[p];
3221                 ret +=
3222 "               hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
3223         fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
3224         +"_to_hash(r_equijoin_"+kfld+")));\n";
3225         }
3226         ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
3227
3228         ret += "\t\trec = "+tgt+".ht[bucket];\n";
3229         ret += "\t\twhile(rec!=NULL){\n";
3230         ret += "\t\t\tif(hash==rec->hashval){\n";
3231         ret += "\t\t\t\tif(";
3232         for(p=0;p<fs->key_flds.size();++p){
3233                 string kfld = fs->key_flds[p];
3234                 if(p>0) ret += " && ";
3235                 data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
3236                 string lhs_op = "r_equijoin_"+kfld;
3237                 string rhs_op = "rec->"+kfld;
3238                 ret += generate_equality_test(lhs_op,rhs_op,hdt);
3239         }
3240         ret += ")\n";
3241         ret += "\t\t\t\t\tbreak;\n";
3242         ret += "\t\t\t}\n";
3243         ret += "\t\t\trec=rec->next;\n";
3244         ret += "\t\t}\n";
3245         ret += "\t\tif(rec==NULL)\n";
3246         ret += "\t\t\tgoto end;\n";
3247                 
3248         ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
3249         for(w=0;w<where.size();++w){
3250                 col_id_set this_pred_cids;
3251                 gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
3252                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3253                         if(unpacked_cids.count( (*csi) ) == 0){
3254                                 int tblref = (*csi).tblvar_ref;
3255                                 int schref = (*csi).schema_ref;
3256                                 string field = (*csi).field;
3257                                 if(tblref==0) // LHS from packet
3258                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3259                                 else    // RHS from hash bucket
3260                                         ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3261                                 unpacked_cids.insert( (*csi) );
3262                         }
3263                 }
3264         }
3265
3266
3267 //              Test the expensive filters on R.
3268 //                      TODO Should merge this with other predicates and eval in order
3269 //                              of cost - see the fj code.
3270 //                      TODO join and postfilter predicates haven't been costed yet.
3271   if(cheap_rpos < r_filt.size()){
3272
3273 //              Now generate the predicates.
3274         for(w=cheap_rpos;w<r_filt.size();++w){
3275                 sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3276                 ret += tmpstr;
3277
3278 //                      Find partial fcns ref'd in this cnf element
3279                 set<int> pfcn_refs;
3280                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3281 //                      Since set<..> is a "Sorted Associative Container",
3282 //                      we can walk through it in sorted order by walking from
3283 //                      begin() to end().  (and the partial fcns must be
3284 //                      evaluated in this order).
3285                 set<int>::iterator si;
3286                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3287                         if(fcn_ref_cnt[(*si)] > 1){
3288                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3289                         }
3290                         if(is_partial_fcn[(*si)]){
3291                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3292                                 ret += "\t\tif(retval) goto end;\n";
3293                         }
3294                         if(fcn_ref_cnt[(*si)] > 1){
3295                                 if(!is_partial_fcn[(*si)]){
3296                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3297                                 }
3298                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3299                                 ret += "\t}\n";
3300                         }
3301                 }
3302
3303                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3304                                 ") ) goto end;\n";
3305         }
3306   }else{
3307           ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
3308   }
3309
3310 //              TODO sort the additional predicates by cost
3311
3312 //              S-only
3313         for(w=0;w<fs->pred_t1.size();++w){
3314                 sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
3315                 ret += tmpstr;
3316
3317 //                      Find partial fcns ref'd in this cnf element
3318                 set<int> pfcn_refs;
3319                 collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
3320 //                      Since set<..> is a "Sorted Associative Container",
3321 //                      we can walk through it in sorted order by walking from
3322 //                      begin() to end().  (and the partial fcns must be
3323 //                      evaluated in this order).
3324                 set<int>::iterator si;
3325                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3326                         if(fcn_ref_cnt[(*si)] > 1){
3327                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3328                         }
3329                         if(is_partial_fcn[(*si)]){
3330                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3331                                 ret += "\t\tif(retval) goto end;\n";
3332                         }
3333                         if(fcn_ref_cnt[(*si)] > 1){
3334                                 if(!is_partial_fcn[(*si)]){
3335                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3336                                 }
3337                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3338                                 ret += "\t}\n";
3339                         }
3340                 }
3341
3342                 ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
3343                                 ") ) goto end;\n";
3344         }
3345
3346 //              non hash-eq join 
3347         for(w=0;w<fs->join_filter.size();++w){
3348                 sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
3349                 ret += tmpstr;
3350
3351 //                      Find partial fcns ref'd in this cnf element
3352                 set<int> pfcn_refs;
3353                 collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
3354 //                      Since set<..> is a "Sorted Associative Container",
3355 //                      we can walk through it in sorted order by walking from
3356 //                      begin() to end().  (and the partial fcns must be
3357 //                      evaluated in this order).
3358                 set<int>::iterator si;
3359                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3360                         if(fcn_ref_cnt[(*si)] > 1){
3361                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3362                         }
3363                         if(is_partial_fcn[(*si)]){
3364                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3365                                 ret += "\t\tif(retval) goto end;\n";
3366                         }
3367                         if(fcn_ref_cnt[(*si)] > 1){
3368                                 if(!is_partial_fcn[(*si)]){
3369                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3370                                 }
3371                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3372                                 ret += "\t}\n";
3373                         }
3374                 }
3375
3376                 ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
3377                                 ") ) goto end;\n";
3378         }
3379
3380 //              postfilter
3381         for(w=0;w<fs->postfilter.size();++w){
3382                 sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
3383                 ret += tmpstr;
3384
3385 //                      Find partial fcns ref'd in this cnf element
3386                 set<int> pfcn_refs;
3387                 collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
3388 //                      Since set<..> is a "Sorted Associative Container",
3389 //                      we can walk through it in sorted order by walking from
3390 //                      begin() to end().  (and the partial fcns must be
3391 //                      evaluated in this order).
3392                 set<int>::iterator si;
3393                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3394                         if(fcn_ref_cnt[(*si)] > 1){
3395                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3396                         }
3397                         if(is_partial_fcn[(*si)]){
3398                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3399                                 ret += "\t\tif(retval) goto end;\n";
3400                         }
3401                         if(fcn_ref_cnt[(*si)] > 1){
3402                                 if(!is_partial_fcn[(*si)]){
3403                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3404                                 }
3405                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3406                                 ret += "\t}\n";
3407                         }
3408                 }
3409
3410                 ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
3411                                 ") ) goto end;\n";
3412         }
3413
3414
3415
3416 ///////////////                 post the tuple
3417
3418 //                      test passed : create the tuple, then assign to it.
3419           ret += "/*\t\tCreate and post the tuple\t*/\n";
3420
3421 //              Unpack R fields
3422         for(s=0;s<sl_list.size();++s){
3423                 col_id_set this_se_cids;
3424                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
3425                 for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
3426                         if(unpacked_cids.count( (*csi) ) == 0){
3427                                 int tblref = (*csi).tblvar_ref;
3428                                 int schref = (*csi).schema_ref;
3429                                 string field = (*csi).field;
3430                                 if(tblref==0) // LHS from packet
3431                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3432                                 else    // RHS from hash bucket
3433                                         ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3434                                 unpacked_cids.insert( (*csi) );
3435                         }
3436                 }
3437         }
3438
3439
3440 //                      Unpack partial fcns ref'd by the select clause.
3441 //                      Its a kind of a WHERE clause ...
3442   for(p=sl_fcns_start;p<sl_fcns_end;p++){
3443         if(fcn_ref_cnt[p] > 1){
3444                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3445         }
3446         if(is_partial_fcn[p]){
3447                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3448                 ret += "\tif(retval) goto end;\n";
3449         }
3450         if(fcn_ref_cnt[p] > 1){
3451                 if(!is_partial_fcn[p]){
3452                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3453                 }
3454                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3455                 ret += "\t}\n";
3456         }
3457   }
3458
3459   // increment the counter of accepted tuples
3460   ret += "\n\t#ifdef LFTA_STATS\n";
3461   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3462   ret += "\t#endif\n\n";
3463
3464 //                      First, compute the size of the tuple.
3465
3466 //                      Unpack any BUFFER type selections into temporaries
3467 //                      so that I can compute their size and not have
3468 //                      to recompute their value during tuple packing.
3469 //                      I can use regular assignment here because
3470 //                      these temporaries are non-persistent.
3471
3472           for(s=0;s<sl_list.size();s++){
3473                 data_type *sdt = sl_list[s]->get_data_type();
3474                 if(sdt->is_buffer_type()){
3475                         sprintf(tmpstr,"\tselvar_%d = ",s);
3476                         ret += tmpstr;
3477                         ret += generate_se_code(sl_list[s],schema);
3478                         ret += ";\n";
3479                 }
3480           }
3481
3482
3483 //              The size of the tuple is the size of the tuple struct plus the
3484 //              size of the buffers to be copied in.
3485
3486           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3487           for(s=0;s<sl_list.size();s++){
3488                 data_type *sdt = sl_list[s]->get_data_type();
3489                 if(sdt->is_buffer_type()){
3490                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3491                         ret += tmpstr;
3492                 }
3493           }
3494           ret += ";\n";
3495
3496
3497           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3498           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3499
3500 //                      Test passed, make assignments to the tuple.
3501
3502           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3503
3504 //                      Mark tuple as REGULAR_TUPLE
3505           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3506
3507
3508           for(s=0;s<sl_list.size();s++){
3509                 data_type *sdt = sl_list[s]->get_data_type();
3510                 if(sdt->is_buffer_type()){
3511                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3512                         ret += tmpstr;
3513                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3514                         ret += tmpstr;
3515                 }else{
3516                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3517                         ret += tmpstr;
3518 //                      if(sdt->needs_hn_translation())
3519 //                              ret += sdt->hton_translation() +"( ";
3520                         ret += generate_se_code(sl_list[s],schema);
3521 //                      if(sdt->needs_hn_translation())
3522 //                              ret += ") ";
3523                         ret += ";\n";
3524                 }
3525           }
3526
3527 //              Generate output.
3528
3529           ret += "\tpost_tuple(tuple);\n";
3530
3531 //              Increment the counter of posted tuples
3532   ret += "\n\t#ifdef LFTA_STATS\n";
3533   ret += "\n\tt->out_tuple_cnt++;\n\n";
3534   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3535   ret += "\t#endif\n\n";
3536
3537
3538         return ret;
3539 }
3540
3541 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
3542         string ret;
3543         int a,p,g;
3544
3545 //////////////          Processing for aggregtion query
3546
3547 //              First, search for a match.  Start by unpacking the group-by attributes.
3548
3549 //                      One complication : if a real-time aggregate flush occurs,
3550 //                      the GB attr has already been calculated.  So don't compute
3551 //                      it again if 1) its temporal and 2) it will be computed in the
3552 //                      agggregate flush code.
3553
3554 //              Unpack the partial fcns ref'd by the gb's and the aggr defs.
3555   for(p=gb_fcns_start;p<gb_fcns_end;p++){
3556     if(is_partial_fcn[p]){
3557                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3558                 ret += "\tif(retval) goto end;\n";
3559         }
3560   }
3561   for(p=ag_fcns_start;p<ag_fcns_end;p++){
3562     if(is_partial_fcn[p]){
3563                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3564                 ret += "\tif(retval) goto end;\n";
3565         }
3566   }
3567
3568   // increment the counter of accepted tuples
3569   ret += "\n\t#ifdef LFTA_STATS\n";
3570   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3571   ret += "\t#endif\n\n";
3572
3573   ret += "/*\t\tTest if the group is in the hash table \t*/\n";
3574 //                      Compute the values of the group-by variables.
3575   for(g=0;g<gb_tbl->size();g++){
3576           data_type *gdt = gb_tbl->get_data_type(g);
3577           if((! gdt->is_temporal()) || temporal_flush == ""){
3578
3579                   if(gdt->is_buffer_type()){
3580         //                              NOTE : if the SE defining the gb is anything
3581         //                              other than a ref to a variable, this will generate
3582         //                              illegal code.  To be resolved with Spatch.
3583                         sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
3584                                 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
3585                         ret += tmpstr;
3586                         sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
3587                                 gdt->get_buffer_assign_copy().c_str(), g, g);
3588                   }else{
3589                         sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
3590                   }
3591                   ret += tmpstr;
3592           }
3593   }
3594   ret += "\n";
3595
3596 //                      A quick aside : if any of the GB attrs are temporal,
3597 //                      test for change and flush if any change occurred.
3598 //                      We've already computed the flush code,
3599 //                      Put it here if this is not a real time query.
3600 //                      We've already unpacked all column refs, so no need to
3601 //                      do it again here.
3602
3603         string rt_level = fs->get_val_of_def("real_time");
3604         if(rt_level == "" && temporal_flush != ""){
3605                 ret += temporal_flush;
3606         }
3607
3608 //                      Compute the hash bucket
3609         if(gb_tbl->size() > 0){
3610                 ret += "\thashval = ";\
3611                 for(g=0;g<gb_tbl->size();g++){
3612                   if(g>0) ret += " ^ ";
3613                   data_type *gdt = gb_tbl->get_data_type(g);
3614                   if(gdt->is_buffer_type()){
3615                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3616                                 gdt->get_type_str().c_str(), g);
3617                   }else{
3618                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3619                                 gdt->get_type_str().c_str(), g);
3620                   }
3621                   ret += tmpstr;
3622                 }
3623                 ret += ";\n";
3624                 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
3625         ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
3626         }else{
3627                 ret+="\tprobe = 0;\n";
3628                 ret+="\thash2 = 0;\n\n";
3629         }
3630
3631 //              Does the lfta reference a udaf?
3632           bool has_udaf = false;
3633           for(a=0;a<aggr_tbl->size();a++){
3634                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
3635           }
3636
3637 //              Scan for a match, or alternatively the best slot.
3638 //              Currently, hardcode 5 tests.
3639         ret +=
3640 "       gen_val = t->generation & SLOT_GEN_BITS;\n"
3641 "       match_found = 0;\n"
3642 "       best_slot = probe;\n"
3643 "       for(i=0;i<5 && match_found == 0;i++){\n"
3644 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
3645 ;
3646         if(gb_tbl->size()>0){
3647                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
3648                 ret+="\t\tif(";
3649                 string rhs_op, lhs_op;
3650                 for(g=0;g<gb_tbl->size();g++){
3651                   if(g>0) ret += " && ";
3652                   ret += "(";
3653                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
3654                   sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
3655                   ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
3656                   ret += ")";
3657                 }
3658          }
3659          ret += "){\n"
3660 "                       match_found = 1;\n"
3661 "                       best_slot = probe;\n"
3662 "               }\n"
3663 "       }\n"
3664 "//             Rate slots in case no match found: prefer empty, then full but old slots\n"
3665 "       if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3666 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3667 "                       best_slot = probe;\n"
3668 "               }else{\n"
3669 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3670 "                               best_slot = probe;\n"
3671 "                       }\n"
3672 "               }\n"
3673 "               probe++;\n"
3674 "               if(probe >= t->max_aggrs)\n"
3675 "                       probe=0;\n"
3676 "       }\n"
3677 "       if(match_found){\n"
3678 ;
3679         ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3680         ret +=
3681 "       }else{\n"
3682 "               if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3683 ;
3684 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3685         if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3686                 ret +=
3687 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3688 "                               if((";
3689                 bool first_g = true;
3690                 for(int g=0;g<gb_tbl->size();g++){
3691                         data_type *gdt = gb_tbl->get_data_type(g);
3692                         if(gdt->is_temporal()){
3693                                 if(first_g) first_g = false; else ret+=" + ";
3694                                 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3695                         }
3696                 }
3697                 ret += ") == 0 ){\n";
3698
3699                 ret +=
3700 "                                       fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3701 "                               }\n"
3702 "                       }\n"
3703 ;
3704         }
3705
3706         ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3707         ret +=
3708 "\t\t\t#ifdef LFTA_STATS\n"
3709 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3710 "\t\t\t\tt->collision_cnt++;\n\n"
3711 "\t\t\t#endif\n\n"
3712 "\t\t}\n"
3713 ;
3714         ret += generate_init_group(schema,"best_slot");
3715
3716
3717           ret += "\t}\n";
3718
3719         return ret;
3720 }
3721
3722
3723
3724 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
3725
3726         string ret="static gs_retval_t accept_packet_"+node_name+
3727                 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3728     ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3729
3730   int a;
3731
3732 //                      Define all of the variables needed by this
3733 //                      procedure.
3734
3735
3736 //                      Gather all column references, need to define unpacking variables.
3737   int w,s;
3738   col_id_set cid_set;
3739   col_id_set::iterator csi;
3740
3741 //              If its a filter join, rebind all colrefs
3742 //              to the first range var, to avoid double unpacking.
3743
3744   if(is_fj){
3745     for(w=0;w<where.size();++w)
3746                 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3747     for(s=0;s<sl_list.size();s++)
3748                 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3749   }
3750
3751   for(w=0;w<where.size();++w){
3752         if(is_wj || is_fj || s_pids.count(w) == 0)
3753                 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3754   }
3755   for(s=0;s<sl_list.size();s++){
3756         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3757   }
3758
3759   int g;
3760   if(gb_tbl != NULL){
3761         for(g=0;g<gb_tbl->size();g++)
3762           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3763   }
3764
3765   //                    Variables for unpacking attributes.
3766   ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3767   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3768     int schref = (*csi).schema_ref;
3769         int tblref = (*csi).tblvar_ref;
3770     string field = (*csi).field;
3771     data_type dt(schema->get_type_name(schref,field));
3772     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3773         field.c_str(), tblref);
3774     ret += tmpstr;
3775   }
3776
3777   ret += "\n\n";
3778
3779 //                      Variables that are always needed
3780   ret += "/*\t\tVariables which are always needed\t*/\n";
3781   ret += "\tgs_retval_t retval;\n";
3782   ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3783   ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3784
3785   ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3786
3787
3788 //                      Variables needed for aggregation queries.
3789   if(is_aggr_query){
3790           ret += "\n/*\t\tVariables for aggregation\t*/\n";
3791           ret+="\tunsigned int i, probe;\n";
3792           ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3793           ret+="\tgs_uint64_t hashval, hash2;\n";
3794 //                      Variables for storing group-by attribute values.
3795           if(gb_tbl->size() > 0)
3796                 ret += "/*\t\tGroup-by attributes\t*/\n";
3797           for(g=0;g<gb_tbl->size();g++){
3798                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3799                 ret += tmpstr;
3800                 data_type *gdt = gb_tbl->get_data_type(g);
3801                 if(gdt->is_buffer_type()){
3802                   sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3803                   ret += tmpstr;
3804                 }
3805           }
3806           ret += "\n";
3807 //                      Temporaries for min/max
3808           string aggr_tmp_str = "";
3809           for(a=0;a<aggr_tbl->size();a++){
3810                 string aggr_op = aggr_tbl->get_op(a);
3811                 if(aggr_op == "MIN" || aggr_op == "MAX"){
3812                         sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3813                         aggr_tmp_str.append(tmpstr);
3814                 }
3815           }
3816           if(aggr_tmp_str != ""){
3817                 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3818                 ret += aggr_tmp_str;
3819                 ret += "\n";
3820           }
3821 //              Variables for udaf output temporaries
3822         bool no_udaf = true;
3823         for(a=0;a<aggr_tbl->size();a++){
3824                 if(! aggr_tbl->is_builtin(a)){
3825                         if(no_udaf){
3826                                 ret+="/*\t\tUDAF output vars.\t*/\n";
3827                                 no_udaf = false;
3828                         }
3829                         int afcn_id = aggr_tbl->get_fcn_id(a);
3830                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3831                         sprintf(tmpstr,"udaf_ret%d", a);
3832                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3833                 }
3834         }
3835   }
3836
3837 //                      Variables needed for a filter join query
3838   if(fs->node_type() == "filter_join"){
3839         filter_join_qpn *fjq = (filter_join_qpn *)fs;
3840         bool uses_bloom = fjq->use_bloom;
3841         ret += "/*\t\tJoin fields\t*/\n";
3842         for(g=0;g<fjq->hash_eq.size();g++){
3843                 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3844                 ret += tmpstr;
3845                 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3846                 ret += tmpstr;
3847           }
3848         if(uses_bloom){
3849                 ret +=
3850 "  /*           Variables for fj bloom filter   */ \n"
3851 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3852 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3853 "\tlong long int curr_fj_ts;\n"
3854 "\tlong long int curr_bin, the_bin;\n"
3855 "\n"
3856 ;
3857         }else{
3858                 ret +=
3859 "  /*           Variables for fj join table     */ \n"
3860 "\tunsigned int i, bucket, found; \n"
3861 "\tunsigned int bucket1, the_bucket;\n"
3862 "       long long int curr_fj_ts;\n"
3863 "\n"
3864 ;
3865         }
3866   }
3867
3868
3869   if(fs->node_type() == "watch_join"){
3870         watch_join_qpn *wlq = (watch_join_qpn *)fs;
3871         ret += "/*\t\tJoin fields\t*/\n";
3872         for(int k=0;k<wlq->key_flds.size(); ++k){
3873                 string kfld = wlq->key_flds[k];
3874                 ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
3875                 ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
3876           }
3877         ret +=
3878 "  /*           Variables for wl join table     */ \n"
3879 "\tunsigned int i, bucket;\n"
3880 "\tunsigned long long int hash; \n";
3881         string wl_schema = wlq->from[1]->get_schema_name();
3882         string wl_elem_str = generate_watchlist_element_name(wl_schema);
3883         ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n";
3884 "\n"
3885 ;
3886   }
3887
3888
3889 //              Variables needed to store selected attributes of BUFFER type
3890 //              temporarily, in order to compute their size for storage
3891 //              in an output tuple.
3892
3893   string select_var_defs = "";
3894   for(int s=0;s<sl_list.size();s++){
3895         data_type *sdt = sl_list[s]->get_data_type();
3896         if(sdt->is_buffer_type()){
3897           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3898           select_var_defs.append(tmpstr);
3899         }
3900   }
3901   if(select_var_defs != ""){
3902         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3903     ret += select_var_defs;
3904   }
3905
3906 //              Variables to store results of partial functions.
3907   int p;
3908   if(partial_fcns.size()>0){
3909           ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3910           for(p=0;p<partial_fcns.size();++p){
3911                 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3912                   sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3913                     partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3914                   ret += tmpstr;
3915                   if(!is_aggr_query && fcn_ref_cnt[p] >1){
3916                         ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3917                   }
3918                 }
3919           }
3920
3921           if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3922           ret += "\n";
3923   }
3924
3925 //              variable to hold packet struct  //
3926         if(packed_return){
3927                 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3928         }
3929
3930
3931   ret += "\t#ifdef LFTA_STATS\n";
3932 // variable to store counter of cpu cycles spend in accept_tuple
3933         ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3934 // increment counter of received tuples
3935         ret += "\tt->in_tuple_cnt++;\n";
3936   ret += "\t#endif\n";
3937
3938
3939 //      -------------------------------------------------
3940 //              If the packet is "packet", test if its for this lfta,
3941 //              and if so load it into its struct
3942
3943         if(packed_return){
3944                 ret+="\n/*  packed tuple : test and load. \t*/\n";
3945                 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3946                 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3947                 ret+="\t\tgoto end;\n\n";
3948         }
3949
3950
3951
3952   col_id_set unpacked_cids;     //      Keep track of the cols that have been unpacked.
3953
3954   string temporal_flush;
3955   if(is_aggr_query)
3956         ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3957   else {        // non-aggregation operators
3958
3959 // Unpack all the temporal attributes referenced in select clause
3960 // and update the last value of the attribute
3961         col_id_set temp_cids;           //      col ids of temp attributes in select clause
3962
3963         for(s=0;s<sl_list.size();s++){
3964                 data_type *sdt = sl_list[s]->get_data_type();
3965                 if (sdt->is_temporal()) {
3966                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3967                 }
3968         }
3969 //                      If this is a filter join,
3970 //                      ensure that the temporal range field is unpacked.
3971         if(is_fj){
3972                 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3973                 if(temp_cids.count(window_var_cid)==0)
3974                         temp_cids.insert(window_var_cid);
3975         }
3976
3977         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3978                 if(unpacked_cids.count((*csi)) == 0){
3979                         int tblref = (*csi).tblvar_ref;
3980                         int schref = (*csi).schema_ref;
3981                         string field = (*csi).field;
3982                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3983                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3984                         ret += tmpstr;
3985
3986                         unpacked_cids.insert( (*csi) );
3987                 }
3988         }
3989
3990   }
3991
3992   vector<cnf_elem *> filter = fs->get_filter_clause();
3993 //              Test the filter predicate (some query types have additional preds).
3994   if(filter.size() > 0 && !is_wj){      // watchlist join does specialized processing
3995
3996 //              Sort by evaluation cost.
3997 //              First, estimate evaluation costs
3998 //              Eliminate predicates covered by the prefilter (those in s_pids).
3999 //              I need to do it before the sort becuase the indices refer
4000 //              to the position in the unsorted list./
4001         vector<cnf_elem *> tmp_wh;
4002         for(w=0;w<filter.size();++w){
4003                 if(s_pids.count(w) == 0){
4004                         compute_cnf_cost(filter[w],Ext_fcns);
4005                         tmp_wh.push_back(filter[w]);
4006                 }
4007         }
4008         filter = tmp_wh;
4009
4010         sort(filter.begin(), filter.end(), compare_cnf_cost());
4011
4012 //              Now generate the predicates.
4013         for(w=0;w<filter.size();++w){
4014                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
4015                 ret += tmpstr;
4016 //                      Find the set of variables accessed in this CNF elem,
4017 //                      but in no previous element.
4018                 col_id_set this_pred_cids;
4019                 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
4020                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4021                         if(unpacked_cids.count( (*csi) ) == 0){
4022                         int tblref = (*csi).tblvar_ref;
4023                         int schref = (*csi).schema_ref;
4024                         string field = (*csi).field;
4025                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4026                                 unpacked_cids.insert( (*csi) );
4027                         }
4028                 }
4029 //                      Find partial fcns ref'd in this cnf element
4030                 set<int> pfcn_refs;
4031                 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
4032 //                      Since set<..> is a "Sorted Associative Container",
4033 //                      we can walk through it in sorted order by walking from
4034 //                      begin() to end().  (and the partial fcns must be
4035 //                      evaluated in this order).
4036                 set<int>::iterator si;
4037                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
4038                         if(fcn_ref_cnt[(*si)] > 1){
4039                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
4040                         }
4041                         if(is_partial_fcn[(*si)]){
4042                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
4043                                 ret += "\t\tif(retval) goto end;\n";
4044                         }
4045                         if(fcn_ref_cnt[(*si)] > 1){
4046                                 if(!is_partial_fcn[(*si)]){
4047                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
4048                                 }
4049                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
4050                                 ret += "\t}\n";
4051                         }
4052                 }
4053
4054                 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
4055                                 ") ) goto end;\n";
4056         }
4057   }else{
4058           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
4059   }
4060
4061
4062 //                      We've passed the WHERE clause,
4063 //                      unpack the remainder of the accessed fields.
4064   if(is_fj){
4065         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
4066         vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
4067         for(w=0;w<h_eq.size();++w){
4068                 col_id_set this_pred_cids;
4069                 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
4070                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4071                         if(unpacked_cids.count( (*csi) ) == 0){
4072                                 int tblref = (*csi).tblvar_ref;
4073                                 int schref = (*csi).schema_ref;
4074                                 string field = (*csi).field;
4075                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4076                                 unpacked_cids.insert( (*csi) );
4077                         }
4078                 }
4079         }
4080   }else if(is_wj){              // STOPPED HERE move this to wj main body
4081 /*
4082         ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
4083         map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
4084         vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
4085         for(w=0;w<kflds.size();++w){
4086                 string kfld = kflds[w];
4087                 col_id_set this_pred_cids;
4088                 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
4089                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4090                         if(unpacked_cids.count( (*csi) ) == 0){
4091                                 int tblref = (*csi).tblvar_ref;
4092                                 int schref = (*csi).schema_ref;
4093                                 string field = (*csi).field;
4094                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4095                                 unpacked_cids.insert( (*csi) );
4096                         }
4097                 }
4098         }
4099 */
4100   }else{
4101           ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
4102
4103           for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4104                 if(unpacked_cids.count( (*csi) ) == 0){
4105                         int schref = (*csi).schema_ref;
4106                         int tblref = (*csi).tblvar_ref;
4107                         string field = (*csi).field;
4108                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4109                         unpacked_cids.insert( (*csi) );
4110                 }
4111           }
4112   }
4113
4114
4115 //////////////////
4116 //////////////////      After this, the query types
4117 //////////////////      are processed differently.
4118
4119   if(!is_aggr_query && !is_fj & !is_wj)
4120         ret += generate_sel_accept_body(fs, node_name, schema);
4121   else if(is_aggr_query)
4122         ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
4123   else{
4124         if(is_fj)
4125                 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4126         else
4127                 ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4128   }
4129         
4130
4131
4132 //              Finish up.
4133
4134    ret += "\n\tend:\n";
4135   ret += "\t#ifdef LFTA_STATS\n";
4136         ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
4137   ret += "\t#endif\n";
4138    ret += "\n\treturn 1;\n}\n\n";
4139
4140         return(ret);
4141 }
4142
4143
4144 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
4145         int g, cl;
4146
4147         string ret = "struct FTA * "+generate_alloc_name(node_name) +
4148            "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command,  gs_int32_t sz, void *value){\n";
4149
4150         ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
4151         ret+="\tint i;\n";
4152         ret += "\n";
4153         ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
4154
4155 //                              assign a streamid to fta instance
4156         ret+="\t/* assign a streamid */\n";
4157         ret+="\tf->f.ftaid = ftaid;\n";
4158         ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
4159         ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
4160
4161         if(is_aggr_query){
4162                 ret += "\tf->n_aggrs = 0;\n";
4163
4164                 ret += "\tf->max_aggrs = ";
4165
4166 //                              Computing the number of aggregate blocks is a little
4167 //                              tricky.  If there are no GB attrs, or if all GB attrs
4168 //                              are temporal, then use a single aggregate block, else
4169 //                              use a default value (10).  A user specification overrides
4170 //                              this logic.
4171                 bool single_group = true;
4172                 for(g=0;g<gb_tbl->size();g++){
4173                         data_type *gdt = gb_tbl->get_data_type(g);
4174                         if(! gdt->is_temporal() ){
4175                                 single_group = false;
4176                         }
4177                 }
4178                 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
4179                 int max_aggr_i = atoi(max_aggr_str.c_str());
4180                 if(max_aggr_i <= 0){
4181                         if(single_group)
4182                                 ret += "2";
4183                         else
4184                                 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
4185                 }else{
4186                         unsigned int naggrs = 1;                // make it power of 2
4187                         unsigned int nones = 0;
4188                         while(max_aggr_i){
4189                                 if(max_aggr_i&1)
4190                                         nones++;
4191                                 naggrs = naggrs << 1;
4192                                 max_aggr_i = max_aggr_i >> 1;
4193                         }
4194                         if(nones==1)            // in case it was already a power of 2.
4195                                 naggrs/=2;
4196                         ret += int_to_string(naggrs);
4197                 }
4198                 ret += ";\n";
4199
4200                 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
4201                 ret+="\t\treturn(0);\n";
4202                 ret+="\t}\n\n";
4203 //              ret+="/* compute how many integers we need to store the hashmap */\n";
4204 //              ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
4205                 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
4206                 ret+="\t\treturn(0);\n";
4207                 ret+="\t}\n";
4208                 ret+="/*\t\tfill bitmap with zero \t*/\n";
4209                 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
4210                 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
4211                 ret+="\tf->generation=0;\n";
4212                 ret+="\tf->flush_pos = f->max_aggrs;\n";
4213
4214                 ret += "\tf->flush_ctr = 0;\n";
4215
4216         }
4217
4218         if(is_fj){
4219                 if(uses_bloom){
4220                         ret+="\tf->first_exec = 1;\n";
4221                         unsigned int n_bloom = 11;
4222                         string n_bloom_str = fs->get_val_of_def("num_bloom");
4223                         int tmp_n_bloom = atoi(n_bloom_str.c_str());
4224                         if(tmp_n_bloom>0)
4225                                 n_bloom = tmp_n_bloom+1;
4226
4227                         unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
4228                         if(window_len < n_bloom){
4229                                 n_bloom = window_len+1;
4230                         }
4231
4232                         int bf_exp_size = 12;  // base-2 log of number of bits
4233                         string bloom_len_str = fs->get_val_of_def("bloom_size");
4234                         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
4235                         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
4236                                 bf_exp_size = tmp_bf_exp_size;
4237                         }
4238                         int bf_bit_size = 1 << 12;
4239                         int bf_byte_size = bf_bit_size / (8*sizeof(char));
4240
4241                         int bf_tot = n_bloom*bf_byte_size;
4242                         ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
4243                         ret+="\t\treturn(0);\n";
4244                         ret+="\t}\n";
4245                         ret +=
4246 "       for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
4247 "               f->bf_table[i] = 0;\n"
4248 ;
4249                 }else{
4250                         unsigned int ht_size = 4096;
4251                         string ht_size_s = fs->get_val_of_def("aggregate_slots");
4252                         int tmp_ht_size = atoi(ht_size_s.c_str());
4253                         if(tmp_ht_size > 1024){
4254                                 unsigned int hs = 1;            // make it power of 2
4255                                 while(tmp_ht_size){
4256                                         hs =hs << 1;
4257                                         tmp_ht_size = tmp_ht_size >> 1;
4258                                 }
4259                                 ht_size = hs;
4260                         }
4261                         ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
4262                         ret+="\t\treturn(0);\n";
4263                         ret+="\t}\n\n";
4264                         ret +=
4265 "       for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
4266 "               f->join_table[i].ts = 0;\n"
4267 ;
4268                 }
4269         }
4270
4271 //                      Initialize the complex literals (which might be handles).
4272
4273         for(cl=0;cl<complex_literals->size();cl++){
4274                 literal_t *l = complex_literals->get_literal(cl);
4275 //              sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
4276 //              ret += tmpstr + l->to_C_code() + ";\n";
4277                 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
4278                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4279         }
4280
4281         ret += "\n";
4282
4283 //                      Initialize the last seen values of temporal attributes to min(max) value of
4284 //                      their respective type
4285 //                      Create places to hold the last values of temporal attributes referenced in select clause
4286
4287
4288         col_id_set temp_cids;           //      col ids of temp attributes in select clause
4289
4290         int s;
4291         col_id_set::iterator csi;
4292
4293         for(s=0;s<sl_list.size();s++){
4294                 data_type *sdt = sl_list[s]->get_data_type();
4295                 if (sdt->is_temporal()) {
4296                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
4297                 }
4298         }
4299
4300         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
4301                 int tblref = (*csi).tblvar_ref;
4302                 int schref = (*csi).schema_ref;
4303                 string field = (*csi).field;
4304                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
4305                 if (dt.is_increasing()) {
4306                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
4307                         ret += tmpstr;
4308                 } else if (dt.is_decreasing()) {
4309                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
4310                         ret += tmpstr;
4311                 }
4312         }
4313
4314 //      initialize last seen values of temporal groubpy variables
4315         if(is_aggr_query){
4316                 for(g=0;g<gb_tbl->size();g++){
4317                         data_type *dt = gb_tbl->get_data_type(g);
4318                         if(dt->is_temporal()){
4319 /*
4320                                 fprintf(stderr,"group by attribute %s is temporal, ",
4321                                                 gb_tbl->get_name(g).c_str());
4322 */
4323                                 if(dt->is_increasing()){
4324                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
4325                                 }else{
4326                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
4327                                 }
4328                                 ret += tmpstr;
4329                         }
4330                 }
4331         }
4332
4333         ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
4334         ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
4335         ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
4336         ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
4337         ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
4338
4339 //                      Initialize runtime stats
4340         ret+="\tf->in_tuple_cnt = 0;\n";
4341         ret+="\tf->out_tuple_cnt = 0;\n";
4342         ret+="\tf->out_tuple_sz = 0;\n";
4343         ret+="\tf->accepted_tuple_cnt = 0;\n";
4344         ret+="\tf->cycle_cnt = 0;\n";
4345         ret+="\tf->collision_cnt = 0;\n";
4346         ret+="\tf->eviction_cnt = 0;\n";
4347         ret+="\tf->sampling_rate = 1.0;\n";
4348
4349         ret+="\tf->trace_id = 0;\n\n";
4350     if(param_tbl->size() > 0){
4351         ret+=
4352 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
4353 "#ifndef LFTA_IN_NIC\n"
4354 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
4355 "#else\n"
4356 "\t\t}\n"
4357 "#endif\n"
4358 "\t\t\treturn 0;\n"
4359 "\t\t}\n";
4360         }
4361
4362 //                      Register the pass-by-handle parameters
4363     int ph;
4364     for(ph=0;ph<param_handle_table.size();++ph){
4365                 data_type pdt(param_handle_table[ph]->type_name);
4366                 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
4367                 switch(param_handle_table[ph]->val_type){
4368                 case cplx_lit_e:
4369                         ret += tmpstr;
4370                         if(pdt.is_buffer_type()) ret += "&(";
4371                         sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
4372                         ret += tmpstr ;
4373                         if(pdt.is_buffer_type()) ret += ")";
4374                         ret +=  ");\n";
4375                         break;
4376                 case litval_e:
4377 //                              not complex, no constructor
4378                         ret += tmpstr;
4379                         ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
4380                         break;
4381                 case param_e:
4382 //                              query parameter handles are regstered/deregistered in the
4383 //                              load_params function.
4384 //                      ret += "t->param_"+param_handle_table[ph]->param_name;
4385                         break;
4386                 default:
4387                         fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
4388                         exit(1);
4389                 }
4390         }
4391
4392         ret += "\treturn (struct FTA *) f;\n";
4393         ret += "}\n\n";
4394
4395         return(ret);
4396 }
4397
4398
4399
4400
4401 //////////////////////////////////////////////////////////////////
4402
//	Generate the complete C source for the LFTA implementing query node fs.
//
//	First resets, then reloads from fs, the file-level generator globals
//	(gb_tbl, aggr_tbl, sl_list, where, partial_fcns, fcn_ref_cnt,
//	is_partial_fcn, complex_literals, param_tbl, param_handle_table,
//	nicprop, global_id, packed_return, ...).  Some of the generators
//	called below (e.g. generate_fta_alloc) read these globals, so the
//	order of the steps in this function matters.
//
//	fs               - the query node: spx_qpn, sgah_qpn, filter_join,
//	                   watch_join or watch_tbl_qpn; any other node type
//	                   is a fatal internal error (exit(1)).
//	schema           - table schemas.
//	gid              - stored in the global_id global.
//	Ext_fcns         - external function table, used to collect complex
//	                   literals, handle parameters and partial functions.
//	schema_embed_str - passed through to generate_preamble.
//	ifdb             - interface database; queried for the
//	                   Time_Correlation of the first input table.
//	nicp             - NIC properties (may be NULL); the option
//	                   Return=Packed selects packed input.
//	s_pids           - passed through to generate_fta_accept.
//	Returns the generated C code.  Watch-table nodes only get the
//	watchlist structs and loader; all other nodes also get the FTA and
//	tuple structs plus the flush (aggregation only), load_params (if
//	there are parameters), free, control, accept, clock and alloc
//	functions.
string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
//              map<string,string> &int_fcn_defs,
                ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
        bool is_aggr_query;
        int s,p,g;
        string retval;

/////////////////////////////////////////////////////////////
///             Do operator-generic processing, such as
///             gathering the set of referenced columns,
///             generating structures, etc.

//              Initialize globals to empty.
        gb_tbl = NULL; aggr_tbl = NULL;
        global_id = -1; nicprop = NULL;
        param_tbl = fs->get_param_tbl();
        sl_list.clear(); where.clear();
        partial_fcns.clear();
        fcn_ref_cnt.clear(); is_partial_fcn.clear();
        pred_class.clear(); pred_pos.clear();
        sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
        gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;


//              Does the lfta read packed results from the NIC?
//              Only Return=Packed is recognized; any other value of the
//              Return option is reported and ignored.
        nicprop = nicp;                 // load into global
        global_id = gid;
    packed_return = false;
        if(nicp && nicp->option_exists("Return")){
                if(nicp->option_value("Return") == "Packed"){
                        packed_return = true;
                }else{
                        fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
                }
        }


//                      Extract data which defines the query.
//                              complex literals gathered now.
        complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
        param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
        string node_name = fs->get_node_name();
    bool is_fj = false, uses_bloom = false;
        bool is_wj = false;
        bool is_watch_tbl = false;


//                      Load the SELECT list, WHERE clause and (for
//                      aggregation) the group-by / aggregate tables
//                      according to the node type.
        if(fs->node_type() == "spx_qpn"){
                is_aggr_query = false;
                spx_qpn *spx_node = (spx_qpn *)fs;
                sl_list = spx_node->get_select_se_list();
                where = spx_node->get_where_clause();
                gb_tbl = NULL;
                aggr_tbl = NULL;
        } else
        if(fs->node_type() == "sgah_qpn"){
                is_aggr_query = true;
                sgah_qpn *sgah_node = (sgah_qpn *)fs;
                sl_list = sgah_node->get_select_se_list();
                where = sgah_node->get_where_clause();
                gb_tbl = sgah_node->get_gb_tbl();
                aggr_tbl = sgah_node->get_aggr_tbl();

//                      LFTAs do not evaluate HAVING; only a warning is issued.
                if((sgah_node->get_having_clause()).size() > 0){
                        fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
                }
        } else
        if(fs->node_type() == "filter_join"){
                is_aggr_query = false;
        is_fj = true;
                filter_join_qpn *fj_node = (filter_join_qpn *)fs;
                sl_list = fj_node->get_select_se_list();
                where = fj_node->get_where_clause();
                uses_bloom = fj_node->use_bloom;
                gb_tbl = NULL;
                aggr_tbl = NULL;
        }else
        if(fs->node_type() == "watch_join"){
                is_aggr_query = false;
        is_wj = true;
                watch_join_qpn *wl_node = (watch_join_qpn *)fs;
                sl_list = wl_node->get_select_se_list();
                where = wl_node->get_where_clause();
                gb_tbl = NULL;
                aggr_tbl = NULL;
        }else
        if(fs->node_type() == "watch_tbl_qpn"){
//                      A watch table has no SELECT list or WHERE clause.
                is_aggr_query = false;
        is_watch_tbl = true;
                vector<scalarexp_t *> empty_sl_list;
                vector<cnf_elem *> empty_where;
                sl_list = empty_sl_list;
                where = empty_where;
                gb_tbl = NULL;
                aggr_tbl = NULL;
        } else {
                fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
                exit(1);
        }

//                      Build list of "partial functions", by clause.
//                      NOTE : partial fcns are not handled well.
//                      The act of searching for them associates the fcn call
//                      in the SE with an index to an array.  Refs to the
//                      fcn value are replaced with refs to the variable they are
//                      unpacked into.  A more general tagging mechanism would be better.
//                      The [xx_fcns_start, xx_fcns_end) ranges recorded below
//                      index partial_fcns by clause (select, where, group-by,
//                      aggregate), so the clauses must be scanned in this order.

        int i;
        vector<bool> *pfunc_ptr = NULL;
        vector<int> *ref_cnt_ptr = NULL;
        if(!is_aggr_query){             // don't collect cacheable fcns on aggr query.
                ref_cnt_ptr = &fcn_ref_cnt;
                pfunc_ptr = &is_partial_fcn;
        }

        sl_fcns_start = 0;
        for(i=0;i<sl_list.size();i++){
                find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
        }
        wh_fcns_start = sl_fcns_end = partial_fcns.size();
        for(i=0;i<where.size();i++){
                find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
        }
        gb_fcns_start = wh_fcns_end = partial_fcns.size();
        if(gb_tbl != NULL){
                for(i=0;i<gb_tbl->size();i++){
                        find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
                }
        }
        ag_fcns_start = gb_fcns_end = partial_fcns.size();
//                      NOTE(review): partial_fcns is passed as NULL here, so
//                      presumably this loop does not extend partial_fcns and
//                      ag_fcns_end ends up equal to ag_fcns_start -- confirm
//                      against find_partial_fcns.
        if(aggr_tbl != NULL){
                for(i=0;i<aggr_tbl->size();i++){
                        find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
                }
        }
        ag_fcns_end = partial_fcns.size();

//              Fill up the is_partial_fcn and fcn_ref_cnt arrays.
//              For aggregation queries every collected function is treated
//              as partial with a single reference.
        if(is_aggr_query){
                for(i=0; i<partial_fcns.size();i++){
                        fcn_ref_cnt.push_back(1);
                        is_partial_fcn.push_back(true);
                }
        }

//              Unmark non-partial expensive functions referenced only once.
//              Only functions referenced more than once have their result
//              cached, so single-use ones get their partial_ref reset to -1.
        for(i=0; i<partial_fcns.size();i++){
                if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
                        partial_fcns[i]->set_partial_ref(-1);
                }
        }

        node_name = normalize_name(node_name);

        retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);

//              With packed NIC results, emit <node>_input_struct: an id
//              field plus one unpack_var_<col> member for each NIC-safe
//              column referenced by the SELECT list, WHERE clause and
//              GROUP BY definitions, with the column names sorted.
        if(packed_return){              // generate unpack struct
                vector<tablevar_t *> input_tbls = fs->get_input_tbls();
                int schref = input_tbls[0]->get_schema_ref();
                vector<string> refd_cols;
                for(s=0;s<sl_list.size();++s){
                        gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
                }
                for(p=0;p<where.size();++p){
//                              I'm not disabling these preds ...
                        gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
                }
                if(gb_tbl){
                        for(g=0;g<gb_tbl->size();++g){
                          gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
                        }
                }
                sort(refd_cols.begin(), refd_cols.end());
                retval += "struct "+node_name+"_input_struct{\n";
                retval += "\tint __lfta_id_fm_nic__;\n";
                int vsi;
                for(vsi=0;vsi<refd_cols.size();++vsi){
                data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
                        retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
                }
                retval+="};\n\n";
        }


/////////////////////////////////////////////////////
//                      Common stuff unpacked, do some generation


        if(is_aggr_query)
          retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
        if(is_fj)
                retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
        if(is_watch_tbl){
                retval += "\n\n// watchtable code here \n\n";
                watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
                retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
                retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
        }
        
//              Everything except a watch table is a full FTA.
        if(! is_watch_tbl){
                retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
                retval += generate_tuple_struct(node_name, sl_list) ;

                if(is_aggr_query)
                        retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
                if(param_tbl->size() > 0)
                        retval += generate_fta_load_params(node_name) ;
                retval += generate_fta_free(node_name, is_aggr_query) ;
                retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;
                retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;

        /* extract the value of Time_Correlation from interface definition */
        /* (DEFAULT_TIME_CORR when the first input's interface has none)   */
                int e,v;
                string es;
                unsigned time_corr;
                vector<tablevar_t *> tvec =  fs->get_input_tbls();
                vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
                if (time_corr_vec.empty())
                        time_corr = DEFAULT_TIME_CORR;
                else
                        time_corr = atoi(time_corr_vec[0].c_str());

                retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
                retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
        }

  return(retval);
}
4631
4632
4633
4634 int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
4635
4636 //              Initialize global vars
4637         gb_tbl = NULL;
4638         sl_list.clear(); where.clear();
4639
4640         
4641         if(fs->node_type() == "watch_tbl_qpn"){
4642                 return -1;
4643         }
4644
4645         if(fs->node_type() == "spx_qpn"){
4646                 spx_qpn *spx_node = (spx_qpn *)fs;
4647                 sl_list = spx_node->get_select_se_list();
4648                 where = spx_node->get_where_clause();
4649         }
4650         else if(fs->node_type() == "sgah_qpn"){
4651                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4652                 sl_list = sgah_node->get_select_se_list();
4653                 where = sgah_node->get_where_clause();
4654                 gb_tbl = sgah_node->get_gb_tbl();
4655         }
4656         else if(fs->node_type() == "filter_join"){
4657                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4658                 sl_list = fj_node->get_select_se_list();
4659                 where = fj_node->get_where_clause();
4660         }
4661         else if(fs->node_type() == "watch_join"){
4662                 watch_join_qpn *fj_node = (watch_join_qpn *)fs;
4663                 sl_list = fj_node->get_select_se_list();
4664                 where = fj_node->get_where_clause();
4665         } else{
4666                 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
4667                 exit(1);
4668         }
4669
4670 //                      Gather all column references, need to define unpacking variables.
4671   int w,s;
4672   col_id_set cid_set;
4673   col_id_set::iterator csi;
4674
4675   for(w=0;w<where.size();++w)
4676         gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
4677   for(s=0;s<sl_list.size();s++){
4678         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
4679   }
4680
4681   int g;
4682   if(gb_tbl != NULL){
4683         for(g=0;g<gb_tbl->size();g++)
4684           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
4685   }
4686
4687   //                    compute snap length
4688   int snap_len = -1;
4689   int n_snap=0;
4690   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4691     int schref = (*csi).schema_ref;
4692         int tblref = (*csi).tblvar_ref;
4693     string field = (*csi).field;
4694
4695         if(snap_type == "index"){
4696                 int pos = schema->get_field_idx(schref, field);
4697                 if(pos>snap_len) snap_len = pos;
4698                 n_snap++;
4699         }else{
4700                 param_list *field_params = schema->get_modifier_list(schref, field);
4701                 if(field_params->contains_key("snap_len")){
4702                         string fld_snap_str = field_params->val_of("snap_len");
4703                         int fld_snap;
4704                         if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
4705                                 if(fld_snap > snap_len) snap_len = fld_snap;
4706                                 n_snap++;
4707                         }else{
4708                                 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
4709                         }
4710                 }
4711         }
4712   }
4713
4714   if(n_snap == cid_set.size()){
4715         return (snap_len);
4716   }else{
4717         return -1;
4718   }
4719
4720
4721 }
4722
4723 //              Function which computes an optimal
4724 //              set of unpacking functions.
4725
4726 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
4727         map<string, int> pfcn_count;
4728         map<string, int>::iterator msii;
4729         col_id_set::iterator cisi;
4730         set<string>::iterator ssi;
4731         string best_fcn;
4732
4733         while(ucol_fcn_map.size() < upref_cids.size()){
4734
4735 //                      Gather unpack functions referenced by unaccounted-for
4736 //                      columns, and increment their reference count.
4737                 pfcn_count.clear();
4738                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4739                         if(ucol_fcn_map.count((*cisi)) == 0){
4740                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4741                                 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
4742                                         pfcn_count[(*ssi)]++;
4743                         }
4744                 }
4745
4746 //              Get the lowest cost per field function.
4747                 float min_cost = 0.0;
4748                 string best_fcn = "";
4749                 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
4750                         int fcost = Schema->get_ufcn_cost((*msii).first);
4751                         if(fcost < 0){
4752                                 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
4753                                 exit(1);
4754                         }
4755                         float this_cost = (1.0*fcost)/(*msii).second;
4756                         if(msii == pfcn_count.begin() || this_cost < min_cost){
4757                                 min_cost = this_cost;
4758                                 best_fcn = (*msii).first;
4759                         }
4760                 }
4761                 if(best_fcn == ""){
4762                         fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4763                         exit(1);
4764                 }
4765
4766 //              Assign this function to the unassigned fcns which use it.
4767                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4768                         if(ucol_fcn_map.count((*cisi)) == 0){
4769                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4770                                 if(ufcns.count(best_fcn)>0)
4771                                         ucol_fcn_map[(*cisi)] = best_fcn;
4772                         }
4773                 }
4774         }
4775 }
4776
4777
4778
4779 //              Generate an initial test test for the lfta
4780 //              Assume that the predicate references no external functions,
4781 //              and especially no partial functions,
4782 //              aggregates, internal functions.
4783 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4784                 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4785                 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4786                 vector<int> &lfta_snap_lens, string iface){
4787   col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4788   col_id_set::iterator csi;
4789         int o,p,q;
4790         string ret;
4791
4792 //              Gather complex literals in the prefilter.
4793         cplx_lit_table *complex_literals = new cplx_lit_table();
4794         for(p=0;p<pred_list.size();++p){
4795                 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4796         }
4797
4798
4799 //              Find the combinable predicates
4800         vector<predicate_t *> pr_list;
4801         for(p=0;p<pred_list.size();++p){
4802         find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4803         }
4804
4805 //              Analyze the combinable predicates to find the predicate classes.
4806         pred_class.clear();             // idx to equiv pred in equiv_list
4807         pred_pos.clear();               // idx to returned bitmask.
4808         vector<predicate_t *> equiv_list;
4809         vector<int> num_equiv;
4810
4811
4812         for(p=0;p<pr_list.size();++p){
4813                 for(q=0;q<equiv_list.size();++q){
4814                         if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4815                                 break;
4816                 }
4817                 if(q == equiv_list.size()){             // no equiv : create new
4818                         pred_class.push_back(equiv_list.size());
4819                         equiv_list.push_back(pr_list[p]);
4820                         pred_pos.push_back(0);
4821                         num_equiv.push_back(1);
4822
4823                 }else{                  // pr_list[p] is equivalent to pred q
4824                         pred_class.push_back(q);
4825                         pred_pos.push_back(num_equiv[q]);
4826                         num_equiv[q]++;
4827                 }
4828         }
4829
4830 //              Generate the variables which hold the common pred handles
4831         ret += "/*\t\tprefilter global vars.\t*/\n";
4832         for(q=0;q<equiv_list.size();++q){
4833                 for(p=0;p<=(num_equiv[q]/32);++p){
4834                         ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4835                 }
4836         }
4837
4838 //              Struct to hold prefilter complex literals
4839         ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4840         if(complex_literals->size() == 0)
4841                 ret += "\tint no_variable;\n";
4842         int cl;
4843         for(cl=0;cl<complex_literals->size();cl++){
4844                 literal_t *l = complex_literals->get_literal(cl);
4845                 data_type *dtl = new data_type( l->get_type() );
4846                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4847                 ret += tmpstr;
4848         }
4849         ret += "} prefilter_complex_lits_"+iface+";\n\n";
4850
4851
4852 //              Generate the prefilter initialziation code
4853         ret += "void init_lfta_prefilter_"+iface+"(){\n";
4854
4855 //              First initialize complex literals, if any.
4856         ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4857         for(cl=0;cl<complex_literals->size();cl++){
4858                 literal_t *l = complex_literals->get_literal(cl);
4859                 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4860                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4861         }
4862
4863
4864         set<int> epred_seen;
4865         for(p=0;p<pr_list.size();++p){
4866                 int q = pred_class[p];
4867 //printf("\tq=%d\n",q);
4868                 if(epred_seen.count(q)>0){
4869                         ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4870                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4871                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4872                         for(o=0;o<op_list.size();++o){
4873                                 if(! cl_op[o]){
4874                                         ret += generate_se_code(op_list[o],Schema)+", ";
4875                                 }
4876                         }
4877                         ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4878                         epred_seen.insert(q);
4879                 }else{
4880                         ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4881                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4882                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4883                         for(o=0;o<op_list.size();++o){
4884                                 if(! cl_op[o]){
4885                                         ret += generate_se_code(op_list[o],Schema)+", ";
4886                                 }
4887                         }
4888                         ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4889                         epred_seen.insert(q);
4890                 }
4891         }
4892         ret += "}\n\n";
4893
4894
4895
4896 //              Start on main body code generation
4897   ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4898
4899
4900 ///--------------------------------------------------------------
4901 ///             Generate and store the prefilter body,
4902 ///             reuse it for the snap length calculator
4903 ///-------------------------------------------------------------
4904         string body;
4905
4906     body += "\tstruct packet *p = (struct packet *)pkt;\n";
4907
4908
4909
4910 //              Gather the colids to store unpacked variables.
4911         for(p=0;p<pred_list.size();++p){
4912         gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4913         }
4914
4915 //              make the col_ids refer to the base tables, and
4916 //              grab the col_ids with at least one unpacking function.
4917         for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4918                 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4919                 col_id tmp_col_id;
4920                 tmp_col_id.field = (*csi).field;
4921                 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4922                 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4923                 cid_set.insert(tmp_col_id);
4924                 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4925                 if(fe->get_unpack_fcns().size()>0)
4926                         upref_cids.insert(tmp_col_id);
4927
4928
4929         }
4930
4931 //              Find the set of unpacking programs needed for the
4932 //              prefilter fields.
4933         map<col_id, string,lt_col_id>  ucol_fcn_map;
4934         find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4935         set<string> pref_ufcns;
4936         map<col_id, string,lt_col_id>::iterator mcis;
4937         for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4938                 pref_ufcns.insert((*mcis).second);
4939         }
4940
4941
4942
4943 //                      Variables for unpacking attributes.
4944     body += "/*\t\tVariables for unpacking attributes\t*/\n";
4945     for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4946       int schref = (*csi).schema_ref;
4947           int tblref = (*csi).tblvar_ref;
4948       string field = (*csi).field;
4949       data_type dt(Schema->get_type_name(schref,field));
4950       sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4951         field.c_str(), tblref);
4952       body += tmpstr;
4953       sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4954       body += tmpstr;
4955     }
4956 //                      Variables for unpacking temporal attributes.
4957     body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4958     for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4959           if (cid_set.count(*csi) == 0) {
4960         int schref = (*csi).schema_ref;
4961                 int tblref = (*csi).tblvar_ref;
4962         string field = (*csi).field;
4963         data_type dt(Schema->get_type_name(schref,field));
4964         sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4965           field.c_str(), tblref);
4966         body += tmpstr;
4967
4968           }
4969     }
4970     body += "\n\n";
4971
4972 //              Variables for combinable predicate evaluation
4973         body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4974         for(q=0;q<equiv_list.size();++q){
4975                 for(p=0;p<=(num_equiv[q]/32);++p){
4976                         body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4977                 }
4978         }
4979
4980
4981 //                      Variables that are always needed
4982     body += "/*\t\tVariables which are always needed\t*/\n";
4983         body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4984         body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4985
4986 //              Call the unpacking functions for the prefilter fields
4987         if(pref_ufcns.size() > 0)
4988                 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4989         set<string>::iterator ssi;
4990         for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4991                 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4992         }
4993
4994
4995 //              Unpack the accessed attributes
4996         body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4997     for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4998           int tblref = (*csi).tblvar_ref;
4999       int schref = (*csi).schema_ref;
5000           string field = (*csi).field;
5001           sprintf(tmpstr,"\tret_%s_%d =  (%s(p, &unpack_var_%s_%d) == 0);\n",
5002                 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5003           body += tmpstr;
5004     }
5005
5006 //              next unpack the temporal attributes and ignore the errors
5007 //              We are assuming here that failed unpack of temporal attributes
5008 //              is not going to overwrite the last stored value
5009 //              Failed upacks are ignored
5010     for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
5011           int tblref = (*csi).tblvar_ref;
5012       int schref = (*csi).schema_ref;
5013           string field = (*csi).field;
5014           sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
5015                  Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5016           body += tmpstr;
5017     }
5018
5019 //              Evaluate the combinable predicates
5020         if(equiv_list.size()>0)
5021                 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
5022         for(q=0;q<equiv_list.size();++q){
5023                 for(p=0;p<=(num_equiv[q]/32);++p){
5024
5025 //              Only call the common eval fcn if all ref'd fields present.
5026                         col_id_set pred_cids;
5027                         col_id_set::iterator cpi;
5028                         gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
5029                         if(pred_cids.size()>0){
5030                         body += "\tif(";
5031                                 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5032                                         if(cpi != pred_cids.begin())
5033                                                 body += " && ";
5034                                 string field = (*cpi).field;
5035                                         int tblref = (*cpi).tblvar_ref;
5036                                         body += "ret_"+field+"_"+int_to_string(tblref);
5037                                 }
5038                                 body+=")\n";
5039                         }
5040
5041                         body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
5042                         vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
5043                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
5044                         for(o=0;o<op_list.size();++o){
5045                                 if(cl_op[o]){
5046                                         body += ","+generate_se_code(op_list[o],Schema);
5047                                 }
5048                         }
5049                         body += ");\n";
5050                 }
5051         }
5052
5053
5054         for(p=0;p<pred_list.size();++p){
5055                 col_id_set pred_cids;
5056                 col_id_set::iterator cpi;
5057                 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
5058                 if(pred_cids.size()>0){
5059                         body += "\tif(";
5060                         for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5061                                 if(cpi != pred_cids.begin())
5062                                         body += " && ";
5063                         string field = (*cpi).field;
5064                                 int tblref = (*cpi).tblvar_ref;
5065                                 body += "ret_"+field+"_"+int_to_string(tblref);
5066                         }
5067                         body+=")\n";
5068                 }
5069         body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
5070                 body+="\tbitpos = bitpos << 1;\n";
5071         }
5072
5073 // ---------------------------------------------------------------
5074 //              Finished with the body of the prefilter
5075 // --------------------------------------------------------------
5076
5077         ret += body;
5078
5079 //                      Collect fields referenced by an lfta but not
5080 //                      already unpacked for the prefilter.
5081
5082 //printf("upref_cids is:\n");
5083 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
5084 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5085 //printf("pref_ufcns is:\n");
5086 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
5087 //printf("\t%s\n",(*ssi).c_str());
5088
5089         int l;
5090         for(l=0;l<lfta_cols.size();++l){
5091                 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
5092                         string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
5093                         col_id tmp_col_id;
5094                         tmp_col_id.field = (*csi).field;
5095                         tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
5096                         tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
5097                         field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
5098                         set<string> fld_ufcns = fe->get_unpack_fcns();
5099 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(),  upref_cids.count(tmp_col_id));
5100                         if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
5101 //              Ensure that this field not already unpacked.
5102                                 bool found = false;
5103                                 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
5104 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
5105                                         if(pref_ufcns.count((*ssi))){
5106 //printf("Field already unpacked.\n");
5107                                                 found = true;;
5108                                         }
5109                                 }
5110                                 if(! found){
5111 //printf("\tadding to unpack list\n");
5112                                         upall_cids.insert(tmp_col_id);
5113                                 }
5114                         }
5115                 }
5116         }
5117
5118 //printf("upall_cids is:\n");
5119 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
5120 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5121
5122 //              Get the set of unpacking programs for these.
5123         map<col_id, string,lt_col_id>  uall_fcn_map;
5124         find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
5125         set<string> pall_ufcns;
5126         for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
5127 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
5128                 pall_ufcns.insert((*mcis).second);
5129         }
5130
5131 //              Iterate through the remaining set of unpacking function
5132         if(pall_ufcns.size() > 0)
5133                 ret += "//\t\tcall all remaining field unpacking functions.\n";
5134         for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
5135 //              gather the set of columns unpacked by this ufcn
5136                 col_id_set fcol_set;
5137                 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
5138                         if(uall_fcn_map[(*csi)] == (*ssi))
5139                                 fcol_set.insert((*csi));
5140                 }
5141
5142 //              gather the set of lftas which access a field unpacked by the fcn
5143                 set<long long int> clfta;
5144                 for(l=0;l<lfta_cols.size();l++){
5145                         for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
5146                                 if(lfta_cols[l].count((*csi)) > 0)
5147                                         break;
5148                         }
5149                         if(csi != fcol_set.end())
5150                                 clfta.insert(lfta_sigs[l]);
5151                 }
5152
5153 //              generate the unpacking code
5154                 ret += "\tif(";
5155                 set<long long int>::iterator sii;
5156                 for(sii=clfta.begin();sii!=clfta.end();++sii){
5157                         if(sii!=clfta.begin())
5158                                 ret += " || ";
5159                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
5160                         ret += tmpstr;
5161                 }
5162                 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5163         }
5164
5165
5166     ret += "\treturn(retval);\n\n";
5167   ret += "}\n\n";
5168
5169
5170 // --------------------------------------------------------
5171 //              reuse prefilter body for snaplen calculator
5172 //
5173 //      This is dummy code, so I'm commenting it out.
5174
5175 /*
5176   ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
5177
5178         ret += body;
5179
5180         int i;
5181         vector<int> s_snaps = lfta_snap_lens;
5182         sort(s_snaps.begin(), s_snaps.end());
5183
5184         if(s_snaps[0] == -1){
5185                 set<unsigned long long int> sigset;
5186                 for(i=0;i<lfta_snap_lens.size();++i){
5187                         if(lfta_snap_lens[i] == -1){
5188                                 sigset.insert(lfta_sigs[i]);
5189                         }
5190                 }
5191                 ret += "\tif( ";
5192                 set<unsigned long long int>::iterator sulli;
5193                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5194                         if(sulli!=sigset.begin())
5195                                 ret += " || ";
5196                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5197                         ret += tmpstr;
5198                 }
5199                 ret += ") return -1;\n";
5200         }
5201
5202         int nextpos = lfta_snap_lens.size()-1;
5203         int nextval = lfta_snap_lens[nextpos];
5204         while(nextval >= 0){
5205                 set<unsigned long long int> sigset;
5206                 for(i=0;i<lfta_snap_lens.size();++i){
5207                         if(lfta_snap_lens[i] == nextval){
5208                                 sigset.insert(lfta_sigs[i]);
5209                         }
5210                 }
5211                 ret += "\tif( ";
5212                 set<unsigned long long int>::iterator sulli;
5213                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5214                         if(sulli!=sigset.begin())
5215                                 ret += " || ";
5216                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5217                         ret += tmpstr;
5218                 }
5219                 ret += ") return "+int_to_string(nextval)+";\n";
5220
5221                 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
5222                 if(nextpos>0)
5223                         nextval = lfta_snap_lens[nextpos];
5224                 else
5225                         nextval = -1;
5226         }
5227         ret += "\treturn 0;\n";
5228         ret += "}\n\n";
5229 */
5230
5231
5232   return(ret);
5233 }
5234
5235
5236
5237
5238 //              Generate the struct which will store the the values of
5239 //              temporal attributesunpacked by prefilter
5240 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
5241
5242   col_id_set::iterator csi;
5243
5244 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
5245
5246   string ret="struct prefilter_unpacked_temp_vars {\n";
5247   ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
5248
5249   string init_code;
5250
5251   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
5252     int schref = (*csi).schema_ref;
5253     int tblref = (*csi).tblvar_ref;
5254     string field = (*csi).field;
5255         data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
5256     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
5257         field.c_str(), tblref);
5258     ret += tmpstr;
5259
5260         if (init_code != "")
5261                 init_code += ", ";
5262         if (dt.is_increasing())
5263                 init_code += dt.get_min_literal();
5264         else
5265                 init_code += dt.get_max_literal();
5266
5267   }
5268   ret += "};\n\n";
5269
5270   ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";
5271
5272   return(ret);
5273 }