Added watchlist support
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include <string>
17 #include <stdio.h>
18 #include <stdlib.h>
19 //#include <algo.h>
20 #include<algorithm>
21
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
29
30 using namespace std;
31
// Declared here, defined in another translation unit.
extern int DEFAULT_LFTA_HASH_TABLE_SIZE;

// default value for correlation between the interface card and
// the system clock
#define DEFAULT_TIME_CORR 16


//      For fast hashing.
//      NRANDS is no longer defined in this file (see the disabled
//      #define below); it must be supplied by one of the included headers.
//#define NRANDS 100
//      hash_nums holds the multipliers as C literal text; they are
//      pasted verbatim into the generated code.
extern string hash_nums[NRANDS];
42 /*
43 = {
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
94 };
95 */
96
97
98 // ----------------------------------------------
99 //              Data extracted from the query plan node
100 //              for use by code generation.
101
static cplx_lit_table *complex_literals;  // Table of literals with constructors.
static vector<handle_param_tbl_entry *> param_handle_table;
static param_table *param_tbl;          // Table of all referenced parameters.

static vector<scalarexp_t *> sl_list;
static vector<cnf_elem *> where;

static gb_table *gb_tbl;                        // Table of all group-by attributes.
static aggregate_table *aggr_tbl;       // Table of all referenced aggregates.

static bool packed_return;              // unpack using structure, not fcns
static nic_property *nicprop;   // nic properties for this interface.
static int global_id;


//              The partial_fcns vector can now refer to
//              partial functions, or expensive functions
//              which can be cached (if there are multiple refs).  A couple
//              of int vectors distinguish the cases.
static vector<scalarexp_t *> partial_fcns;
static vector<int> fcn_ref_cnt;
static vector<bool> is_partial_fcn;
//              NOTE(review): these look like [start,end) ranges into
//              partial_fcns for the select, where, group-by and aggregate
//              clauses -- confirm against the code that fills them.
int sl_fcns_start = 0, sl_fcns_end = 0;
int wh_fcns_start = 0, wh_fcns_end = 0;
int gb_fcns_start = 0, gb_fcns_end = 0;
int ag_fcns_start = 0, ag_fcns_end = 0;


//              These vectors are for combinable predicates.
static vector<int> pred_class;  //      identifies the group
static vector<int> pred_pos;    //  position in the group.



//              Shared scratch buffer for sprintf-style formatting.
//              Fixed size: callers must keep formatted text under 1000 bytes.
static char tmpstr[1000];
137
138 //////////////////////////////////////////////////////////////////////
139 ///                     Various utilities
140
141 string generate_fta_name(string node_name){
142         string ret = normalize_name(node_name);
143         if(ret == ""){
144                 ret = "default";
145         }
146         ret += "_fta";
147
148         return(ret);
149 }
150
151
152 string generate_aggr_struct_name(string node_name){
153         string ret = normalize_name(node_name);
154         if(ret == ""){
155                 ret = "default";
156         }
157         ret += "_aggr_struct";
158
159         return(ret);
160 }
161
162 string generate_fj_struct_name(string node_name){
163         string ret = normalize_name(node_name);
164         if(ret == ""){
165                 ret = "default";
166         }
167         ret += "_fj_struct";
168
169         return(ret);
170 }
171
172 string generate_watchlist_element_name(string node_name){
173         string ret = normalize_name(node_name);
174         if(ret == ""){
175                 ret = "default";
176         }
177         ret += "__wl_elem";
178
179         return(ret);
180 }
181
182 string generate_watchlist_struct_name(string node_name){
183         string ret = normalize_name(node_name);
184         if(ret == ""){
185                 ret = "default";
186         }
187         ret += "__wl_struct";
188
189         return(ret);
190 }
191
192 string generate_watchlist_name(string node_name){
193         string ret = normalize_name(node_name);
194         if(ret == ""){
195                 ret = "default";
196         }
197         ret += "__wl";
198
199         return(ret);
200 }
201
202 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
203         string ret;
204         if(! packed_return){
205                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
206                         schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
207         ret += tmpstr;
208         if(!schema->get_modifier_list(schref,field)->contains_key("required"))
209                 ret += "\tif(retval) goto "+end_goto+";\n";
210
211         }else{
212 //                      TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
213                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
214                 if(dt.is_buffer_type()){
215                         if(dt.get_type() != v_str_t){
216                           ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
217                           ret += "\t\tgoto "+end_goto+";\n";
218                           ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
219                           ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
220                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
221                         }else{
222                                 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
223                                 exit(1);
224                         }
225                 }else{
226                         ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
227                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
228                 }
229         }
230         return ret;
231 }
232
233 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
234   string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
235
236   int g;
237   for(g=0;g<gb_tbl->size();g++){
238           sprintf(tmpstr,"gb_var%d",g);
239           ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
240   }
241
242   int a;
243   for(a=0;a<aggr_tbl->size();a++){
244           ret += "\t";
245           sprintf(tmpstr,"aggr_var%d",a);
246           if(aggr_tbl->is_builtin(a))
247                 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
248           else
249                 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
250   }
251
252 /*
253   ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
254 */
255
256   ret += "};\n\n";
257
258   return(ret);
259 }
260
261
262 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
263   string ret;
264
265   if(fs->use_bloom == false){   // uses hash table instead
266         ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
267         int k;
268         for(k=0;k<fs->hash_eq.size();++k){
269                 sprintf(tmpstr,"key_var%d",k);
270                 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
271         }
272         ret += "\tlong long int ts;\n";
273     ret += "};\n\n";
274   }
275
276   return(ret);
277 }
278
279 string generate_watchlist_structs(string node_name, table_def *tbl,
280                 std::string filename, int refresh_interval){
281         string ret;
282
283         ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
284         vector<field_entry *> fields = tbl->get_fields();
285         for(int f=0;f<fields.size();++f){
286                 data_type dt(fields[f]->get_type());
287                 ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
288         }
289         ret += "\tgs_uint64_t hashval;\n";
290         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
291         ret += "};\n\n";
292
293         ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
294         ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
295         ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
296         ret += "\tgs_uint32_t ht_size;\n";
297         ret += "\tgs_uint32_t n_elem;\n";
298         ret += "\tgs_uint32_t refresh_interval;\n";
299         ret += "\ttime_t next_refresh;\n";
300         ret += "\ttime_t last_mtime;\n";        
301         ret += "\tchar *filename;\n";
302         ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
303
304         return ret;
305 }
306
307 string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
308         string ret;
309         string tgt = generate_watchlist_name(node_name);
310         vector<field_entry *> fields = tbl->get_fields();
311
312         ret += "void reload_watchlist__"+node_name+"(){\n";
313         ret += "\tgs_uint32_t i;\n";
314         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
315         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
316         ret += "\tFILE *fl;\n";
317         ret += "\tchar buf[10000];\n";
318         ret += "\tgs_uint32_t buflen = 10000;\n";
319         ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
320         ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
321         ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
322         ret += "\tgs_uint64_t hash, bucket;\n";
323         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
324
325         ret += "//  make sure watchlist file has changed since the last time we loaded it\n";
326     ret += "\tstruct stat file_stat;\n";
327     ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
328     ret += "\tif (err) {\n";
329     ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
330     ret += "\t\treturn;\n";
331     ret += "\t}\n";
332         ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime)     // watchlist file hasn't changed since last time\n";
333         ret += "\t\treturn;\n";
334         ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
335
336         ret += "//  Delete old entries.\n";
337         ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
338         ret += "\t\tptr="+tgt+".ht[i];\n";
339         ret += "\t\twhile(ptr!=NULL){\n";
340         for(int f=0;f<fields.size();++f){
341                 data_type dt(fields[f]->get_type());
342                 if(dt.is_buffer_type()){
343                         ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
344                 }
345         }
346         ret += "\t\t\tnext = ptr->next;\n";
347         ret += "\t\t\tfree(ptr);\n";
348         ret += "\t\t\tptr = next;\n";
349         ret += "\t\t}\n";
350         ret += "\t}\n";
351         ret += "\n// prepare new table. \n";
352         ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
353         ret += "\t\tif("+tgt+".ht)\n";
354         ret += "\t\t\tfree("+tgt+".ht);\n";
355         ret += "\t\tif("+tgt+".ht_size == 0)\n";
356         ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
357         ret += "\t\telse\n";    
358         ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
359         ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
360         ret += "\t}\n";
361         ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
362         ret += "\t\t"+tgt+".ht[i] = NULL;\n";
363         ret += "\t}\n";
364         ret += "\n// load new table\n";
365         ret += "\t"+tgt+".n_elem = 0;\n";
366         ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
367         ret += "\tif(fl==NULL){\n";
368         ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
369         ret += "\t\treturn;\n";
370         ret += "\t}\n";
371         ret += "\tmalformed = 0;\n";
372         ret += "\tshort_lines = 0;\n";
373         ret += "\ttoolong_lines = 0;\n";
374         ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
375         ret += "\t\tlinelen = strlen(buf);\n";
376         ret += "\t\tbuf[linelen-1]='\\0';       // strip off trailing newline\n";
377         ret += "\t\tlinelen--;\n";
378
379         ret += "\t\tpos=0;\n";
380         ret += "\t\tmalformed=0;\n";
381         ret += "\t\tok=1;\n";
382         ret += "\t\tflds[0] = buf;\n";
383         ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
384         ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
385         ret += "\t\t\tif(pos >= linelen){\n";
386         ret += "\t\t\t\tmalformed = 1;\n";
387         ret += "\t\t\t\tbreak;\n";
388         ret += "\t\t\t}\n";
389         ret += "\t\t\tbuf[pos]='\\0';\n";
390         ret += "\t\t\tpos++;\n";
391         ret += "\t\t\tflds[f]=buf+pos;\n";
392         ret += "\t\t}\n";
393         ret += "\t\tif(malformed){\n";
394         ret += "\t\t\tok=0;\n";
395         ret += "\t\t\tn_malformed++;\n";
396         ret += "\t\t}\n";
397         ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
398         ret += "\t\t\tok=0;\n";
399         ret += "\t\t\tshort_lines++;\n";
400         ret += "\t\t}\n";
401         ret += "\t\tif(pos && (pos<linelen)){\n";
402         ret += "\t\t\tok=0;\n";
403         ret += "\t\t\ttoolong_lines++;\n";
404         ret += "\t\t}\n";
405         ret += "\t\tif(f>="+std::to_string(fields.size())+"){\n";
406         ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
407 //              Extract fields
408         for(int f=0;f<fields.size();++f){
409                 data_type dt(fields[f]->get_type());
410                 ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
411         }
412 //              Compute the hash value
413         ret += "\t\t\thash=0;\n";
414         for(int k=0;k<keys.size();++k){
415                 string key_fld = keys[k];
416                 int f;
417                 for(f=0;f<fields.size();++f){
418                         if(fields[f]->get_name() == key_fld)
419                                 break;
420                 }
421                 data_type dt(fields[f]->get_type());
422
423                 ret +=
424 "\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
425                         dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
426                 }
427         ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
428         ret += "\t\t\trec->hashval = hash;\n";
429         ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
430         ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
431         ret += "\t\t\t"+tgt+".n_elem++;\n";
432
433         ret += "\t\t}\n";
434         ret += "\t}\n";
435         ret += "\tif(n_malformed+toolong_lines > 0){\n";
436         ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n";
437         ret += "\t}\n";
438         ret += "}\n\n";
439
440         return ret;
441 }
442
443
444
445                 
446 string generate_fta_struct(string node_name, gb_table *gb_tbl,
447                 aggregate_table *aggr_tbl, param_table *param_tbl,
448                 cplx_lit_table *complex_literals,
449                 vector<handle_param_tbl_entry *> &param_handle_table,
450                 bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
451                 table_list *schema){
452
453         string ret = "struct " + generate_fta_name(node_name) + "{\n";
454         ret += "\tstruct FTA f;\n";
455
456 //-------------------------------------------------------------
457 //              Aggregate-specific fields
458
459         if(is_aggr_query){
460 /*
461                 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
462 */
463                 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
464                 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
465 //              ret+="\tint bitmap_size;\n";
466                 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
467                 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
468                 ret += "\tint max_windows; // max number of open windows.\n";
469                 ret += "\tunsigned int generation; // initially zero, increment on\n";
470                 ret += "\t     // every hash table flush - whether regular or induced.\n";
471                 ret += "\t     // Old groups are identified by a generation mismatch.\n";
472                 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
473                 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
474
475
476
477                 int g;
478                 bool uses_temporal_flush = false;
479                 for(g=0;g<gb_tbl->size();g++){
480                         data_type *dt = gb_tbl->get_data_type(g);
481                         if(dt->is_temporal()){
482 /*
483                                 fprintf(stderr,"group by attribute %s is temporal, ",
484                                                 gb_tbl->get_name(g).c_str());
485                                 if(dt->is_increasing()){
486                                         fprintf(stderr,"increasing.\n");
487                                 }else{
488                                         fprintf(stderr,"decreasing.\n");
489                                 }
490 */
491                                 data_type *gdt = gb_tbl->get_data_type(g);
492                                 if(gdt->is_buffer_type()){
493                                         fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
494                                 }else{
495                                         sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
496                                         ret += tmpstr;
497                                         sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
498                                         ret += tmpstr;
499                                         sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
500                                         ret += tmpstr;
501                                         uses_temporal_flush = true;
502                                 }
503
504                         }
505
506                 }
507                 if(! uses_temporal_flush){
508                         fprintf(stderr,"Warning: no temporal flush.\n");
509                 }
510         }
511
512 // ---------------------------------------------------------
513 //                      Filter-join specific fields
514
515         if(is_fj){
516                 if(uses_bloom){
517                         ret +=
518 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
519 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
520 "\tint first_exec;\n"
521 "\tlong long int last_bin;\n"
522 "\tint last_bloom_pos;\n"
523 "\n"
524 ;
525                 }else{          // limited hash table
526                         ret +=
527 "  struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
528 "\n"
529 ;
530                 }
531
532         }
533
534 // --------------------------------------------
535 //              watchlist-join specific
536         if(is_wj){
537                 ret += "\ttime_t ux_time;\n";
538         }
539
540 //--------------------------------------------------------
541 //                      Common fields
542
543 //                      Create places to hold the parameters.
544         int p;
545         vector<string> param_vec = param_tbl->get_param_names();
546         for(p=0;p<param_vec.size();p++){
547                 data_type *dt = param_tbl->get_data_type(param_vec[p]);
548                 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
549                                 param_vec[p].c_str());
550                 ret += tmpstr;
551                 if(param_tbl->handle_access(param_vec[p])){
552                         ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
553                 }
554         }
555
556 //                      Create places to hold complex literals.
557         int cl;
558         for(cl=0;cl<complex_literals->size();cl++){
559                 literal_t *l = complex_literals->get_literal(cl);
560                 data_type *dtl = new data_type( l->get_type() );
561                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
562                 ret += tmpstr;
563         }
564
565 //                      Create places to hold the pass-by-handle parameters.
566         for(p=0;p<param_handle_table.size();++p){
567                 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
568                 ret += tmpstr;
569         }
570
571 //                      Create places to hold the last values of temporal
572 //                      attributes referenced in select clause
573 //                      we also need to store values of the temoral attributed
574 //                      of last flushed tuple in aggr queries
575 //                      to make sure we generate the cirrect temporal tuple
576 //                      in the presense of slow flushes
577
578
579         col_id_set temp_cids;           //      col ids of temp attributes in select clause
580
581         int s;
582         col_id_set::iterator csi;
583
584         for(s=0;s<sl_list.size();s++){
585                 data_type *sdt = sl_list[s]->get_data_type();
586                 if (sdt->is_temporal()) {
587                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
588                 }
589         }
590
591         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
592                 int tblref = (*csi).tblvar_ref;
593                 int schref = (*csi).schema_ref;
594                 string field = (*csi).field;
595                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
596                 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
597                 ret += tmpstr;
598         }
599
600         ret += "\tgs_uint64_t trace_id;\n\n";
601
602 //      Fields to store the runtime stats
603
604         ret += "\tgs_uint32_t in_tuple_cnt;\n";
605         ret += "\tgs_uint32_t out_tuple_cnt;\n";
606         ret += "\tgs_uint32_t out_tuple_sz;\n";
607         ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
608         ret += "\tgs_uint64_t cycle_cnt;\n";
609         ret += "\tgs_uint32_t collision_cnt;\n";
610         ret += "\tgs_uint32_t eviction_cnt;\n";
611         ret += "\tgs_float_t sampling_rate;\n";
612
613
614
615         ret += "};\n\n";
616
617         return(ret);
618 }
619
620 //------------------------------------------------------------
621 //              Set colref tblvars to 0..
622 //              (special processing for join-like operators in an lfta).
623
624 void reset_se_col_ids_tblvars(scalarexp_t *se,  gb_table *gtbl){
625         vector<scalarexp_t *> operands;
626         int o;
627
628         if(! se)
629                 return;
630
631         switch(se->get_operator_type()){
632         case SE_LITERAL:
633         case SE_PARAM:
634         case SE_IFACE_PARAM:
635                 return;
636         case SE_UNARY_OP:
637                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
638                 return;
639         case SE_BINARY_OP:
640                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
641                 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
642                 return;
643         case SE_COLREF:
644                 if(! se->is_gb() ){
645                         se->get_colref()->set_tablevar_ref(0);
646                 }else{
647                         if(gtbl==NULL){
648                                 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
649                                 exit(1);
650                         }
651                         reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
652                 }
653                 return;
654         case SE_AGGR_STAR:
655                 return;
656         case SE_AGGR_SE:
657                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
658                 return;
659         case SE_FUNC:
660                 operands = se->get_operands();
661                 for(o=0;o<operands.size();o++){
662                         reset_se_col_ids_tblvars(operands[o], gtbl);
663                 }
664                 return;
665         default:
666                 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
667                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
668                 exit(1);
669         }
670 }
671
672
673 //              reset  column tblvars accessed in this pr.
674
675 void reset_pr_col_ids_tblvars(predicate_t *pr,  gb_table *gtbl){
676         vector<scalarexp_t *> op_list;
677         int o;
678
679         switch(pr->get_operator_type()){
680         case PRED_IN:
681                 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
682                 return;
683         case PRED_COMPARE:
684                 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
685                 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
686                 return;
687         case PRED_UNARY_OP:
688                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
689                 return;
690         case PRED_BINARY_OP:
691                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
692                 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
693                 return;
694         case PRED_FUNC:
695                 op_list = pr->get_op_list();
696                 for(o=0;o<op_list.size();++o){
697                         reset_se_col_ids_tblvars(op_list[o],gtbl) ;
698                 }
699                 return;
700         default:
701                 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
702                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
703         }
704 }
705
706
707
708
709 //                      Generate code that makes reference
710 //                      to the tuple, and not to any aggregates.
711 static string generate_se_code(scalarexp_t *se,table_list *schema){
712         string ret;
713     data_type *ldt, *rdt;
714         int o;
715         vector<scalarexp_t *> operands;
716
717
718         switch(se->get_operator_type()){
719         case SE_LITERAL:
720                 if(se->is_handle_ref()){
721                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
722                         ret = tmpstr;
723                         return(ret);
724                 }
725                 if(se->get_literal()->is_cpx_lit()){
726                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
727                         ret = tmpstr;
728                         return(ret);
729                 }
730                 return(se->get_literal()->to_C_code("")); // not complex, no constructor
731         case SE_PARAM:
732                 if(se->is_handle_ref()){
733                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
734                         ret = tmpstr;
735                         return(ret);
736                 }
737                 ret += "t->param_";
738                 ret += se->get_param_name();
739                 return(ret);
740         case SE_UNARY_OP:
741         ldt = se->get_left_se()->get_data_type();
742         if(ldt->complex_operator(se->get_op()) ){
743                         ret +=  ldt->get_complex_operator(se->get_op());
744                         ret += "(";
745                         ret += generate_se_code(se->get_left_se(),schema);
746             ret += ")";
747                 }else{
748                         ret += "(";
749                         ret += se->get_op();
750                         ret += generate_se_code(se->get_left_se(),schema);
751                         ret += ")";
752                 }
753                 return(ret);
754         case SE_BINARY_OP:
755         ldt = se->get_left_se()->get_data_type();
756         rdt = se->get_right_se()->get_data_type();
757
758         if(ldt->complex_operator(rdt, se->get_op()) ){
759                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
760                         ret += "(";
761                         ret += generate_se_code(se->get_left_se(),schema);
762                         ret += ", ";
763                         ret += generate_se_code(se->get_right_se(),schema);
764                         ret += ")";
765                 }else{
766                         ret += "(";
767                         ret += generate_se_code(se->get_left_se(),schema);
768                         ret += se->get_op();
769                         ret += generate_se_code(se->get_right_se(),schema);
770                         ret += ")";
771                 }
772                 return(ret);
773         case SE_COLREF:
774                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet unpacked ...
775                                                         // so return the defining code.
776                         ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
777
778                 }else{
779                 sprintf(tmpstr,"unpack_var_%s_%d",
780                   se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
781                 ret = tmpstr;
782                 }
783                 return(ret);
784         case SE_FUNC:
785 //                              Should not be ref'ing any aggr here.
786                 if(se->get_aggr_ref() >= 0){
787                         fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
788                         return("ERROR in generate_se_code");
789                 }
790
791                 if(se->is_partial()){
792                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
793                         ret = tmpstr;
794                 }else{
795                         ret += se->op + "(";
796                         operands = se->get_operands();
797                         for(o=0;o<operands.size();o++){
798                                 if(o>0) ret += ", ";
799                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
800                                         ret += "&";
801                                 ret += generate_se_code(operands[o], schema);
802                         }
803                         ret += ")";
804                 }
805                 return(ret);
806         default:
807                 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
808                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
809                 return("ERROR in generate_se_code");
810         }
811 }
812
813 //              generate code that refers only to aggregate data and constants.
814 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
815
816         string ret;
817     data_type *ldt, *rdt;
818         int o;
819         vector<scalarexp_t *> operands;
820
821
822         switch(se->get_operator_type()){
823         case SE_LITERAL:
824                 if(se->is_handle_ref()){
825                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
826                         ret = tmpstr;
827                         return(ret);
828                 }
829                 if(se->get_literal()->is_cpx_lit()){
830                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
831                         ret = tmpstr;
832                         return(ret);
833                 }
834                 return(se->get_literal()->to_C_code("")); // not complex no constructor
835         case SE_PARAM:
836                 if(se->is_handle_ref()){
837                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
838                         ret = tmpstr;
839                         return(ret);
840                 }
841                 ret += "t->param_";
842                 ret += se->get_param_name();
843                 return(ret);
844         case SE_UNARY_OP:
845         ldt = se->get_left_se()->get_data_type();
846         if(ldt->complex_operator(se->get_op()) ){
847                         ret +=  ldt->get_complex_operator(se->get_op());
848                         ret += "(";
849                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
850             ret += ")";
851                 }else{
852                         ret += "(";
853                         ret += se->get_op();
854                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
855                         ret += ")";
856                 }
857                 return(ret);
858         case SE_BINARY_OP:
859         ldt = se->get_left_se()->get_data_type();
860         rdt = se->get_right_se()->get_data_type();
861
862         if(ldt->complex_operator(rdt, se->get_op()) ){
863                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
864                         ret += "(";
865                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
866                         ret += ", ";
867                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
868                         ret += ")";
869                 }else{
870                         ret += "(";
871                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
872                         ret += se->get_op();
873                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
874                         ret += ")";
875                 }
876                 return(ret);
877         case SE_COLREF:
878                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet
879                                                         // unpacked ... so return the defining code.
880                         sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
881                         ret = tmpstr;
882
883                 }else{
884                 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
885                                 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
886                                 se->get_lineno(), se->get_charno());
887                 ret = tmpstr;
888                 }
889                 return(ret);
890         case SE_AGGR_STAR:
891         case SE_AGGR_SE:
892                 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
893                 ret = tmpstr;
894                 return(ret);
895         case SE_FUNC:
896 //                              Is it a UDAF?
897                 if(se->get_aggr_ref() >= 0){
898                         sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
899                         ret = tmpstr;
900                         return(ret);
901                 }
902
903                 if(se->is_partial()){
904                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
905                         ret = tmpstr;
906                 }else{
907                         ret += se->op + "(";
908                         operands = se->get_operands();
909                         for(o=0;o<operands.size();o++){
910                                 if(o>0) ret += ", ";
911                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
912                                         ret += "&";
913                                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
914                         }
915                         ret += ")";
916                 }
917                 return(ret);
918         default:
919                 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
920                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
921                 return("ERROR in generate_se_code");
922         }
923
924 }
925
926
927 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
928         string ret;
929         int o;
930         vector<scalarexp_t *> operands;
931
932
933         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
934                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
935                                 se->get_lineno(), se->get_charno());
936                 return("ERROR in generate_se_code");
937         }
938
939         ret = "\tretval = " + se->get_op() + "( ";
940         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
941         ret += tmpstr;
942
943         operands = se->get_operands();
944         for(o=0;o<operands.size();o++){
945                 ret += ", ";
946                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
947                         ret += "&";
948                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
949         }
950         ret += ");\n";
951
952         return(ret);
953 }
954
955 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
956         string ret;
957         int o;
958         vector<scalarexp_t *> operands;
959
960         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
961                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
962                                 se->get_lineno(), se->get_charno());
963                 return("ERROR in generate_se_code");
964         }
965
966         ret = se->get_op() + "( ";
967
968         operands = se->get_operands();
969         for(o=0;o<operands.size();o++){
970                 if(o) ret += ", ";
971                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
972                         ret += "&";
973                 ret += generate_se_code(operands[o], schema);
974         }
975         ret += ");\n";
976
977         return(ret);
978 }
979
980
981
982 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
983         string ret;
984         int o;
985         vector<scalarexp_t *> operands;
986
987
988         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
989                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
990                                 se->get_lineno(), se->get_charno());
991                 return("ERROR in generate_se_code");
992         }
993
994         ret = "\tretval = " + se->get_op() + "( ",
995         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
996         ret += tmpstr;
997
998         operands = se->get_operands();
999         for(o=0;o<operands.size();o++){
1000                 ret += ", ";
1001                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
1002                         ret += "&";
1003                 ret += generate_se_code(operands[o], schema);
1004         }
1005         ret += ");\n";
1006
1007         return(ret);
1008 }
1009
1010
1011
1012
1013
//		Map a query-language comparison operator to its C spelling:
//		"=" becomes "==", "<>" becomes "!=", anything else is returned as is.
static string generate_C_comparison_op(string op){
	if(op.compare("=") == 0)
		op = "==";
	else if(op.compare("<>") == 0)
		op = "!=";
	return op;
}
1019
//		Map a boolean operator keyword to its C operator.  Only the three
//		spellings listed per operator are recognized (all upper case,
//		capitalized, all lower case); anything else is reported on stderr
//		and an error marker containing the operator is returned.
static string generate_C_boolean_op(string op){
	static const char *const spellings[3][3] = {
		{"AND", "And", "and"},
		{"OR",  "Or",  "or"},
		{"NOT", "Not", "not"}
	};
	static const char *const c_ops[3] = {"&&", "||", "!"};

	for(int row=0; row<3; ++row){
		for(int col=0; col<3; ++col){
			if(op == spellings[row][col])
				return(c_ops[row]);
		}
	}

	fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
	return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
}
1034
1035
1036 static string generate_predicate_code(predicate_t *pr,table_list *schema){
1037         string ret;
1038         vector<literal_t *>  litv;
1039         int i;
1040     data_type *ldt, *rdt;
1041         vector<scalarexp_t *> op_list;
1042         int o,cref,ppos;
1043         unsigned int bitmask;
1044
1045         switch(pr->get_operator_type()){
1046         case PRED_IN:
1047         ldt = pr->get_left_se()->get_data_type();
1048
1049                 ret += "( ";
1050                 litv = pr->get_lit_vec();
1051                 for(i=0;i<litv.size();i++){
1052                         if(i>0) ret += " || ";
1053                         ret += "( ";
1054
1055                 if(ldt->complex_comparison(ldt) ){
1056                                 ret +=  ldt->get_comparison_fcn(ldt) ;
1057                                 ret += "( ";
1058                                 if(ldt->is_buffer_type() ) ret += "&";
1059                                 ret += generate_se_code(pr->get_left_se(), schema);
1060                                 ret += ", ";
1061                                 if(ldt->is_buffer_type() ) ret += "&";
1062                                 if(litv[i]->is_cpx_lit()){
1063                                         sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
1064                                         ret += tmpstr;
1065                                 }else{
1066                                         ret += litv[i]->to_C_code("");
1067                                 }
1068                                 ret += ") == 0";
1069                         }else{
1070                                 ret += generate_se_code(pr->get_left_se(), schema);
1071                                 ret += " == ";
1072                                 ret += litv[i]->to_C_code("");
1073                         }
1074
1075                         ret += " )";
1076                 }
1077                 ret += " )";
1078                 return(ret);
1079
1080         case PRED_COMPARE:
1081         ldt = pr->get_left_se()->get_data_type();
1082         rdt = pr->get_right_se()->get_data_type();
1083
1084                 ret += "( ";
1085         if(ldt->complex_comparison(rdt) ){
1086                         ret += ldt->get_comparison_fcn(rdt);
1087                         ret += "(";
1088                         if(ldt->is_buffer_type() ) ret += "&";
1089                         ret += generate_se_code(pr->get_left_se(),schema);
1090                         ret += ", ";
1091                         if(rdt->is_buffer_type() ) ret += "&";
1092                         ret += generate_se_code(pr->get_right_se(),schema);
1093                         ret += ") ";
1094                         ret +=  generate_C_comparison_op(pr->get_op());
1095                         ret += "0";
1096                 }else{
1097                         ret += generate_se_code(pr->get_left_se(),schema);
1098                         ret +=  generate_C_comparison_op(pr->get_op());
1099                         ret += generate_se_code(pr->get_right_se(),schema);
1100                 }
1101                 ret += " )";
1102                 return(ret);
1103         case PRED_UNARY_OP:
1104                 ret += "( ";
1105                 ret +=  generate_C_boolean_op(pr->get_op());
1106                 ret += generate_predicate_code(pr->get_left_pr(),schema);
1107                 ret += " )";
1108                 return(ret);
1109         case PRED_BINARY_OP:
1110                 ret += "( ";
1111                 ret += generate_predicate_code(pr->get_left_pr(),schema);
1112                 ret +=  generate_C_boolean_op(pr->get_op());
1113                 ret += generate_predicate_code(pr->get_right_pr(),schema);
1114                 ret += " )";
1115                 return(ret);
1116         case PRED_FUNC:
1117                 op_list = pr->get_op_list();
1118                 cref = pr->get_combinable_ref();
1119                 if(cref >= 0){  // predicate is a combinable pred reference
1120                         //              Trust, but verify
1121                         if(pred_class.size() >= cref && pred_class[cref] >= 0){
1122                                 ppos = pred_pos[cref];
1123                                 bitmask = 1 << ppos % 32;
1124                                 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
1125                                 ret = tmpstr;
1126                                 return ret;
1127                         }
1128                 }
1129
1130                 ret =  pr->get_op() + "(";
1131                 if (pr->is_sampling_fcn) {
1132                         ret += "t->sampling_rate";
1133                         if (!op_list.empty())
1134                                 ret += ", ";
1135                 }
1136                 for(o=0;o<op_list.size();++o){
1137                         if(o>0) ret += ", ";
1138                         if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
1139                                         ret += "&";
1140                         ret += generate_se_code(op_list[o],schema);
1141                 }
1142                 ret += " )";
1143                 return(ret);
1144         default:
1145                 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
1146                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
1147                 return("ERROR in generate_predicate_code");
1148         }
1149 }
1150
1151
1152 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
1153         string ret;
1154
1155     if(dt->complex_comparison(dt) ){
1156                 ret += dt->get_comparison_fcn(dt);
1157                 ret += "(";
1158                         if(dt->is_buffer_type() ) ret += "&";
1159                 ret += lhs_op;
1160                 ret += ", ";
1161                         if(dt->is_buffer_type() ) ret += "&";
1162                 ret += rhs_op;
1163                 ret += ") == 0";
1164         }else{
1165                 ret += lhs_op;
1166                 ret += " == ";
1167                 ret += rhs_op;
1168         }
1169
1170         return(ret);
1171 }
1172
1173 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
1174         string ret;
1175
1176     if(dt->complex_comparison(dt) ){
1177                 ret += dt->get_comparison_fcn(dt);
1178                 ret += "(";
1179                         if(dt->is_buffer_type() ) ret += "&";
1180                 ret += lhs_op;
1181                 ret += ", ";
1182                         if(dt->is_buffer_type() ) ret += "&";
1183                 ret += rhs_op;
1184                 ret += ") == 0";
1185         }else{
1186                 ret += lhs_op;
1187                 ret += " == ";
1188                 ret += rhs_op;
1189         }
1190
1191         return(ret);
1192 }
1193
1194 //              Here I assume that only MIN and MAX aggregates can be computed
1195 //              over BUFFER data types.
1196
1197 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
1198         string retval = "\t\t";
1199         string op = atbl->get_op(aidx);
1200
1201 //              Is it a UDAF
1202         if(! atbl->is_builtin(aidx)) {
1203                 int o;
1204                 retval += op+"_LFTA_AGGR_UPDATE_(";
1205                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1206                 retval+="("+var+")";
1207                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1208                 for(o=0;o<opl.size();++o){
1209                         retval += ",";
1210                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1211                                         retval.append("&");
1212                         retval += generate_se_code(opl[o], schema);
1213                 }
1214                 retval += ");\n";
1215
1216                 return retval;
1217         }
1218
1219 //              Built-in aggregate processing.
1220
1221         data_type *dt = atbl->get_data_type(aidx);
1222
1223         if(op == "COUNT"){
1224                 retval.append(var);
1225                 retval.append("++;\n");
1226                 return(retval);
1227         }
1228         if(op == "SUM"){
1229                 retval.append(var);
1230                 retval.append(" += ");
1231                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1232                 retval.append(";\n");
1233                 return(retval);
1234         }
1235         if(op == "MIN"){
1236                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1237                 retval.append(tmpstr);
1238                 if(dt->complex_comparison(dt)){
1239                         if(dt->is_buffer_type())
1240                           sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1241                         else
1242                           sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1243                 }else{
1244                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1245                 }
1246                 retval.append(tmpstr);
1247                 if(dt->is_buffer_type()){
1248                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1249                 }else{
1250                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1251                 }
1252                 retval.append(tmpstr);
1253
1254                 return(retval);
1255         }
1256         if(op == "MAX"){
1257                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1258                 retval.append(tmpstr);
1259                 if(dt->complex_comparison(dt)){
1260                         if(dt->is_buffer_type())
1261                          sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1262                         else
1263                          sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1264                 }else{
1265                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1266                 }
1267                 retval.append(tmpstr);
1268                 if(dt->is_buffer_type()){
1269                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1270                 }else{
1271                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1272                 }
1273                 retval.append(tmpstr);
1274
1275                 return(retval);
1276
1277         }
1278         if(op == "AND_AGGR"){
1279                 retval.append(var);
1280                 retval.append(" &= ");
1281                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1282                 retval.append(";\n");
1283                 return(retval);
1284         }
1285         if(op == "OR_AGGR"){
1286                 retval.append(var);
1287                 retval.append(" |= ");
1288                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1289                 retval.append(";\n");
1290                 return(retval);
1291         }
1292         if(op == "XOR_AGGR"){
1293                 retval.append(var);
1294                 retval.append(" ^= ");
1295                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1296                 retval.append(";\n");
1297                 return(retval);
1298         }
1299         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1300         return("ERROR: aggregate not recognized: "+op);
1301
1302 }
1303
1304
1305
1306 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1307         string retval;
1308         string op = atbl->get_op(aidx);
1309
1310 //              Is it a UDAF
1311         if(! atbl->is_builtin(aidx)) {
1312                 int o;
1313                 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1314                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1315                 retval+="("+var+"));\n";
1316 //                      Add 1st tupl
1317                 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1318                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1319                 retval+="("+var+")";
1320                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1321                 for(o=0;o<opl.size();++o){
1322                         retval += ",";
1323                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1324                                         retval.append("&");
1325                         retval += generate_se_code(opl[o],schema);
1326                 }
1327                 retval += ");\n";
1328                 return(retval);
1329         }
1330
1331 //              Built-in aggregate processing.
1332
1333
1334         data_type *dt = atbl->get_data_type(aidx);
1335
1336         if(op == "COUNT"){
1337                 retval = "\t\t"+var;
1338                 retval.append(" = 1;\n");
1339                 return(retval);
1340         }
1341
1342         if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1343                                                                         op == "OR_AGGR" || op == "XOR_AGGR"){
1344                 if(dt->is_buffer_type()){
1345                         sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1346                         retval.append(tmpstr);
1347                         sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1348                         retval.append(tmpstr);
1349                 }else{
1350                         retval = "\t\t"+var;
1351                         retval += " = ";
1352                         retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1353                         retval.append(";\n");
1354                 }
1355                 return(retval);
1356         }
1357
1358         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1359         return("ERROR: aggregate not recognized: "+op);
1360 }
1361
1362
1363 ////////////////////////////////////////////////////////////
1364
1365
1366 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1367         std::string &node_name, std::string &schema_embed_str){
1368 //                      Include these only once, not once per lfta
1369 //      string ret = "#include \"rts.h\"\n";
1370 //      ret +=  "#include \"fta.h\"\n\n");
1371
1372         string ret = "#ifndef LFTA_IN_NIC\n";
1373         ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1374         ret += "#include<stdio.h>\n";
1375         ret += "#include <limits.h>\n";
1376         ret += "#include <float.h>\n";
1377         ret += "#include <sys/stat.h>\n";
1378         ret += "#include \"rdtsc.h\"\n";
1379         ret += "#endif\n";
1380
1381
1382
1383         return(ret);
1384 }
1385
1386
1387 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1388         int a,p,s;
1389 //                       need to create and output the tuple.
1390         string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1391 //                      Check for any UDAFs with LFTA_BAILOUT
1392         ret += "\tlfta_bailout = 0;\n";
1393         for(a=0;a<aggr_tbl->size();a++){
1394                 if(aggr_tbl->has_bailout(a)){
1395                         ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
1396                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1397                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1398                 }
1399         }
1400         ret += "\tif(! lfta_bailout){\n";
1401
1402 //                      First, compute the size of the tuple.
1403
1404 //                      Unpack UDAF return values
1405         for(a=0;a<aggr_tbl->size();a++){
1406                 if(! aggr_tbl->is_builtin(a)){
1407                         ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1408                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1409                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1410
1411                 }
1412         }
1413
1414
1415 //                      Unpack partial fcns ref'd by the select clause.
1416   if(sl_fcns_start != sl_fcns_end){
1417             ret += "\t\tunpack_failed = 0;\n";
1418     for(p=sl_fcns_start;p<sl_fcns_end;p++){
1419           if(is_partial_fcn[p]){
1420                 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1421                          "t->aggr_table["+idx+"].",schema);
1422                 ret += "\t\tif(retval) unpack_failed = 1;\n";
1423           }
1424     }
1425                                                                 // BEGIN don't allocate tuple if
1426         ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1427   }
1428
1429 //                      Unpack any BUFFER type selections into temporaries
1430 //                      so that I can compute their size and not have
1431 //                      to recompute their value during tuple packing.
1432 //                      I can use regular assignment here because
1433 //                      these temporaries are non-persistent.
1434
1435           for(s=0;s<sl_list.size();s++){
1436                 data_type *sdt = sl_list[s]->get_data_type();
1437                 if(sdt->is_buffer_type()){
1438                         sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1439                         ret += tmpstr;
1440                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1441                         ret += ";\n";
1442                 }
1443           }
1444
1445
1446 //              The size of the tuple is the size of the tuple struct plus the
1447 //              size of the buffers to be copied in.
1448
1449           ret += "\t\t\ttuple_size = sizeof( struct ";
1450           ret +=  generate_tuple_name(node_name);
1451           ret += ")";
1452           for(s=0;s<sl_list.size();s++){
1453                 data_type *sdt = sl_list[s]->get_data_type();
1454                 if(sdt->is_buffer_type()){
1455                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1456                         ret += tmpstr;
1457                 }
1458           }
1459           ret += ";\n";
1460
1461
1462           ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1463           ret += "\t\t\tif( tuple != NULL){\n";
1464
1465
1466 //                      Test passed, make assignments to the tuple.
1467
1468           ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1469           ret +=  generate_tuple_name(node_name) ;
1470           ret += ");\n";
1471
1472 //                      Mark tuple as REGULAR_TUPLE
1473           ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1474
1475           for(s=0;s<sl_list.size();s++){
1476                 data_type *sdt = sl_list[s]->get_data_type();
1477                 if(sdt->is_buffer_type()){
1478                         sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1479                         ret += tmpstr;
1480                         sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1481                         ret += tmpstr;
1482                 }else{
1483                         sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1484                         ret += tmpstr;
1485 //                      if(sdt->needs_hn_translation())
1486 //                              ret += sdt->hton_translation() +"( ";
1487                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1488 //                      if(sdt->needs_hn_translation())
1489 //                              ret += ") ";
1490                         ret += ";\n";
1491                 }
1492           }
1493
1494 //              Generate output.
1495           ret += "\t\t\t\tpost_tuple(tuple);\n";
1496           ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1497           ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1498           ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1499           ret += "\t\t\t\t#endif\n\n";
1500           ret += "\t\t\t}\n";
1501
1502           if(sl_fcns_start != sl_fcns_end)      // END don't allocate tuple if
1503                 ret += "\t\t}\n";                               // unpack failed.
1504           ret += "\t}\n";
1505
1506 //                      Need to release memory held by BUFFER types.
1507                 int g;
1508
1509           for(g=0;g<gb_tbl->size();g++){
1510                 data_type *gdt = gb_tbl->get_data_type(g);
1511                 if(gdt->is_buffer_type()){
1512                         sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1513                         ret += tmpstr;
1514                 }
1515           }
1516           for(a=0;a<aggr_tbl->size();a++){
1517                 if(aggr_tbl->is_builtin(a)){
1518                         data_type *adt = aggr_tbl->get_data_type(a);
1519                         if(adt->is_buffer_type()){
1520                                 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
1521                                 ret += tmpstr;
1522                         }
1523                 }else{
1524                         ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1525                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1526                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1527                 }
1528           }
1529
1530         ret += "\t\tt->n_aggrs--;\n";
1531
1532         return(ret);
1533
1534 }
1535
1536 string generate_gb_match_test(string idx){
1537         int g;
1538         string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") &&  IS_NEW(t->aggr_table_bitmap,"+idx+")";
1539         if(gb_tbl->size()>0){
1540                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1541                 ret+="\t\t";
1542
1543 //                      Next, scan list for a match on the group-by attributes.
1544           string rhs_op, lhs_op;
1545           for(g=0;g<gb_tbl->size();g++){
1546                   ret += " && ";
1547                   ret += "(";
1548                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1549                   sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1550                   ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
1551                   ret += ")";
1552           }
1553          }
1554
1555           ret += "){\n";
1556
1557         return ret;
1558 }
1559
1560 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1561         int g;
1562         string ret;
1563
1564           ret += "/*\t\tMatch found : update in place.\t*/\n";
1565           int a;
1566           has_udaf = false;
1567           for(a=0;a<aggr_tbl->size();a++){
1568                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1569                   ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1570                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1571           }
1572
1573 //                      garbage collect copied buffer type gb attrs.
1574   for(g=0;g<gb_tbl->size();g++){
1575           data_type *gdt = gb_tbl->get_data_type(g);
1576           if(gdt->is_buffer_type()){
1577                 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
1578                 ret+=tmpstr;
1579           }
1580         }
1581
1582
1583
1584           bool first_udaf = true;
1585           if(has_udaf){
1586                 ret += "\t\tif(";
1587                 for(a=0;a<aggr_tbl->size();a++){
1588                         if(! aggr_tbl->is_builtin(a)){
1589                                 if(! first_udaf)ret += " || ";
1590                                 else first_udaf = false;
1591                                 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1592                                 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1593                                 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1594                         }
1595                 }
1596                 ret+="){\n";
1597                 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1598                 ret += generate_tuple_from_aggr(node_name,schema,idx);
1599                 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
1600                 ret+="\t\t}\n";
1601         }
1602         return ret;
1603 }
1604
1605
1606 string generate_init_group( table_list *schema, string idx){
1607           int g,a;
1608           string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1609 //                      Fill up the aggregate block.
1610           for(g=0;g<gb_tbl->size();g++){
1611                   sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1612                   ret += tmpstr;
1613           }
1614           for(a=0;a<aggr_tbl->size();a++){
1615                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1616                   ret += generate_aggr_init(tmpstr, aggr_tbl,a,  schema);
1617           }
1618           ret+="\t\tt->n_aggrs++;\n";
1619         return ret;
1620 }
1621
1622
1623 string generate_fta_flush(string node_name, table_list *schema,
1624                 ext_fcn_list *Ext_fcns){
1625
1626    string ret;
1627    string select_var_defs ;
1628         int s, p;
1629
1630 //              Flush from previous epoch
1631
1632         ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1633
1634         ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1635     ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1636         ret += "\tint i, lfta_bailout;\n";
1637         ret += "\tunsigned int gen_val;\n";
1638
1639         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1640         ret += generate_fta_name(node_name)+" *) f;\n";
1641
1642         ret += "\n";
1643
1644
1645 //              Variables needed to store selected attributes of BUFFER type
1646 //              temporarily, in order to compute their size for storage
1647 //              in an output tuple.
1648
1649   select_var_defs = "";
1650   for(s=0;s<sl_list.size();s++){
1651         data_type *sdt = sl_list[s]->get_data_type();
1652         if(sdt->is_buffer_type()){
1653           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1654           select_var_defs.append(tmpstr);
1655         }
1656   }
1657   if(select_var_defs != ""){
1658         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1659     ret += select_var_defs;
1660   }
1661
1662
1663 //              Variables to store results of partial functions.
1664   if(sl_fcns_start != sl_fcns_end){
1665         ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1666         for(p=sl_fcns_start;p<sl_fcns_end;p++){
1667                 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1668                         partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
1669                 ret += tmpstr;
1670         }
1671         ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1672   }
1673
1674 //              Variables for udaf output temporaries
1675         bool no_udaf = true;
1676         int a;
1677         for(a=0;a<aggr_tbl->size();a++){
1678                 if(! aggr_tbl->is_builtin(a)){
1679                         if(no_udaf){
1680                                 ret+="/*\t\tUDAF output vars.\t*/\n";
1681                                 no_udaf = false;
1682                         }
1683                         int afcn_id = aggr_tbl->get_fcn_id(a);
1684                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1685                         sprintf(tmpstr,"udaf_ret%d", a);
1686                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1687                 }
1688         }
1689
1690
1691 //  ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1692   ret+="\n";
1693   ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1694   ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1695   ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1696                 bool first_g=true;
1697                 int g;
1698                 for(g=0;g<gb_tbl->size();g++){
1699                   data_type *gdt = gb_tbl->get_data_type(g);
1700                   if(gdt->is_temporal()){
1701                         if(first_g) first_g=false; else ret+=" || ";
1702                         ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1703                   }
1704                 }
1705   ret += "))) {\n";
1706   ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1707   ret+=
1708 "#ifdef LFTA_STATS\n"
1709 "\t\t\tt->eviction_cnt++;\n"
1710 "#endif\n"
1711 ;
1712
1713
1714   ret+=generate_tuple_from_aggr(node_name,schema,"i");
1715
1716 //  ret+="\t\t\tt->n_aggrs--;\n";  // done in generate_tuple_from_aggr
1717   ret+="\t\t\tnflush--;\n";
1718   ret+="\t\t}\n";
1719   ret+="\t}\n";
1720   ret+="\tt->flush_pos=i;\n";
1721   ret+="\tif(t->n_aggrs == 0) {\n";
1722   ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1723   ret += "\t}\n\n";
1724
1725   ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1726
1727   for(int g=0;g<gb_tbl->size();g++){
1728         data_type *dt = gb_tbl->get_data_type(g);
1729         if(dt->is_temporal()){
1730                 data_type *gdt = gb_tbl->get_data_type(g);
1731                 if(!gdt->is_buffer_type()){
1732                         sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1733                         ret += tmpstr;
1734                 }
1735         }
1736   }
1737   ret += "\t}\n}\n\n";
1738
1739   return(ret);
1740 }
1741
1742 // TODO Remove sprintf to perform string catenation
1743 string generate_fta_load_params(string node_name){
1744         int p;
1745         vector<string> param_names = param_tbl->get_param_names();
1746
1747         string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1748                 ret += " *t, int sz, void *value, int initial_call){\n";
1749     ret += "\tint pos=0;\n";
1750     ret += "\tint data_pos;\n";
1751
1752         for(p=0;p<param_names.size();p++){
1753                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1754                 if(dt->is_buffer_type()){
1755                         sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1756                         ret += tmpstr;
1757                         sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
1758                         ret += tmpstr;
1759                 }
1760         }
1761
1762
1763
1764         ret += "\n\tdata_pos = ";
1765         for(p=0;p<param_names.size();p++){
1766                 if(p>0) ret += " + ";
1767                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1768                 ret += "sizeof( ";
1769                 ret +=  dt->get_tuple_cvar_type();
1770                 ret += " )";
1771         }
1772         ret += ";\n";
1773         ret += "\tif(data_pos > sz) return 1;\n\n";
1774
1775
1776         for(p=0;p<param_names.size();p++){
1777                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1778                 if(dt->is_buffer_type()){
1779                         sprintf(tmpstr,"\taccess_var_%s =  *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1780                         ret += tmpstr;
1781                         switch( dt->get_type() ){
1782                         case v_str_t:
1783 //                              ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n";               // ntoh conversion
1784 //                              ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n";   // ntoh conversion
1785                                 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1786                                 ret += tmpstr;
1787                                 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1788                                 ret += tmpstr;
1789                                 sprintf(tmpstr,"\ttmp_var_%s.length =  access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1790                                 ret += tmpstr;
1791                         break;
1792                         default:
1793                                 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1794                                 exit(1);
1795                         break;
1796                         }
1797 //                                      First, destroy the old
1798                         ret += "\tif(! initial_call)\n";
1799                         sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1800                         ret += tmpstr;
1801 //                                      Next, create the new.
1802                         sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1803                         ret += tmpstr;
1804                 }else{
1805 //                      if(dt->needs_hn_translation()){
1806 //                              sprintf(tmpstr,"\tt->param_%s =  %s( *( (%s *)( (char *)value+pos) ) );\n",
1807 //                                param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1808 //                      }else{
1809                                 sprintf(tmpstr,"\tt->param_%s =  *( (%s *)( (char *)value+pos) );\n",
1810                                   param_names[p].c_str(), dt->get_cvar_type().c_str() );
1811 //                      }
1812                         ret += tmpstr;
1813                 }
1814                 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1815                 ret += tmpstr;
1816         }
1817
1818 //                      Register the pass-by-handle parameters
1819
1820         ret += "/* register and de-register the pass-by-handle parameters */\n";
1821
1822     int ph;
1823     for(ph=0;ph<param_handle_table.size();++ph){
1824                 data_type pdt(param_handle_table[ph]->type_name);
1825                 switch(param_handle_table[ph]->val_type){
1826                 case cplx_lit_e:
1827                         break;
1828                 case litval_e:
1829                         break;
1830                 case param_e:
1831                         ret += "\tif(! initial_call)\n";
1832                         sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1833                                 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1834                         ret += tmpstr;
1835                         sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
1836                         ret += tmpstr;
1837
1838                         if(pdt.is_buffer_type()) ret += "&(";
1839                         ret += "t->param_"+param_handle_table[ph]->param_name;
1840                         if(pdt.is_buffer_type()) ret += ")";
1841                         ret += ");\n";
1842                         break;
1843                 default:
1844                         sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1845                         fprintf(stderr,"%s\n",tmpstr);
1846                         ret+=tmpstr;
1847                 }
1848         }
1849
1850         ret+="\treturn 0;\n";
1851         ret += "}\n\n";
1852
1853         return(ret);
1854 }
1855
1856
1857
1858
1859 string generate_fta_free(string node_name, bool is_aggr_query){
1860
1861         string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1862         ret+= "\tstruct "+generate_fta_name(node_name)+
1863                 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1864         ret += "\tint i;\n";
1865
1866         if(is_aggr_query){
1867                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1868                 ret+="\t/* \t\tmark all groups as old */\n";
1869                 ret+="\tt->generation++;\n";
1870                 ret+="\tt->flush_pos = 0;\n";
1871                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1872         }
1873
1874 //                      Deregister the pass-by-handle parameters
1875         ret += "/* de-register the pass-by-handle parameters */\n";
1876     int ph;
1877     for(ph=0;ph<param_handle_table.size();++ph){
1878                 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1879                         param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1880                 ret += tmpstr;
1881         }
1882
1883
1884         ret += "\treturn 0;\n}\n\n";
1885         return(ret);
1886 }
1887
1888
1889 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1890         string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f,  gs_int32_t command, gs_int32_t sz, void *value){\n";
1891         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1892                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1893         ret+="\tint i;\n";
1894
1895
1896         ret += "\t/* temp status tuple */\n";
1897         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1898         ret += "\tgs_int32_t tuple_size;\n";
1899
1900
1901         if(is_aggr_query){
1902                 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1903
1904                 ret+="\t\tif (!t->n_aggrs) {\n";
1905                 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1906                 ret+="\t\t\tif( tuple != NULL)\n";
1907                 ret+="\t\t\t\tpost_tuple(tuple);\n";
1908
1909                 ret+="\t\t}else{\n";
1910
1911                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1912                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1913                 ret +="\t\tt->generation++;\n";
1914                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1915                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1916                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1917                 ret+="\t\t\tt->flush_pos = 0;\n";
1918                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1919                 ret+="\t\t}\n";
1920
1921                 ret+="\t}\n";
1922         }
1923         if(param_tbl->size() > 0){
1924                 ret+=
1925 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1926 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1927 "#ifndef LFTA_IN_NIC\n"
1928 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1929 "#else\n"
1930 "\t\t{}\n"
1931 "#endif\n"
1932 "\t}\n";
1933         }
1934         ret+=
1935 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1936 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1937 "\t}\n\n";
1938
1939
1940         ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1941
1942         if(is_aggr_query){
1943                 ret+="\t\tif (t->n_aggrs) {\n";
1944                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1945                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1946                 ret +="\t\tt->generation++;\n";
1947                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1948                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1949                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1950                 ret+="\t\t\tt->flush_pos = 0;\n";
1951                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1952                 ret+="\t\t}\n";
1953         }
1954
1955         ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1956         ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1957         ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1958
1959         /* mark tuple as EOF_TUPLE */
1960         ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1961         ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1962         ret += "\t\tpost_tuple(tuple);\n";
1963         ret += "\t}\n";
1964
1965         ret += "\treturn 0;\n}\n\n";
1966
1967         return(ret);
1968 }
1969
1970 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
1971         string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1972         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1973                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1974
1975         ret += "\t/* Create a temp status tuple */\n";
1976         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1977         ret += "\tgs_int32_t tuple_size;\n";
1978         ret += "\tunsigned int i;\n";
1979         ret += "\ttime_t cur_time;\n";
1980         ret += "\tint time_advanced;\n";
1981         ret += "\tstruct fta_stat stats;\n";
1982
1983
1984
1985         /* copy the last seen values of temporal attributes */
1986         col_id_set temp_cids;           //      col ids of temp attributes in select clause
1987
1988
1989         /* HACK: in order to reuse the SE generation code, we need to copy
1990          * the last values of the temp attributes into new variables
1991          * which have names unpack_var_XXX_XXX
1992          */
1993
1994         int s, g;
1995         col_id_set::iterator csi;
1996
1997         for(s=0;s<sl_list.size();s++){
1998                 data_type *sdt = sl_list[s]->get_data_type();
1999                 if (sdt->is_temporal()) {
2000                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2001                 }
2002         }
2003
2004         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2005                 int tblref = (*csi).tblvar_ref;
2006                 int schref = (*csi).schema_ref;
2007                 string field = (*csi).field;
2008                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2009                 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
2010                 ret += tmpstr;
2011         }
2012
2013         if (is_aggr_query) {
2014                 for(g=0;g<gb_tbl->size();g++){
2015                         data_type *gdt = gb_tbl->get_data_type(g);
2016                         if(gdt->is_temporal()){
2017                                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2018                                 ret += tmpstr;
2019                                 data_type *gdt = gb_tbl->get_data_type(g);
2020                                 if(gdt->is_buffer_type()){
2021                                 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2022                                 ret += tmpstr;
2023                                 }
2024                         }
2025                 }
2026         }
2027         ret += "\n";
2028
2029         ret += "\ttime_advanced = 0;\n";
2030
2031         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2032                 int tblref = (*csi).tblvar_ref;
2033                 int schref = (*csi).schema_ref;
2034                 string field = (*csi).field;
2035                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2036
2037                 // update last seen value with the value seen
2038                 ret += "\t#ifdef PREFILTER_DEFINED\n";
2039                 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
2040                         field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
2041                 ret += tmpstr;
2042                 ret += "\t\ttime_advanced = 1;\n\t}\n";
2043                 ret += "\t#endif\n";
2044
2045                 // we need to pay special attention to time fields
2046                 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
2047                         ret += "\tcur_time = time(&cur_time);\n";
2048
2049                         if (field == "time") {
2050                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
2051                                         tblref, time_corr);
2052                                 ret += tmpstr;
2053                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
2054                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2055                         } else if (field == "timestamp_ms") {
2056                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
2057                                         tblref, time_corr);
2058                                 ret += tmpstr;
2059                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
2060                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2061                         }else{
2062                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
2063                                         field.c_str(), tblref, time_corr);
2064                                 ret += tmpstr;
2065                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
2066                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);
2067                         }
2068                         ret += tmpstr;
2069
2070                         ret += "\t\ttime_advanced = 1;\n";
2071                         ret += "\t}\n";
2072
2073                         sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
2074                         field.c_str(), tblref, field.c_str(), tblref);
2075                         ret += tmpstr;
2076                 } else {
2077                         sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
2078                                 field.c_str(), tblref, field.c_str(), tblref);
2079                         ret += tmpstr;
2080                 }
2081         }
2082
2083
2084         if(advance_uxtime){
2085                 ret += "\tt->ux_time = time(&(t->ux_time));\n";
2086         }
2087
2088         // for aggregation lftas we need to check if the time was advanced beyond the current epoch
2089         if (is_aggr_query) {
2090
2091                 string change_test;
2092                 bool first_one = true;
2093                 for(g=0;g<gb_tbl->size();g++){
2094                   data_type *gdt = gb_tbl->get_data_type(g);
2095                   if(gdt->is_temporal()){
2096 //                                      To perform the test, first need to compute the value
2097 //                                      of the temporal gb attrs.
2098                           if(gdt->is_buffer_type()){
2099         //                              NOTE : if the SE defining the gb is anything
2100         //                              other than a ref to a variable, this will generate
2101         //                              illegal code.  To be resolved with Spatch.
2102                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2103                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2104                                 ret+=tmpstr;
2105                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2106                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2107                           }else{
2108                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2109                           }
2110                           ret += tmpstr;
2111
2112                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2113                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2114                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2115                           change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
2116                   }
2117                 }
2118
2119                 ret += "\n\tif( time_advanced && !( (";
2120                 ret += change_test;
2121                 ret += ") ) ){\n";
2122
2123                 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2124                 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
2125                 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2126
2127                 ret += "\t\t/* \t\tmark all groups as old */\n";
2128                 ret +="\t\tt->generation++;\n";
2129                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
2130                 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
2131                 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
2132                 ret += "\t\tt->flush_pos = 0;\n";
2133
2134                 for(g=0;g<gb_tbl->size();g++){
2135                      data_type *gdt = gb_tbl->get_data_type(g);
2136                      if(gdt->is_temporal()){
2137                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);                 ret += tmpstr;
2138                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
2139                      }
2140                 }
2141                 ret += "\t}\n\n";
2142
2143         }
2144
2145         ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
2146         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2147         ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
2148
2149
2150         for(s=0;s<sl_list.size();s++){
2151                 data_type *sdt = sl_list[s]->get_data_type();
2152                 if(sdt->is_temporal()){
2153
2154                         if (sl_list[s]->is_gb()) {
2155                                 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
2156                                 ret += tmpstr;
2157                         }
2158
2159                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2160                         ret += tmpstr;
2161 //                      if(sdt->needs_hn_translation())
2162 //                              ret += sdt->hton_translation() +"( ";
2163                         if (sl_list[s]->is_gb()) {
2164                                 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
2165                                 ret += tmpstr;
2166                         } else{
2167                                 ret += generate_se_code(sl_list[s],schema);
2168                         }
2169 //                      if(sdt->needs_hn_translation())
2170 //                              ret += " )";
2171                         ret += ";\n";
2172                 }
2173         }
2174
2175         /* mark tuple as temporal */
2176         ret += "\n\t/* Mark tuple as temporal */\n";
2177         ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
2178
2179         ret += "\n\t/* Copy trace id */\n";
2180         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
2181
2182         ret += "\n\t/* Populate runtime stats */\n";
2183         ret += "\tstats.ftaid = f->ftaid;\n";
2184         ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
2185         ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
2186         ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
2187         ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
2188         ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
2189         ret += "\tstats.collision_cnt = t->collision_cnt;\n";
2190         ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
2191         ret += "\tstats.sampling_rate    = t->sampling_rate;\n";
2192
2193         ret += "\n#ifdef LFTA_PROFILE\n";
2194         ret += "\n\t/* Print stats */\n";
2195         ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
2196         ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
2197         ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
2198         ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
2199         ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
2200         ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
2201         ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
2202         ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
2203         ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
2204         ret += "\n#endif\n";
2205
2206
2207         ret += "\n\t/* Copy stats */\n";
2208         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2209         ret+="\tpost_tuple(tuple);\n";
2210
2211         ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2212         ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
2213
2214         ret += "\n\t/* Reset runtime stats */\n";
2215         ret += "\tt->in_tuple_cnt = 0;\n";
2216         ret += "\tt->out_tuple_cnt = 0;\n";
2217         ret += "\tt->out_tuple_sz = 0;\n";
2218         ret += "\tt->accepted_tuple_cnt = 0;\n";
2219         ret += "\tt->cycle_cnt = 0;\n";
2220         ret += "\tt->collision_cnt = 0;\n";
2221         ret += "\tt->eviction_cnt = 0;\n";
2222
2223         ret += "\treturn 0;\n}\n\n";
2224
2225         return(ret);
2226 }
2227
2228
2229 //              accept processing before the where clause,
2230 //              do flush processwing.
2231 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema,   col_id_set &unpacked_cids, string &temporal_flush){
2232         int s;
2233
2234 //              Slow flush
2235   string ret="\n/*\tslow flush\t*/\n";
2236   string slow_flush_str = fs->get_val_of_def("slow_flush");
2237   int n_slow_flush = atoi(slow_flush_str.c_str());
2238   if(n_slow_flush <= 0) n_slow_flush = 2;
2239   if(n_slow_flush > 1){
2240         ret += "\tt->flush_ctr++;\n";
2241         ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2242         ret += "\t\tt->flush_ctr = 0;\n";
2243     ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
2244         ret += "\t}\n\n";
2245   }else{
2246     ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2247   }
2248
2249
2250         string change_test;
2251         bool first_one = true;
2252         int g;
2253     col_id_set flush_cids;              //      col ids accessed when computing flush variables.
2254                                                         //  unpack them at temporal flush test time.
2255     temporal_flush = "";
2256
2257
2258         for(g=0;g<gb_tbl->size();g++){
2259                   data_type *gdt = gb_tbl->get_data_type(g);
2260                   if(gdt->is_temporal()){
2261                           gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2262
2263 //                                      To perform the test, first need to compute the value
2264 //                                      of the temporal gb attrs.
2265                           if(gdt->is_buffer_type()){
2266         //                              NOTE : if the SE defining the gb is anything
2267         //                              other than a ref to a variable, this will generate
2268         //                              illegal code.  To be resolved with Spatch.
2269                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2270                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2271                                 temporal_flush += tmpstr;
2272                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2273                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2274                           }else{
2275                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2276                           }
2277                           temporal_flush += tmpstr;
2278 //                                      END computing the value of the temporal GB attr.
2279
2280
2281                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2282                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2283                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2284                           change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2285                   }
2286         }
2287         if(!first_one){         // will be false iff. there is a temporal GB attribute
2288                   temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2289                   temporal_flush += "\tif( !( (";
2290                   temporal_flush += change_test;
2291                   temporal_flush += ") ) ){\n";
2292
2293 //                temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2294                   temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2295                   temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2296                   temporal_flush+="\t\t}\n";
2297                   temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2298                   temporal_flush+="\t\tt->generation++;\n";
2299                   temporal_flush+="\t\tt->flush_pos = 0;\n";
2300
2301
2302 //                              Now set the saved temporal value of the gb to the
2303 //                              current value of the gb.  Only for simple types,
2304 //                              not for buffer types -- but the strings are not
2305 //                              temporal in any case.
2306
2307                         for(g=0;g<gb_tbl->size();g++){
2308                           data_type *gdt = gb_tbl->get_data_type(g);
2309                           if(gdt->is_temporal()){
2310                                   if(gdt->is_buffer_type()){
2311
2312                                         fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2313                                   }else{
2314                                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2315                                         temporal_flush += tmpstr;
2316                                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2317                                         temporal_flush += tmpstr;
2318                                   }
2319                                 }
2320                         }
2321                   temporal_flush += "\t}\n\n";
2322         }
2323
2324 // Unpack all the temporal attributes referenced in select clause
2325 // and update the last value of the attribute
2326         col_id_set temp_cids;           //      col ids of temp attributes in select clause
2327         col_id_set::iterator csi;
2328
2329         for(s=0;s<sl_list.size();s++){
2330                 data_type *sdt = sl_list[s]->get_data_type();
2331                 if (sdt->is_temporal()) {
2332                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2333                 }
2334         }
2335
2336         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2337                 if(unpacked_cids.count((*csi)) == 0){
2338                         int tblref = (*csi).tblvar_ref;
2339                         int schref = (*csi).schema_ref;
2340                         string field = (*csi).field;
2341                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2342 /*
2343                         data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2344                         sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2345                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2346                         ret += tmpstr;
2347                         ret += "\tif(retval) return 1;\n";
2348 */
2349                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2350                         ret += tmpstr;
2351
2352                         unpacked_cids.insert( (*csi) );
2353                 }
2354         }
2355
2356
2357 //                              Do the flush here if this is a real_time query
2358         string rt_level = fs->get_val_of_def("real_time");
2359         if(rt_level != "" && temporal_flush != ""){
2360                 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2361                         if(unpacked_cids.count((*csi)) == 0){
2362                         int tblref = (*csi).tblvar_ref;
2363                         int schref = (*csi).schema_ref;
2364                         string field = (*csi).field;
2365                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2366 /*
2367                                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2368                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2369                         ret += tmpstr;
2370                         ret += "\tif(retval) return 1;\n";
2371 */
2372                                 unpacked_cids.insert( (*csi) );
2373                         }
2374                 }
2375                 ret += temporal_flush;
2376         }
2377
2378         return ret;
2379  }
2380
2381 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2382
2383 int p,s;
2384 string ret;
2385
2386 ///////////////                 Processing for filter-only query
2387
2388 //                      test passed : create the tuple, then assign to it.
2389           ret += "/*\t\tCreate and post the tuple\t*/\n";
2390
2391 //                      Unpack partial fcns ref'd by the select clause.
2392 //                      Its a kind of a WHERE clause ...
2393   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2394         if(fcn_ref_cnt[p] > 1){
2395                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2396         }
2397         if(is_partial_fcn[p]){
2398                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2399                 ret += "\tif(retval) goto end;\n";
2400         }
2401         if(fcn_ref_cnt[p] > 1){
2402                 if(!is_partial_fcn[p]){
2403                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2404                 }
2405                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2406                 ret += "\t}\n";
2407         }
2408   }
2409
2410   // increment the counter of accepted tuples
2411   ret += "\n\t#ifdef LFTA_STATS\n";
2412   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2413   ret += "\t#endif\n\n";
2414
2415 //                      First, compute the size of the tuple.
2416
2417 //                      Unpack any BUFFER type selections into temporaries
2418 //                      so that I can compute their size and not have
2419 //                      to recompute their value during tuple packing.
2420 //                      I can use regular assignment here because
2421 //                      these temporaries are non-persistent.
2422
2423           for(s=0;s<sl_list.size();s++){
2424                 data_type *sdt = sl_list[s]->get_data_type();
2425                 if(sdt->is_buffer_type()){
2426                         sprintf(tmpstr,"\tselvar_%d = ",s);
2427                         ret += tmpstr;
2428                         ret += generate_se_code(sl_list[s],schema);
2429                         ret += ";\n";
2430                 }
2431           }
2432
2433
2434 //              The size of the tuple is the size of the tuple struct plus the
2435 //              size of the buffers to be copied in.
2436
2437           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2438           for(s=0;s<sl_list.size();s++){
2439                 data_type *sdt = sl_list[s]->get_data_type();
2440                 if(sdt->is_buffer_type()){
2441                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2442                         ret += tmpstr;
2443                 }
2444           }
2445           ret += ";\n";
2446
2447
2448           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2449           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2450
2451 //                      Test passed, make assignments to the tuple.
2452
2453           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2454
2455 //                      Mark tuple as REGULAR_TUPLE
2456           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2457
2458
2459           for(s=0;s<sl_list.size();s++){
2460                 data_type *sdt = sl_list[s]->get_data_type();
2461                 if(sdt->is_buffer_type()){
2462                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2463                         ret += tmpstr;
2464                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2465                         ret += tmpstr;
2466                 }else{
2467                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2468                         ret += tmpstr;
2469 //                      if(sdt->needs_hn_translation())
2470 //                              ret += sdt->hton_translation() +"( ";
2471                         ret += generate_se_code(sl_list[s],schema);
2472 //                      if(sdt->needs_hn_translation())
2473 //                              ret += ") ";
2474                         ret += ";\n";
2475                 }
2476           }
2477
2478 //              Generate output.
2479
2480           ret += "\tpost_tuple(tuple);\n";
2481
2482 //              Increment the counter of posted tuples
2483   ret += "\n\t#ifdef LFTA_STATS\n";
2484   ret += "\tt->out_tuple_cnt++;\n";
2485   ret+="\tt->out_tuple_sz+=tuple_size;\n";
2486   ret += "\t#endif\n\n";
2487
2488
2489
2490         return ret;
2491 }
2492
2493 //      TODO Ensure that postfilter predicates are being generated
2494 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
2495
2496 int p,s,w;
2497 string ret;
2498
2499 //                      Get parameters
2500         unsigned int window_len = fs->temporal_range;
2501         unsigned int n_bloom = 11;
2502         string n_bloom_str = fs->get_val_of_def("num_bloom");
2503         int tmp_n_bloom = atoi(n_bloom_str.c_str());
2504         if(tmp_n_bloom>0)
2505                 n_bloom = tmp_n_bloom+1;
2506         float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2507         sprintf(tmpstr,"%f",bloom_width);
2508         string bloom_width_str = tmpstr;
2509
2510         if(window_len < n_bloom){
2511                 n_bloom = window_len+1;
2512                 bloom_width_str = "1";
2513         }
2514
2515
2516 //              Grab the current window time
2517         scalarexp_t winvar(fs->temporal_var);
2518         ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2519
2520         int bf_exp_size = 12;  // base-2 log of number of bits
2521         string bloom_len_str = fs->get_val_of_def("bloom_size");
2522         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2523         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2524                 bf_exp_size = tmp_bf_exp_size;
2525         }
2526         int bf_bit_size = 1 << bf_exp_size;
2527         int bf_byte_size = bf_bit_size / (8*sizeof(char));
2528
2529         unsigned int ht_size = 4096;
2530         string ht_size_s = fs->get_val_of_def("aggregate_slots");
2531         int tmp_ht_size = atoi(ht_size_s.c_str());
2532         if(tmp_ht_size > 1024){
2533                 unsigned int hs = 1;            // make it power of 2
2534                 while(tmp_ht_size){
2535                         hs =hs << 1;
2536                         tmp_ht_size = tmp_ht_size >> 1;
2537                 }
2538                 ht_size = hs;
2539         }
2540
2541         int i, bf_mask = 0;
2542         if(fs->use_bloom){
2543                 for(i=0;i<bf_exp_size;i++)
2544                         bf_mask = (bf_mask << 1) | 1;
2545         }else{
2546                 for(i=ht_size;i>1;i=i>>1)
2547                         bf_mask = (bf_mask << 1) | 1;
2548         }
2549
2550 /*
2551 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2552         n_bloom,
2553         window_len,
2554         bloom_width_str.c_str(),
2555         bf_exp_size,
2556         bf_bit_size,
2557         bf_byte_size,
2558         ht_size,
2559         ht_size_s.c_str(),
2560         bf_mask);
2561 */
2562
2563
2564
2565
2566 //              If this is a bloom-filter fj, first test if the
2567 //              bloom filter needs to be advanced.
2568 //              SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2569 //              t->bf_size : number of bits in bloom filter
2570 //              TODO: vectorize?
2571 //              TODO: Don't iterate more than n_bloom times!
2572 //                      As written, its possible to wrap around many times.
2573         if(fs->use_bloom){
2574                 ret +=
2575 "//                     Clean out old bloom filters if needed.\n"
2576 "//                     TODO vectorize this ? \n"
2577 "       if(t->first_exec){\n"
2578 "               t->first_exec = 0;\n"
2579 "               t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2580 "               t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2581 "       }else{\n"
2582 "               curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2583 "               if(curr_bin != t->last_bin){\n"
2584 "                       for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2585 "                               t->last_bloom_pos++;\n"
2586 "                               if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2587 "                                       t->last_bloom_pos = 0;\n"
2588 "                               tmp_i = t->last_bloom_pos;\n"
2589 "                               for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2590 "                                       SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2591 "                               }\n"
2592 "                       }\n"
2593 "               }\n"
2594 "               t->last_bin = curr_bin;\n"
2595 "       }\n"
2596 ;
2597         }
2598
2599
2600 //-----------------------------------------------------------------
2601 //              First, determine whether to do S (filter stream) processing.
2602
2603         ret +=
2604 "//             S (filtering stream) predicate, should it be processed?\n"
2605 "\n"
2606 ;
2607 // Sort S preds based on cost.
2608         vector<cnf_elem *> s_filt = fs->pred_t1;
2609         col_id_set::iterator csi;
2610   if(s_filt.size() > 0){
2611
2612 //                      Unpack fields ref'd in the S pred
2613         for(w=0;w<s_filt.size();++w){
2614                 col_id_set this_pred_cids;
2615                 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2616                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2617                         if(unpacked_cids.count( (*csi) ) == 0){
2618                                 int tblref = (*csi).tblvar_ref;
2619                                 int schref = (*csi).schema_ref;
2620                                 string field = (*csi).field;
2621                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2622                                 unpacked_cids.insert( (*csi) );
2623                         }
2624                 }
2625         }
2626
2627
2628 //              Sort by evaluation cost.
2629 //              First, estimate evaluation costs
2630 //              Eliminate predicates covered by the prefilter (those in s_pids).
2631 //              I need to do it before the sort becuase the indices refer
2632 //              to the position in the unsorted list.
2633         vector<cnf_elem *> tmp_wh;
2634         for(w=0;w<s_filt.size();++w){
2635                 compute_cnf_cost(s_filt[w],Ext_fcns);
2636                 tmp_wh.push_back(s_filt[w]);
2637         }
2638         s_filt = tmp_wh;
2639
2640         sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2641
2642 //              Now generate the predicates.
2643         for(w=0;w<s_filt.size();++w){
2644                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2645                 ret += tmpstr;
2646
2647 //                      Find partial fcns ref'd in this cnf element
2648                 set<int> pfcn_refs;
2649                 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2650 //                      Since set<..> is a "Sorted Associative Container",
2651 //                      we can walk through it in sorted order by walking from
2652 //                      begin() to end().  (and the partial fcns must be
2653 //                      evaluated in this order).
2654                 set<int>::iterator si;
2655                 string pf_preds;
2656                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2657                         if(fcn_ref_cnt[(*si)] > 1){
2658                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2659                         }
2660                         if(is_partial_fcn[(*si)]){
2661                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2662                                 ret += "\t\tif(retval) goto end_s;\n";
2663                         }
2664                         if(fcn_ref_cnt[(*si)] > 1){
2665                                 if(!is_partial_fcn[(*si)]){
2666                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2667 //              Testing for S is a side branch.
2668 //              I don't want a cacheable partial function to be
2669 //              marked as evaluated.  Therefore I mark the function
2670 //              as evalauted ONLY IF it is not partial.
2671                                         ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2672                                 }
2673                                 ret += "\t}\n";
2674                         }
2675                 }
2676
2677                 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2678                                 ") ) goto end_s;\n";
2679         }
2680   }else{
2681           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2682   }
2683
2684         for(p=0;p<fs->hash_eq.size();++p)
2685                 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2686
2687         if(fs->use_bloom){
2688 //                      First, generate the S scalar expressions in the hash_eq
2689
2690 //                      Iterate over the bloom filters
2691                 for(i=0;i<3;i++){
2692                         ret += "\t\tbucket=0;\n";
2693                         for(p=0;p<fs->hash_eq.size();++p){
2694                                 ret +=
2695 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2696         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2697         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2698                         }
2699 //              SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2700                                 ret +=
2701 "               bucket &= "+int_to_string(bf_mask)+";\n"
2702 "               SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2703 "\n"
2704 ;
2705                 }
2706         }else{
2707                 ret += "// Add the S record to the hash table, choose a position\n";
2708                 ret += "\t\tbucket=0;\n";
2709                 for(p=0;p<fs->hash_eq.size();++p){
2710                         ret +=
2711 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2712         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2713         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2714                 }
2715                 ret +=
2716 "               bucket &= "+int_to_string(bf_mask)+";\n"
2717 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2718 ;
2719 //                      Try the first bucket
2720                 ret += "\t\tif(";
2721                 for(p=0;p<fs->hash_eq.size();++p){
2722                         if(p>0) ret += " && ";
2723 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2724 //                                      " == s_equijoin_"+int_to_string(p);
2725                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2726                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2727                         string rhs_op = "s_equijoin_"+int_to_string(p);
2728                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2729                 }
2730                 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2731                 ret += "\t\t}else{\n\t\t\tif(";
2732                 for(p=0;p<fs->hash_eq.size();++p){
2733                         if(p>0) ret += " && ";
2734 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2735 //                                      " == s_equijoin_"+int_to_string(p);
2736                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2737                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2738                         string rhs_op = "s_equijoin_"+int_to_string(p);
2739                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2740                 }
2741                 ret +=  "){\n\t\t\t\tthe_bucket = bucket1;\n";
2742                 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2743                 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2744                 ret += "\t\t\t}\n\t\t}\n";
2745                 for(p=0;p<fs->hash_eq.size();++p){
2746                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2747                         if(hdt->is_buffer_type()){
2748                                 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2749                                 ret += tmpstr;
2750                         }else{
2751                                 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2752                                         " = s_equijoin_"+int_to_string(p)+";\n";
2753                         }
2754                 }
2755                 ret+="\t\tt->join_table[the_bucket].ts =  curr_fj_ts;\n";
2756         }
2757   ret += "\tend_s:\n";
2758
2759 //      ------------------------------------------------------------
2760 //              Next, determine if the R record should be processed.
2761
2762
2763         ret +=
2764 "//             R (main stream) cheap predicate\n"
2765 "\n"
2766 ;
2767
2768 //              Unpack r_filt fields
2769         vector<cnf_elem *> r_filt = fs->pred_t0;
2770         for(w=0;w<r_filt.size();++w){
2771                 col_id_set this_pred_cids;
2772                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2773                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2774                         if(unpacked_cids.count( (*csi) ) == 0){
2775                                 int tblref = (*csi).tblvar_ref;
2776                                 int schref = (*csi).schema_ref;
2777                                 string field = (*csi).field;
2778                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2779                                 unpacked_cids.insert( (*csi) );
2780                         }
2781                 }
2782         }
2783
2784 // Sort R preds based on cost.
2785
2786         vector<cnf_elem *> tmp_wh;
2787         for(w=0;w<r_filt.size();++w){
2788                 compute_cnf_cost(r_filt[w],Ext_fcns);
2789                 tmp_wh.push_back(r_filt[w]);
2790         }
2791         r_filt = tmp_wh;
2792
2793         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2794
2795 //              WARNING! the cost cutoff of 20 below (cheap vs. expensive R predicates) is an untuned guess.
2796         int cheap_rpos;
2797         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
2798
2799 //              Test the cheap filters on R.
2800   if(cheap_rpos >0){
2801
2802 //              Now generate the predicates.
2803         for(w=0;w<cheap_rpos;++w){
2804                 sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
2805                 ret += tmpstr;
2806
2807 //                      Find partial fcns ref'd in this cnf element
2808                 set<int> pfcn_refs;
2809                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2810 //                      Since set<..> is a "Sorted Associative Container",
2811 //                      we can walk through it in sorted order by walking from
2812 //                      begin() to end().  (and the partial fcns must be
2813 //                      evaluated in this order).
2814                 set<int>::iterator si;
2815                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2816                         if(fcn_ref_cnt[(*si)] > 1){
2817                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2818                         }
2819                         if(is_partial_fcn[(*si)]){
2820                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2821                                 ret += "\t\tif(retval) goto end;\n";
2822                         }
2823                         if(fcn_ref_cnt[(*si)] > 1){
2824                                 if(!is_partial_fcn[(*si)]){
2825                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2826                                 }
2827                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2828                                 ret += "\t}\n";
2829                         }
2830                 }
2831
2832                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2833                                 ") ) goto end;\n";
2834         }
2835   }else{
2836           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2837   }
2838
2839         ret += "\n//    Do the join\n\n";
2840         for(p=0;p<fs->hash_eq.size();++p)
2841                 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2842
2843
2844 //                      Passed the cheap pred, now test the join with S.
2845         if(fs->use_bloom){
2846                 for(i=0;i<3;i++){
2847                         ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2848                         for(p=0;p<fs->hash_eq.size();++p){
2849                                 ret +=
2850 "       bucket"+int_to_string(i)+
2851         " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2852         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2853         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2854                         }
2855                                 ret +=
2856 "       bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2857                 }
2858                 ret += "\tfound = 0;\n";
2859                 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2860                 ret +=
2861 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2862 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2863 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
2864 "\t\t\tfound=1;\n"
2865 "\t}\n"
2866 ;
2867                 ret +=
2868 "       if(!found)\n"
2869 "               goto end;\n"
2870 ;
2871         }else{
2872                 ret += "\tfound = 0;\n";
2873                 ret += "\t\tbucket=0;\n";
2874                 for(p=0;p<fs->hash_eq.size();++p){
2875                         ret +=
2876 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2877         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2878         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2879                 }
2880                 ret +=
2881 "               bucket &= "+int_to_string(bf_mask)+";\n"
2882 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2883 ;
2884 //                      Try the first bucket
2885                 ret += "\t\tif(";
2886                 for(p=0;p<fs->hash_eq.size();++p){
2887                         if(p>0) ret += " && ";
2888 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2889 //                                      " == r_equijoin_"+int_to_string(p);
2890                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2891                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2892                         string rhs_op = "s_equijoin_"+int_to_string(p);
2893                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2894                 }
2895                 if(p>0) ret += " && ";
2896                 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2897                 ret += "){\n\t\t\tfound = 1;\n";
2898                 ret += "\t\t}else {if(";
2899                 for(p=0;p<fs->hash_eq.size();++p){
2900                         if(p>0) ret += " && ";
2901 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2902 //                                      " == r_equijoin_"+int_to_string(p);
2903                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2904                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2905                         string rhs_op = "s_equijoin_"+int_to_string(p);
2906                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2907                 }
2908                 if(p>0) ret += " && ";
2909                 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2910                 ret +=  ")\n\t\t\tfound=1;\n";
2911                 ret+="\t\t}\n";
2912                 ret +=
2913 "       if(!found)\n"
2914 "               goto end;\n"
2915 ;
2916         }
2917
2918
2919 //              Test the expensive filters on R.
2920   if(cheap_rpos < r_filt.size()){
2921
2922 //              Now generate the predicates.
2923         for(w=cheap_rpos;w<r_filt.size();++w){
2924                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2925                 ret += tmpstr;
2926
2927 //                      Find partial fcns ref'd in this cnf element
2928                 set<int> pfcn_refs;
2929                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2930 //                      Since set<..> is a "Sorted Associative Container",
2931 //                      we can walk through it in sorted order by walking from
2932 //                      begin() to end().  (and the partial fcns must be
2933 //                      evaluated in this order).
2934                 set<int>::iterator si;
2935                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2936                         if(fcn_ref_cnt[(*si)] > 1){
2937                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2938                         }
2939                         if(is_partial_fcn[(*si)]){
2940                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2941                                 ret += "\t\tif(retval) goto end;\n";
2942                         }
2943                         if(fcn_ref_cnt[(*si)] > 1){
2944                                 if(!is_partial_fcn[(*si)]){
2945                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2946                                 }
2947                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2948                                 ret += "\t}\n";
2949                         }
2950                 }
2951
2952                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2953                                 ") ) goto end;\n";
2954         }
2955   }else{
2956           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2957   }
2958
2959
2960
2961 ///////////////                 post the tuple
2962
2963 //                      test passed : create the tuple, then assign to it.
2964           ret += "/*\t\tCreate and post the tuple\t*/\n";
2965
2966 //              Unpack the fields referenced by the sl_list expressions
2967         for(s=0;s<sl_list.size();++s){
2968                 col_id_set this_se_cids;
2969                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2970                 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2971                         if(unpacked_cids.count( (*csi) ) == 0){
2972                                 int tblref = (*csi).tblvar_ref;
2973                                 int schref = (*csi).schema_ref;
2974                                 string field = (*csi).field;
2975                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2976                                 unpacked_cids.insert( (*csi) );
2977                         }
2978                 }
2979         }
2980
2981
2982 //                      Unpack partial fcns ref'd by the select clause.
2983 //                      It's a kind of WHERE clause ...
2984   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2985         if(fcn_ref_cnt[p] > 1){
2986                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2987         }
2988         if(is_partial_fcn[p]){
2989                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2990                 ret += "\tif(retval) goto end;\n";
2991         }
2992         if(fcn_ref_cnt[p] > 1){
2993                 if(!is_partial_fcn[p]){
2994                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2995                 }
2996                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2997                 ret += "\t}\n";
2998         }
2999   }
3000
3001   // increment the counter of accepted tuples
3002   ret += "\n\t#ifdef LFTA_STATS\n";
3003   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3004   ret += "\t#endif\n\n";
3005
3006 //                      First, compute the size of the tuple.
3007
3008 //                      Unpack any BUFFER type selections into temporaries
3009 //                      so that I can compute their size and not have
3010 //                      to recompute their value during tuple packing.
3011 //                      I can use regular assignment here because
3012 //                      these temporaries are non-persistent.
3013
3014           for(s=0;s<sl_list.size();s++){
3015                 data_type *sdt = sl_list[s]->get_data_type();
3016                 if(sdt->is_buffer_type()){
3017                         sprintf(tmpstr,"\tselvar_%d = ",s);
3018                         ret += tmpstr;
3019                         ret += generate_se_code(sl_list[s],schema);
3020                         ret += ";\n";
3021                 }
3022           }
3023
3024
3025 //              The size of the tuple is the size of the tuple struct plus the
3026 //              size of the buffers to be copied in.
3027
3028           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3029           for(s=0;s<sl_list.size();s++){
3030                 data_type *sdt = sl_list[s]->get_data_type();
3031                 if(sdt->is_buffer_type()){
3032                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3033                         ret += tmpstr;
3034                 }
3035           }
3036           ret += ";\n";
3037
3038
3039           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3040           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3041
3042 //                      Test passed, make assignments to the tuple.
3043
3044           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3045
3046 //                      Mark tuple as REGULAR_TUPLE
3047           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3048
3049
3050           for(s=0;s<sl_list.size();s++){
3051                 data_type *sdt = sl_list[s]->get_data_type();
3052                 if(sdt->is_buffer_type()){
3053                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3054                         ret += tmpstr;
3055                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3056                         ret += tmpstr;
3057                 }else{
3058                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3059                         ret += tmpstr;
3060 //                      if(sdt->needs_hn_translation())
3061 //                              ret += sdt->hton_translation() +"( ";
3062                         ret += generate_se_code(sl_list[s],schema);
3063 //                      if(sdt->needs_hn_translation())
3064 //                              ret += ") ";
3065                         ret += ";\n";
3066                 }
3067           }
3068
3069 //              Generate output.
3070
3071           ret += "\tpost_tuple(tuple);\n";
3072
3073 //              Increment the counter of posted tuples
3074   ret += "\n\t#ifdef LFTA_STATS\n";
3075   ret += "\n\tt->out_tuple_cnt++;\n\n";
3076   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3077   ret += "\t#endif\n\n";
3078
3079
3080         return ret;
3081 }
3082
3083
3084 string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
3085
3086 int p,s,w;
3087 string ret;
3088
3089
3090         string wl_schema = fs->from[1]->get_schema_name();
3091         string wl_elem_str = generate_watchlist_element_name(wl_schema);
3092         string wl_node_str = generate_watchlist_struct_name(wl_schema);
3093         string tgt = generate_watchlist_name(wl_schema);
3094
3095         ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
3096
3097
3098
3099
3100
3101 //      ------------------------------------------------------------
3102 //              Determine if the R record should be processed.
3103
3104
3105         ret +=
3106 "//             R (main stream) cheap predicate\n"
3107 "\n"
3108 ;
3109
3110 //              Unpack r_filt fields
3111         vector<cnf_elem *> r_filt = fs->pred_t0;
3112         for(w=0;w<r_filt.size();++w){
3113                 col_id_set this_pred_cids;
3114                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
3115                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3116                         if(unpacked_cids.count( (*csi) ) == 0){
3117                                 int tblref = (*csi).tblvar_ref;
3118                                 int schref = (*csi).schema_ref;
3119                                 string field = (*csi).field;
3120                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3121                                 unpacked_cids.insert( (*csi) );
3122                         }
3123                 }
3124         }
3125
3126 // Sort R preds based on cost.
3127
3128         vector<cnf_elem *> tmp_wh;
3129         for(w=0;w<r_filt.size();++w){
3130                 compute_cnf_cost(r_filt[w],Ext_fcns);
3131                 tmp_wh.push_back(r_filt[w]);
3132         }
3133         r_filt = tmp_wh;
3134
3135         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
3136
3137 //              WARNING! the cost cutoff of 20 below (cheap vs. expensive R predicates) is an untuned guess.
3138         int cheap_rpos;
3139         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
3140
3141 //              Test the cheap filters on R.
3142   if(cheap_rpos >0){
3143
3144 //              Now generate the predicates.
3145         for(w=0;w<cheap_rpos;++w){
3146                 sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3147                 ret += tmpstr;
3148
3149 //                      Find partial fcns ref'd in this cnf element
3150                 set<int> pfcn_refs;
3151                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3152 //                      Since set<..> is a "Sorted Associative Container",
3153 //                      we can walk through it in sorted order by walking from
3154 //                      begin() to end().  (and the partial fcns must be
3155 //                      evaluated in this order).
3156                 set<int>::iterator si;
3157                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3158                         if(fcn_ref_cnt[(*si)] > 1){
3159                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3160                         }
3161                         if(is_partial_fcn[(*si)]){
3162                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3163                                 ret += "\t\tif(retval) goto end;\n";
3164                         }
3165                         if(fcn_ref_cnt[(*si)] > 1){
3166                                 if(!is_partial_fcn[(*si)]){
3167                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3168                                 }
3169                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3170                                 ret += "\t}\n";
3171                         }
3172                 }
3173
3174                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3175                                 ") ) goto end;\n";
3176         }
3177   }else{
3178           ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
3179   }
3180
3181         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3182         map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
3183         vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
3184         for(w=0;w<kflds.size();++w){
3185                 string kfld = kflds[w];
3186                 col_id_set this_pred_cids;
3187                 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
3188                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3189                         if(unpacked_cids.count( (*csi) ) == 0){
3190                                 int tblref = (*csi).tblvar_ref;
3191                                 int schref = (*csi).schema_ref;
3192                                 string field = (*csi).field;
3193                                 if(tblref==0) // LHS from packet, don't unpack the RHS
3194                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3195                                 unpacked_cids.insert( (*csi) );
3196                         }
3197                 }
3198         }
3199
3200
3201         ret += "\n//    Do the join\n\n";
3202         ret += "\n//            (ensure that the watchtable is fresh)\n";
3203         ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
3204         ret += "\t\treload_watchlist__"+wl_schema+"();\n";
3205         ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
3206         ret += "\t}\n\n";
3207
3208
3209         for(p=0;p<fs->key_flds.size();++p){
3210                 string kfld = fs->key_flds[p];
3211                 ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
3212         }
3213
3214
3215 //                      Passed the cheap pred, now test the join with S.
3216         ret += "\tbucket=0;\n";
3217         ret += "\thash=0;\n";
3218         for(p=0;p<fs->key_flds.size();++p){
3219                 string kfld = fs->key_flds[p];
3220                 ret +=
3221 "               hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
3222         fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
3223         +"_to_hash(r_equijoin_"+kfld+")));\n";
3224         }
3225         ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
3226
3227         ret += "\t\trec = "+tgt+".ht[bucket];\n";
3228         ret += "\t\twhile(rec!=NULL){\n";
3229         ret += "\t\t\tif(hash==rec->hashval){\n";
3230         ret += "\t\t\t\tif(";
3231         for(p=0;p<fs->key_flds.size();++p){
3232                 string kfld = fs->key_flds[p];
3233                 if(p>0) ret += " && ";
3234                 data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
3235                 string lhs_op = "r_equijoin_"+kfld;
3236                 string rhs_op = "rec->"+kfld;
3237                 ret += generate_equality_test(lhs_op,rhs_op,hdt);
3238         }
3239         ret += ")\n";
3240         ret += "\t\t\t\t\tbreak;\n";
3241         ret += "\t\t\t}\n";
3242         ret += "\t\t\trec=rec->next;\n";
3243         ret += "\t\t}\n";
3244         ret += "\t\tif(rec==NULL)\n";
3245         ret += "\t\t\tgoto end;\n";
3246                 
3247         ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
3248         for(w=0;w<where.size();++w){
3249                 col_id_set this_pred_cids;
3250                 gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
3251                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3252                         if(unpacked_cids.count( (*csi) ) == 0){
3253                                 int tblref = (*csi).tblvar_ref;
3254                                 int schref = (*csi).schema_ref;
3255                                 string field = (*csi).field;
3256                                 if(tblref==0) // LHS from packet
3257                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3258                                 else    // RHS from hash bucket
3259                                         ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3260                                 unpacked_cids.insert( (*csi) );
3261                         }
3262                 }
3263         }
3264
3265
3266 //              Test the expensive filters on R.
3267 //                      TODO Should merge this with other predicates and eval in order
3268 //                              of cost - see the fj code.
3269 //                      TODO join and postfilter predicates haven't been costed yet.
3270   if(cheap_rpos < r_filt.size()){
3271
3272 //              Now generate the predicates.
3273         for(w=cheap_rpos;w<r_filt.size();++w){
3274                 sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3275                 ret += tmpstr;
3276
3277 //                      Find partial fcns ref'd in this cnf element
3278                 set<int> pfcn_refs;
3279                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3280 //                      Since set<..> is a "Sorted Associative Container",
3281 //                      we can walk through it in sorted order by walking from
3282 //                      begin() to end().  (and the partial fcns must be
3283 //                      evaluated in this order).
3284                 set<int>::iterator si;
3285                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3286                         if(fcn_ref_cnt[(*si)] > 1){
3287                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3288                         }
3289                         if(is_partial_fcn[(*si)]){
3290                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3291                                 ret += "\t\tif(retval) goto end;\n";
3292                         }
3293                         if(fcn_ref_cnt[(*si)] > 1){
3294                                 if(!is_partial_fcn[(*si)]){
3295                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3296                                 }
3297                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3298                                 ret += "\t}\n";
3299                         }
3300                 }
3301
3302                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3303                                 ") ) goto end;\n";
3304         }
3305   }else{
3306           ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
3307   }
3308
3309 //              TODO sort the additional predicates by cost
3310
3311 //              S-only
3312         for(w=0;w<fs->pred_t1.size();++w){
3313                 sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
3314                 ret += tmpstr;
3315
3316 //                      Find partial fcns ref'd in this cnf element
3317                 set<int> pfcn_refs;
3318                 collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
3319 //                      Since set<..> is a "Sorted Associative Container",
3320 //                      we can walk through it in sorted order by walking from
3321 //                      begin() to end().  (and the partial fcns must be
3322 //                      evaluated in this order).
3323                 set<int>::iterator si;
3324                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3325                         if(fcn_ref_cnt[(*si)] > 1){
3326                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3327                         }
3328                         if(is_partial_fcn[(*si)]){
3329                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3330                                 ret += "\t\tif(retval) goto end;\n";
3331                         }
3332                         if(fcn_ref_cnt[(*si)] > 1){
3333                                 if(!is_partial_fcn[(*si)]){
3334                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3335                                 }
3336                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3337                                 ret += "\t}\n";
3338                         }
3339                 }
3340
3341                 ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
3342                                 ") ) goto end;\n";
3343         }
3344
3345 //              non hash-eq join 
3346         for(w=0;w<fs->join_filter.size();++w){
3347                 sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
3348                 ret += tmpstr;
3349
3350 //                      Find partial fcns ref'd in this cnf element
3351                 set<int> pfcn_refs;
3352                 collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
3353 //                      Since set<..> is a "Sorted Associative Container",
3354 //                      we can walk through it in sorted order by walking from
3355 //                      begin() to end().  (and the partial fcns must be
3356 //                      evaluated in this order).
3357                 set<int>::iterator si;
3358                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3359                         if(fcn_ref_cnt[(*si)] > 1){
3360                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3361                         }
3362                         if(is_partial_fcn[(*si)]){
3363                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3364                                 ret += "\t\tif(retval) goto end;\n";
3365                         }
3366                         if(fcn_ref_cnt[(*si)] > 1){
3367                                 if(!is_partial_fcn[(*si)]){
3368                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3369                                 }
3370                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3371                                 ret += "\t}\n";
3372                         }
3373                 }
3374
3375                 ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
3376                                 ") ) goto end;\n";
3377         }
3378
3379 //              postfilter
3380         for(w=0;w<fs->postfilter.size();++w){
3381                 sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
3382                 ret += tmpstr;
3383
3384 //                      Find partial fcns ref'd in this cnf element
3385                 set<int> pfcn_refs;
3386                 collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
3387 //                      Since set<..> is a "Sorted Associative Container",
3388 //                      we can walk through it in sorted order by walking from
3389 //                      begin() to end().  (and the partial fcns must be
3390 //                      evaluated in this order).
3391                 set<int>::iterator si;
3392                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3393                         if(fcn_ref_cnt[(*si)] > 1){
3394                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3395                         }
3396                         if(is_partial_fcn[(*si)]){
3397                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3398                                 ret += "\t\tif(retval) goto end;\n";
3399                         }
3400                         if(fcn_ref_cnt[(*si)] > 1){
3401                                 if(!is_partial_fcn[(*si)]){
3402                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3403                                 }
3404                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3405                                 ret += "\t}\n";
3406                         }
3407                 }
3408
3409                 ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
3410                                 ") ) goto end;\n";
3411         }
3412
3413
3414
3415 ///////////////                 post the tuple
3416
3417 //                      test passed : create the tuple, then assign to it.
3418           ret += "/*\t\tCreate and post the tuple\t*/\n";
3419
3420 //              Unpack R fields
3421         for(s=0;s<sl_list.size();++s){
3422                 col_id_set this_se_cids;
3423                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
3424                 for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
3425                         if(unpacked_cids.count( (*csi) ) == 0){
3426                                 int tblref = (*csi).tblvar_ref;
3427                                 int schref = (*csi).schema_ref;
3428                                 string field = (*csi).field;
3429                                 if(tblref==0) // LHS from packet
3430                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3431                                 else    // RHS from hash bucket
3432                                         ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3433                                 unpacked_cids.insert( (*csi) );
3434                         }
3435                 }
3436         }
3437
3438
3439 //                      Unpack partial fcns ref'd by the select clause.
3440 //                      Its a kind of a WHERE clause ...
3441   for(p=sl_fcns_start;p<sl_fcns_end;p++){
3442         if(fcn_ref_cnt[p] > 1){
3443                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3444         }
3445         if(is_partial_fcn[p]){
3446                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3447                 ret += "\tif(retval) goto end;\n";
3448         }
3449         if(fcn_ref_cnt[p] > 1){
3450                 if(!is_partial_fcn[p]){
3451                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3452                 }
3453                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3454                 ret += "\t}\n";
3455         }
3456   }
3457
3458   // increment the counter of accepted tuples
3459   ret += "\n\t#ifdef LFTA_STATS\n";
3460   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3461   ret += "\t#endif\n\n";
3462
3463 //                      First, compute the size of the tuple.
3464
3465 //                      Unpack any BUFFER type selections into temporaries
3466 //                      so that I can compute their size and not have
3467 //                      to recompute their value during tuple packing.
3468 //                      I can use regular assignment here because
3469 //                      these temporaries are non-persistent.
3470
3471           for(s=0;s<sl_list.size();s++){
3472                 data_type *sdt = sl_list[s]->get_data_type();
3473                 if(sdt->is_buffer_type()){
3474                         sprintf(tmpstr,"\tselvar_%d = ",s);
3475                         ret += tmpstr;
3476                         ret += generate_se_code(sl_list[s],schema);
3477                         ret += ";\n";
3478                 }
3479           }
3480
3481
3482 //              The size of the tuple is the size of the tuple struct plus the
3483 //              size of the buffers to be copied in.
3484
3485           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3486           for(s=0;s<sl_list.size();s++){
3487                 data_type *sdt = sl_list[s]->get_data_type();
3488                 if(sdt->is_buffer_type()){
3489                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3490                         ret += tmpstr;
3491                 }
3492           }
3493           ret += ";\n";
3494
3495
3496           ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3497           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3498
3499 //                      Test passed, make assignments to the tuple.
3500
3501           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3502
3503 //                      Mark tuple as REGULAR_TUPLE
3504           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3505
3506
3507           for(s=0;s<sl_list.size();s++){
3508                 data_type *sdt = sl_list[s]->get_data_type();
3509                 if(sdt->is_buffer_type()){
3510                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3511                         ret += tmpstr;
3512                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3513                         ret += tmpstr;
3514                 }else{
3515                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3516                         ret += tmpstr;
3517 //                      if(sdt->needs_hn_translation())
3518 //                              ret += sdt->hton_translation() +"( ";
3519                         ret += generate_se_code(sl_list[s],schema);
3520 //                      if(sdt->needs_hn_translation())
3521 //                              ret += ") ";
3522                         ret += ";\n";
3523                 }
3524           }
3525
3526 //              Generate output.
3527
3528           ret += "\tpost_tuple(tuple);\n";
3529
3530 //              Increment the counter of posted tuples
3531   ret += "\n\t#ifdef LFTA_STATS\n";
3532   ret += "\n\tt->out_tuple_cnt++;\n\n";
3533   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3534   ret += "\t#endif\n\n";
3535
3536
3537         return ret;
3538 }
3539
3540 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
3541         string ret;
3542         int a,p,g;
3543
3544 //////////////          Processing for aggregtion query
3545
3546 //              First, search for a match.  Start by unpacking the group-by attributes.
3547
3548 //                      One complication : if a real-time aggregate flush occurs,
3549 //                      the GB attr has already been calculated.  So don't compute
3550 //                      it again if 1) its temporal and 2) it will be computed in the
3551 //                      agggregate flush code.
3552
3553 //              Unpack the partial fcns ref'd by the gb's and the aggr defs.
3554   for(p=gb_fcns_start;p<gb_fcns_end;p++){
3555     if(is_partial_fcn[p]){
3556                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3557                 ret += "\tif(retval) goto end;\n";
3558         }
3559   }
3560   for(p=ag_fcns_start;p<ag_fcns_end;p++){
3561     if(is_partial_fcn[p]){
3562                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3563                 ret += "\tif(retval) goto end;\n";
3564         }
3565   }
3566
3567   // increment the counter of accepted tuples
3568   ret += "\n\t#ifdef LFTA_STATS\n";
3569   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3570   ret += "\t#endif\n\n";
3571
3572   ret += "/*\t\tTest if the group is in the hash table \t*/\n";
3573 //                      Compute the values of the group-by variables.
3574   for(g=0;g<gb_tbl->size();g++){
3575           data_type *gdt = gb_tbl->get_data_type(g);
3576           if((! gdt->is_temporal()) || temporal_flush == ""){
3577
3578                   if(gdt->is_buffer_type()){
3579         //                              NOTE : if the SE defining the gb is anything
3580         //                              other than a ref to a variable, this will generate
3581         //                              illegal code.  To be resolved with Spatch.
3582                         sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
3583                                 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
3584                         ret += tmpstr;
3585                         sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
3586                                 gdt->get_buffer_assign_copy().c_str(), g, g);
3587                   }else{
3588                         sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
3589                   }
3590                   ret += tmpstr;
3591           }
3592   }
3593   ret += "\n";
3594
3595 //                      A quick aside : if any of the GB attrs are temporal,
3596 //                      test for change and flush if any change occurred.
3597 //                      We've already computed the flush code,
3598 //                      Put it here if this is not a real time query.
3599 //                      We've already unpacked all column refs, so no need to
3600 //                      do it again here.
3601
3602         string rt_level = fs->get_val_of_def("real_time");
3603         if(rt_level == "" && temporal_flush != ""){
3604                 ret += temporal_flush;
3605         }
3606
3607 //                      Compute the hash bucket
3608         if(gb_tbl->size() > 0){
3609                 ret += "\thashval = ";\
3610                 for(g=0;g<gb_tbl->size();g++){
3611                   if(g>0) ret += " ^ ";
3612                   data_type *gdt = gb_tbl->get_data_type(g);
3613                   if(gdt->is_buffer_type()){
3614                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3615                                 gdt->get_type_str().c_str(), g);
3616                   }else{
3617                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3618                                 gdt->get_type_str().c_str(), g);
3619                   }
3620                   ret += tmpstr;
3621                 }
3622                 ret += ";\n";
3623                 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
3624         ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
3625         }else{
3626                 ret+="\tprobe = 0;\n";
3627                 ret+="\thash2 = 0;\n\n";
3628         }
3629
3630 //              Does the lfta reference a udaf?
3631           bool has_udaf = false;
3632           for(a=0;a<aggr_tbl->size();a++){
3633                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
3634           }
3635
3636 //              Scan for a match, or alternatively the best slot.
3637 //              Currently, hardcode 5 tests.
3638         ret +=
3639 "       gen_val = t->generation & SLOT_GEN_BITS;\n"
3640 "       match_found = 0;\n"
3641 "       best_slot = probe;\n"
3642 "       for(i=0;i<5 && match_found == 0;i++){\n"
3643 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
3644 ;
3645         if(gb_tbl->size()>0){
3646                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
3647                 ret+="\t\tif(";
3648                 string rhs_op, lhs_op;
3649                 for(g=0;g<gb_tbl->size();g++){
3650                   if(g>0) ret += " && ";
3651                   ret += "(";
3652                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
3653                   sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
3654                   ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
3655                   ret += ")";
3656                 }
3657          }
3658          ret += "){\n"
3659 "                       match_found = 1;\n"
3660 "                       best_slot = probe;\n"
3661 "               }\n"
3662 "       }\n"
3663 "//             Rate slots in case no match found: prefer empty, then full but old slots\n"
3664 "       if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3665 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3666 "                       best_slot = probe;\n"
3667 "               }else{\n"
3668 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3669 "                               best_slot = probe;\n"
3670 "                       }\n"
3671 "               }\n"
3672 "               probe++;\n"
3673 "               if(probe >= t->max_aggrs)\n"
3674 "                       probe=0;\n"
3675 "       }\n"
3676 "       if(match_found){\n"
3677 ;
3678         ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3679         ret +=
3680 "       }else{\n"
3681 "               if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3682 ;
3683 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3684         if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3685                 ret +=
3686 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3687 "                               if((";
3688                 bool first_g = true;
3689                 for(int g=0;g<gb_tbl->size();g++){
3690                         data_type *gdt = gb_tbl->get_data_type(g);
3691                         if(gdt->is_temporal()){
3692                                 if(first_g) first_g = false; else ret+=" + ";
3693                                 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3694                         }
3695                 }
3696                 ret += ") == 0 ){\n";
3697
3698                 ret +=
3699 "                                       fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3700 "                               }\n"
3701 "                       }\n"
3702 ;
3703         }
3704
3705         ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3706         ret +=
3707 "\t\t\t#ifdef LFTA_STATS\n"
3708 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3709 "\t\t\t\tt->collision_cnt++;\n\n"
3710 "\t\t\t#endif\n\n"
3711 "\t\t}\n"
3712 ;
3713         ret += generate_init_group(schema,"best_slot");
3714
3715
3716           ret += "\t}\n";
3717
3718         return ret;
3719 }
3720
3721
3722
3723 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
3724
3725         string ret="static gs_retval_t accept_packet_"+node_name+
3726                 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3727     ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3728
3729   int a;
3730
3731 //                      Define all of the variables needed by this
3732 //                      procedure.
3733
3734
3735 //                      Gather all column references, need to define unpacking variables.
3736   int w,s;
3737   col_id_set cid_set;
3738   col_id_set::iterator csi;
3739
3740 //              If its a filter join, rebind all colrefs
3741 //              to the first range var, to avoid double unpacking.
3742
3743   if(is_fj){
3744     for(w=0;w<where.size();++w)
3745                 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3746     for(s=0;s<sl_list.size();s++)
3747                 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3748   }
3749
3750   for(w=0;w<where.size();++w){
3751         if(is_wj || is_fj || s_pids.count(w) == 0)
3752                 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3753   }
3754   for(s=0;s<sl_list.size();s++){
3755         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3756   }
3757
3758   int g;
3759   if(gb_tbl != NULL){
3760         for(g=0;g<gb_tbl->size();g++)
3761           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3762   }
3763
3764   //                    Variables for unpacking attributes.
3765   ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3766   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3767     int schref = (*csi).schema_ref;
3768         int tblref = (*csi).tblvar_ref;
3769     string field = (*csi).field;
3770     data_type dt(schema->get_type_name(schref,field));
3771     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3772         field.c_str(), tblref);
3773     ret += tmpstr;
3774   }
3775
3776   ret += "\n\n";
3777
3778 //                      Variables that are always needed
3779   ret += "/*\t\tVariables which are always needed\t*/\n";
3780   ret += "\tgs_retval_t retval;\n";
3781   ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3782   ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3783
3784   ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3785
3786
3787 //                      Variables needed for aggregation queries.
3788   if(is_aggr_query){
3789           ret += "\n/*\t\tVariables for aggregation\t*/\n";
3790           ret+="\tunsigned int i, probe;\n";
3791           ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3792           ret+="\tgs_uint64_t hashval, hash2;\n";
3793 //                      Variables for storing group-by attribute values.
3794           if(gb_tbl->size() > 0)
3795                 ret += "/*\t\tGroup-by attributes\t*/\n";
3796           for(g=0;g<gb_tbl->size();g++){
3797                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3798                 ret += tmpstr;
3799                 data_type *gdt = gb_tbl->get_data_type(g);
3800                 if(gdt->is_buffer_type()){
3801                   sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3802                   ret += tmpstr;
3803                 }
3804           }
3805           ret += "\n";
3806 //                      Temporaries for min/max
3807           string aggr_tmp_str = "";
3808           for(a=0;a<aggr_tbl->size();a++){
3809                 string aggr_op = aggr_tbl->get_op(a);
3810                 if(aggr_op == "MIN" || aggr_op == "MAX"){
3811                         sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3812                         aggr_tmp_str.append(tmpstr);
3813                 }
3814           }
3815           if(aggr_tmp_str != ""){
3816                 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3817                 ret += aggr_tmp_str;
3818                 ret += "\n";
3819           }
3820 //              Variables for udaf output temporaries
3821         bool no_udaf = true;
3822         for(a=0;a<aggr_tbl->size();a++){
3823                 if(! aggr_tbl->is_builtin(a)){
3824                         if(no_udaf){
3825                                 ret+="/*\t\tUDAF output vars.\t*/\n";
3826                                 no_udaf = false;
3827                         }
3828                         int afcn_id = aggr_tbl->get_fcn_id(a);
3829                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3830                         sprintf(tmpstr,"udaf_ret%d", a);
3831                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3832                 }
3833         }
3834   }
3835
3836 //                      Variables needed for a filter join query
3837   if(fs->node_type() == "filter_join"){
3838         filter_join_qpn *fjq = (filter_join_qpn *)fs;
3839         bool uses_bloom = fjq->use_bloom;
3840         ret += "/*\t\tJoin fields\t*/\n";
3841         for(g=0;g<fjq->hash_eq.size();g++){
3842                 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3843                 ret += tmpstr;
3844                 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3845                 ret += tmpstr;
3846           }
3847         if(uses_bloom){
3848                 ret +=
3849 "  /*           Variables for fj bloom filter   */ \n"
3850 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3851 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3852 "\tlong long int curr_fj_ts;\n"
3853 "\tlong long int curr_bin, the_bin;\n"
3854 "\n"
3855 ;
3856         }else{
3857                 ret +=
3858 "  /*           Variables for fj join table     */ \n"
3859 "\tunsigned int i, bucket, found; \n"
3860 "\tunsigned int bucket1, the_bucket;\n"
3861 "       long long int curr_fj_ts;\n"
3862 "\n"
3863 ;
3864         }
3865   }
3866
3867
3868   if(fs->node_type() == "watch_join"){
3869         watch_join_qpn *wlq = (watch_join_qpn *)fs;
3870         ret += "/*\t\tJoin fields\t*/\n";
3871         for(int k=0;k<wlq->key_flds.size(); ++k){
3872                 string kfld = wlq->key_flds[k];
3873                 ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
3874                 ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
3875           }
3876         ret +=
3877 "  /*           Variables for wl join table     */ \n"
3878 "\tunsigned int i, bucket;\n"
3879 "\tunsigned long long int hash; \n";
3880         string wl_schema = wlq->from[1]->get_schema_name();
3881         string wl_elem_str = generate_watchlist_element_name(wl_schema);
3882         ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n";
3883 "\n"
3884 ;
3885   }
3886
3887
3888 //              Variables needed to store selected attributes of BUFFER type
3889 //              temporarily, in order to compute their size for storage
3890 //              in an output tuple.
3891
3892   string select_var_defs = "";
3893   for(int s=0;s<sl_list.size();s++){
3894         data_type *sdt = sl_list[s]->get_data_type();
3895         if(sdt->is_buffer_type()){
3896           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3897           select_var_defs.append(tmpstr);
3898         }
3899   }
3900   if(select_var_defs != ""){
3901         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3902     ret += select_var_defs;
3903   }
3904
3905 //              Variables to store results of partial functions.
3906   int p;
3907   if(partial_fcns.size()>0){
3908           ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3909           for(p=0;p<partial_fcns.size();++p){
3910                 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3911                   sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3912                     partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3913                   ret += tmpstr;
3914                   if(!is_aggr_query && fcn_ref_cnt[p] >1){
3915                         ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3916                   }
3917                 }
3918           }
3919
3920           if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3921           ret += "\n";
3922   }
3923
3924 //              variable to hold packet struct  //
3925         if(packed_return){
3926                 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3927         }
3928
3929
3930   ret += "\t#ifdef LFTA_STATS\n";
3931 // variable to store counter of cpu cycles spend in accept_tuple
3932         ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3933 // increment counter of received tuples
3934         ret += "\tt->in_tuple_cnt++;\n";
3935   ret += "\t#endif\n";
3936
3937
3938 //      -------------------------------------------------
3939 //              If the packet is "packet", test if its for this lfta,
3940 //              and if so load it into its struct
3941
3942         if(packed_return){
3943                 ret+="\n/*  packed tuple : test and load. \t*/\n";
3944                 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3945                 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3946                 ret+="\t\tgoto end;\n\n";
3947         }
3948
3949
3950
3951   col_id_set unpacked_cids;     //      Keep track of the cols that have been unpacked.
3952
3953   string temporal_flush;
3954   if(is_aggr_query)
3955         ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3956   else {        // non-aggregation operators
3957
3958 // Unpack all the temporal attributes referenced in select clause
3959 // and update the last value of the attribute
3960         col_id_set temp_cids;           //      col ids of temp attributes in select clause
3961
3962         for(s=0;s<sl_list.size();s++){
3963                 data_type *sdt = sl_list[s]->get_data_type();
3964                 if (sdt->is_temporal()) {
3965                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3966                 }
3967         }
3968 //                      If this is a filter join,
3969 //                      ensure that the temporal range field is unpacked.
3970         if(is_fj){
3971                 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3972                 if(temp_cids.count(window_var_cid)==0)
3973                         temp_cids.insert(window_var_cid);
3974         }
3975
3976         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3977                 if(unpacked_cids.count((*csi)) == 0){
3978                         int tblref = (*csi).tblvar_ref;
3979                         int schref = (*csi).schema_ref;
3980                         string field = (*csi).field;
3981                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3982                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3983                         ret += tmpstr;
3984
3985                         unpacked_cids.insert( (*csi) );
3986                 }
3987         }
3988
3989   }
3990
3991   vector<cnf_elem *> filter = fs->get_filter_clause();
3992 //              Test the filter predicate (some query types have additional preds).
3993   if(filter.size() > 0 && !is_wj){      // watchlist join does specialized processing
3994
3995 //              Sort by evaluation cost.
3996 //              First, estimate evaluation costs
3997 //              Eliminate predicates covered by the prefilter (those in s_pids).
3998 //              I need to do it before the sort becuase the indices refer
3999 //              to the position in the unsorted list./
4000         vector<cnf_elem *> tmp_wh;
4001         for(w=0;w<filter.size();++w){
4002                 if(s_pids.count(w) == 0){
4003                         compute_cnf_cost(filter[w],Ext_fcns);
4004                         tmp_wh.push_back(filter[w]);
4005                 }
4006         }
4007         filter = tmp_wh;
4008
4009         sort(filter.begin(), filter.end(), compare_cnf_cost());
4010
4011 //              Now generate the predicates.
4012         for(w=0;w<filter.size();++w){
4013                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
4014                 ret += tmpstr;
4015 //                      Find the set of variables accessed in this CNF elem,
4016 //                      but in no previous element.
4017                 col_id_set this_pred_cids;
4018                 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
4019                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4020                         if(unpacked_cids.count( (*csi) ) == 0){
4021                         int tblref = (*csi).tblvar_ref;
4022                         int schref = (*csi).schema_ref;
4023                         string field = (*csi).field;
4024                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4025                                 unpacked_cids.insert( (*csi) );
4026                         }
4027                 }
4028 //                      Find partial fcns ref'd in this cnf element
4029                 set<int> pfcn_refs;
4030                 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
4031 //                      Since set<..> is a "Sorted Associative Container",
4032 //                      we can walk through it in sorted order by walking from
4033 //                      begin() to end().  (and the partial fcns must be
4034 //                      evaluated in this order).
4035                 set<int>::iterator si;
4036                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
4037                         if(fcn_ref_cnt[(*si)] > 1){
4038                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
4039                         }
4040                         if(is_partial_fcn[(*si)]){
4041                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
4042                                 ret += "\t\tif(retval) goto end;\n";
4043                         }
4044                         if(fcn_ref_cnt[(*si)] > 1){
4045                                 if(!is_partial_fcn[(*si)]){
4046                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
4047                                 }
4048                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
4049                                 ret += "\t}\n";
4050                         }
4051                 }
4052
4053                 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
4054                                 ") ) goto end;\n";
4055         }
4056   }else{
4057           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
4058   }
4059
4060
4061 //                      We've passed the WHERE clause,
4062 //                      unpack the remainder of the accessed fields.
4063   if(is_fj){
4064         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
4065         vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
4066         for(w=0;w<h_eq.size();++w){
4067                 col_id_set this_pred_cids;
4068                 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
4069                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4070                         if(unpacked_cids.count( (*csi) ) == 0){
4071                                 int tblref = (*csi).tblvar_ref;
4072                                 int schref = (*csi).schema_ref;
4073                                 string field = (*csi).field;
4074                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4075                                 unpacked_cids.insert( (*csi) );
4076                         }
4077                 }
4078         }
4079   }else if(is_wj){              // STOPPED HERE move this to wj main body
4080 /*
4081         ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
4082         map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
4083         vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
4084         for(w=0;w<kflds.size();++w){
4085                 string kfld = kflds[w];
4086                 col_id_set this_pred_cids;
4087                 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
4088                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4089                         if(unpacked_cids.count( (*csi) ) == 0){
4090                                 int tblref = (*csi).tblvar_ref;
4091                                 int schref = (*csi).schema_ref;
4092                                 string field = (*csi).field;
4093                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4094                                 unpacked_cids.insert( (*csi) );
4095                         }
4096                 }
4097         }
4098 */
4099   }else{
4100           ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
4101
4102           for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4103                 if(unpacked_cids.count( (*csi) ) == 0){
4104                         int schref = (*csi).schema_ref;
4105                         int tblref = (*csi).tblvar_ref;
4106                         string field = (*csi).field;
4107                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4108                         unpacked_cids.insert( (*csi) );
4109                 }
4110           }
4111   }
4112
4113
4114 //////////////////
4115 //////////////////      After this, the query types
4116 //////////////////      are processed differently.
4117
4118   if(!is_aggr_query && !is_fj & !is_wj)
4119         ret += generate_sel_accept_body(fs, node_name, schema);
4120   else if(is_aggr_query)
4121         ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
4122   else{
4123         if(is_fj)
4124                 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4125         else
4126                 ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4127   }
4128         
4129
4130
4131 //              Finish up.
4132
4133    ret += "\n\tend:\n";
4134   ret += "\t#ifdef LFTA_STATS\n";
4135         ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
4136   ret += "\t#endif\n";
4137    ret += "\n\treturn 1;\n}\n\n";
4138
4139         return(ret);
4140 }
4141
4142
4143 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
4144         int g, cl;
4145
4146         string ret = "struct FTA * "+generate_alloc_name(node_name) +
4147            "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command,  gs_int32_t sz, void *value){\n";
4148
4149         ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
4150         ret+="\tint i;\n";
4151         ret += "\n";
4152         ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
4153
4154 //                              assign a streamid to fta instance
4155         ret+="\t/* assign a streamid */\n";
4156         ret+="\tf->f.ftaid = ftaid;\n";
4157         ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
4158         ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
4159
4160         if(is_aggr_query){
4161                 ret += "\tf->n_aggrs = 0;\n";
4162
4163                 ret += "\tf->max_aggrs = ";
4164
4165 //                              Computing the number of aggregate blocks is a little
4166 //                              tricky.  If there are no GB attrs, or if all GB attrs
4167 //                              are temporal, then use a single aggregate block, else
4168 //                              use a default value (10).  A user specification overrides
4169 //                              this logic.
4170                 bool single_group = true;
4171                 for(g=0;g<gb_tbl->size();g++){
4172                         data_type *gdt = gb_tbl->get_data_type(g);
4173                         if(! gdt->is_temporal() ){
4174                                 single_group = false;
4175                         }
4176                 }
4177                 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
4178                 int max_aggr_i = atoi(max_aggr_str.c_str());
4179                 if(max_aggr_i <= 0){
4180                         if(single_group)
4181                                 ret += "2";
4182                         else
4183                                 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
4184                 }else{
4185                         unsigned int naggrs = 1;                // make it power of 2
4186                         unsigned int nones = 0;
4187                         while(max_aggr_i){
4188                                 if(max_aggr_i&1)
4189                                         nones++;
4190                                 naggrs = naggrs << 1;
4191                                 max_aggr_i = max_aggr_i >> 1;
4192                         }
4193                         if(nones==1)            // in case it was already a power of 2.
4194                                 naggrs/=2;
4195                         ret += int_to_string(naggrs);
4196                 }
4197                 ret += ";\n";
4198
4199                 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
4200                 ret+="\t\treturn(0);\n";
4201                 ret+="\t}\n\n";
4202 //              ret+="/* compute how many integers we need to store the hashmap */\n";
4203 //              ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
4204                 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
4205                 ret+="\t\treturn(0);\n";
4206                 ret+="\t}\n";
4207                 ret+="/*\t\tfill bitmap with zero \t*/\n";
4208                 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
4209                 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
4210                 ret+="\tf->generation=0;\n";
4211                 ret+="\tf->flush_pos = f->max_aggrs;\n";
4212
4213                 ret += "\tf->flush_ctr = 0;\n";
4214
4215         }
4216
4217         if(is_fj){
4218                 if(uses_bloom){
4219                         ret+="\tf->first_exec = 1;\n";
4220                         unsigned int n_bloom = 11;
4221                         string n_bloom_str = fs->get_val_of_def("num_bloom");
4222                         int tmp_n_bloom = atoi(n_bloom_str.c_str());
4223                         if(tmp_n_bloom>0)
4224                                 n_bloom = tmp_n_bloom+1;
4225
4226                         unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
4227                         if(window_len < n_bloom){
4228                                 n_bloom = window_len+1;
4229                         }
4230
4231                         int bf_exp_size = 12;  // base-2 log of number of bits
4232                         string bloom_len_str = fs->get_val_of_def("bloom_size");
4233                         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
4234                         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
4235                                 bf_exp_size = tmp_bf_exp_size;
4236                         }
4237                         int bf_bit_size = 1 << 12;
4238                         int bf_byte_size = bf_bit_size / (8*sizeof(char));
4239
4240                         int bf_tot = n_bloom*bf_byte_size;
4241                         ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
4242                         ret+="\t\treturn(0);\n";
4243                         ret+="\t}\n";
4244                         ret +=
4245 "       for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
4246 "               f->bf_table[i] = 0;\n"
4247 ;
4248                 }else{
4249                         unsigned int ht_size = 4096;
4250                         string ht_size_s = fs->get_val_of_def("aggregate_slots");
4251                         int tmp_ht_size = atoi(ht_size_s.c_str());
4252                         if(tmp_ht_size > 1024){
4253                                 unsigned int hs = 1;            // make it power of 2
4254                                 while(tmp_ht_size){
4255                                         hs =hs << 1;
4256                                         tmp_ht_size = tmp_ht_size >> 1;
4257                                 }
4258                                 ht_size = hs;
4259                         }
4260                         ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
4261                         ret+="\t\treturn(0);\n";
4262                         ret+="\t}\n\n";
4263                         ret +=
4264 "       for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
4265 "               f->join_table[i].ts = 0;\n"
4266 ;
4267                 }
4268         }
4269
4270 //                      Initialize the complex literals (which might be handles).
4271
4272         for(cl=0;cl<complex_literals->size();cl++){
4273                 literal_t *l = complex_literals->get_literal(cl);
4274 //              sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
4275 //              ret += tmpstr + l->to_C_code() + ";\n";
4276                 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
4277                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4278         }
4279
4280         ret += "\n";
4281
4282 //                      Initialize the last seen values of temporal attributes to min(max) value of
4283 //                      their respective type
4284 //                      Create places to hold the last values of temporal attributes referenced in select clause
4285
4286
4287         col_id_set temp_cids;           //      col ids of temp attributes in select clause
4288
4289         int s;
4290         col_id_set::iterator csi;
4291
4292         for(s=0;s<sl_list.size();s++){
4293                 data_type *sdt = sl_list[s]->get_data_type();
4294                 if (sdt->is_temporal()) {
4295                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
4296                 }
4297         }
4298
4299         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
4300                 int tblref = (*csi).tblvar_ref;
4301                 int schref = (*csi).schema_ref;
4302                 string field = (*csi).field;
4303                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
4304                 if (dt.is_increasing()) {
4305                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
4306                         ret += tmpstr;
4307                 } else if (dt.is_decreasing()) {
4308                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
4309                         ret += tmpstr;
4310                 }
4311         }
4312
4313 //      initialize last seen values of temporal groubpy variables
4314         if(is_aggr_query){
4315                 for(g=0;g<gb_tbl->size();g++){
4316                         data_type *dt = gb_tbl->get_data_type(g);
4317                         if(dt->is_temporal()){
4318 /*
4319                                 fprintf(stderr,"group by attribute %s is temporal, ",
4320                                                 gb_tbl->get_name(g).c_str());
4321 */
4322                                 if(dt->is_increasing()){
4323                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
4324                                 }else{
4325                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
4326                                 }
4327                                 ret += tmpstr;
4328                         }
4329                 }
4330         }
4331
4332         ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
4333         ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
4334         ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
4335         ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
4336         ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
4337
4338 //                      Initialize runtime stats
4339         ret+="\tf->in_tuple_cnt = 0;\n";
4340         ret+="\tf->out_tuple_cnt = 0;\n";
4341         ret+="\tf->out_tuple_sz = 0;\n";
4342         ret+="\tf->accepted_tuple_cnt = 0;\n";
4343         ret+="\tf->cycle_cnt = 0;\n";
4344         ret+="\tf->collision_cnt = 0;\n";
4345         ret+="\tf->eviction_cnt = 0;\n";
4346         ret+="\tf->sampling_rate = 1.0;\n";
4347
4348         ret+="\tf->trace_id = 0;\n\n";
4349     if(param_tbl->size() > 0){
4350         ret+=
4351 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
4352 "#ifndef LFTA_IN_NIC\n"
4353 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
4354 "#else\n"
4355 "\t\t}\n"
4356 "#endif\n"
4357 "\t\t\treturn 0;\n"
4358 "\t\t}\n";
4359         }
4360
4361 //                      Register the pass-by-handle parameters
4362     int ph;
4363     for(ph=0;ph<param_handle_table.size();++ph){
4364                 data_type pdt(param_handle_table[ph]->type_name);
4365                 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
4366                 switch(param_handle_table[ph]->val_type){
4367                 case cplx_lit_e:
4368                         ret += tmpstr;
4369                         if(pdt.is_buffer_type()) ret += "&(";
4370                         sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
4371                         ret += tmpstr ;
4372                         if(pdt.is_buffer_type()) ret += ")";
4373                         ret +=  ");\n";
4374                         break;
4375                 case litval_e:
4376 //                              not complex, no constructor
4377                         ret += tmpstr;
4378                         ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
4379                         break;
4380                 case param_e:
4381 //                              query parameter handles are regstered/deregistered in the
4382 //                              load_params function.
4383 //                      ret += "t->param_"+param_handle_table[ph]->param_name;
4384                         break;
4385                 default:
4386                         fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
4387                         exit(1);
4388                 }
4389         }
4390
4391         ret += "\treturn (struct FTA *) f;\n";
4392         ret += "}\n\n";
4393
4394         return(ret);
4395 }
4396
4397
4398
4399
4400 //////////////////////////////////////////////////////////////////
4401
4402 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
4403 //              map<string,string> &int_fcn_defs,
4404                 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
4405         bool is_aggr_query;
4406         int s,p,g;
4407         string retval;
4408
4409 /////////////////////////////////////////////////////////////
4410 ///             Do operator-generic processing, such as
4411 ///             gathering the set of referenced columns,
4412 ///             generating structures, etc.
4413
4414 //              Initialize globals to empty.
4415         gb_tbl = NULL; aggr_tbl = NULL;
4416         global_id = -1; nicprop = NULL;
4417         param_tbl = fs->get_param_tbl();
4418         sl_list.clear(); where.clear();
4419         partial_fcns.clear();
4420         fcn_ref_cnt.clear(); is_partial_fcn.clear();
4421         pred_class.clear(); pred_pos.clear();
4422         sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
4423         gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
4424
4425
4426 //              Does the lfta read packed results from the NIC?
4427         nicprop = nicp;                 // load into global
4428         global_id = gid;
4429     packed_return = false;
4430         if(nicp && nicp->option_exists("Return")){
4431                 if(nicp->option_value("Return") == "Packed"){
4432                         packed_return = true;
4433                 }else{
4434                         fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
4435                 }
4436         }
4437
4438
4439 //                      Extract data which defines the query.
4440 //                              complex literals gathered now.
4441         complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
4442         param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
4443         string node_name = fs->get_node_name();
4444     bool is_fj = false, uses_bloom = false;
4445         bool is_wj = false;
4446         bool is_watch_tbl = false;
4447
4448
4449         if(fs->node_type() == "spx_qpn"){
4450                 is_aggr_query = false;
4451                 spx_qpn *spx_node = (spx_qpn *)fs;
4452                 sl_list = spx_node->get_select_se_list();
4453                 where = spx_node->get_where_clause();
4454                 gb_tbl = NULL;
4455                 aggr_tbl = NULL;
4456         } else
4457         if(fs->node_type() == "sgah_qpn"){
4458                 is_aggr_query = true;
4459                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4460                 sl_list = sgah_node->get_select_se_list();
4461                 where = sgah_node->get_where_clause();
4462                 gb_tbl = sgah_node->get_gb_tbl();
4463                 aggr_tbl = sgah_node->get_aggr_tbl();
4464
4465                 if((sgah_node->get_having_clause()).size() > 0){
4466                         fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
4467                 }
4468         } else
4469         if(fs->node_type() == "filter_join"){
4470                 is_aggr_query = false;
4471         is_fj = true;
4472                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4473                 sl_list = fj_node->get_select_se_list();
4474                 where = fj_node->get_where_clause();
4475                 uses_bloom = fj_node->use_bloom;
4476                 gb_tbl = NULL;
4477                 aggr_tbl = NULL;
4478         }else
4479         if(fs->node_type() == "watch_join"){
4480                 is_aggr_query = false;
4481         is_wj = true;
4482                 watch_join_qpn *wl_node = (watch_join_qpn *)fs;
4483                 sl_list = wl_node->get_select_se_list();
4484                 where = wl_node->get_where_clause();
4485                 gb_tbl = NULL;
4486                 aggr_tbl = NULL;
4487         }else
4488         if(fs->node_type() == "watch_tbl_qpn"){
4489                 is_aggr_query = false;
4490         is_watch_tbl = true;
4491                 vector<scalarexp_t *> empty_sl_list;
4492                 vector<cnf_elem *> empty_where;
4493                 sl_list = empty_sl_list;
4494                 where = empty_where;
4495                 gb_tbl = NULL;
4496                 aggr_tbl = NULL;
4497         } else {
4498                 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
4499                 exit(1);
4500         }
4501
4502 //                      Build list of "partial functions", by clause.
4503 //                      NOTE : partial fcns are not handles well.
4504 //                      The act of searching for them associates the fcn call
4505 //                      in the SE with an index to an array.  Refs to the
4506 //                      fcn value are replaced with refs to the variable they are
4507 //                      unpacked into.  A more general tagging mechanism would be better.
4508
4509         int i;
4510         vector<bool> *pfunc_ptr = NULL;
4511         vector<int> *ref_cnt_ptr = NULL;
4512         if(!is_aggr_query){             // don't collect cacheable fcns on aggr query.
4513                 ref_cnt_ptr = &fcn_ref_cnt;
4514                 pfunc_ptr = &is_partial_fcn;
4515         }
4516
4517         sl_fcns_start = 0;
4518         for(i=0;i<sl_list.size();i++){
4519                 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4520         }
4521         wh_fcns_start = sl_fcns_end = partial_fcns.size();
4522         for(i=0;i<where.size();i++){
4523                 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4524         }
4525         gb_fcns_start = wh_fcns_end = partial_fcns.size();
4526         if(gb_tbl != NULL){
4527                 for(i=0;i<gb_tbl->size();i++){
4528                         find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
4529                 }
4530         }
4531         ag_fcns_start = gb_fcns_end = partial_fcns.size();
4532         if(aggr_tbl != NULL){
4533                 for(i=0;i<aggr_tbl->size();i++){
4534                         find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
4535                 }
4536         }
4537         ag_fcns_end = partial_fcns.size();
4538
4539 //              Fill up the is_partial_fcn and fcn_ref_cnt arrays.
4540         if(is_aggr_query){
4541                 for(i=0; i<partial_fcns.size();i++){
4542                         fcn_ref_cnt.push_back(1);
4543                         is_partial_fcn.push_back(true);
4544                 }
4545         }
4546
4547 //              Unmark non-partial expensive functions referenced only once.
4548         for(i=0; i<partial_fcns.size();i++){
4549                 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
4550                         partial_fcns[i]->set_partial_ref(-1);
4551                 }
4552         }
4553
4554         node_name = normalize_name(node_name);
4555
4556         retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
4557
4558         if(packed_return){              // generate unpack struct
4559                 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
4560                 int schref = input_tbls[0]->get_schema_ref();
4561                 vector<string> refd_cols;
4562                 for(s=0;s<sl_list.size();++s){
4563                         gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
4564                 }
4565                 for(p=0;p<where.size();++p){
4566 //                              I'm not disabling these preds ...
4567                         gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
4568                 }
4569                 if(gb_tbl){
4570                         for(g=0;g<gb_tbl->size();++g){
4571                           gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
4572                         }
4573                 }
4574                 sort(refd_cols.begin(), refd_cols.end());
4575                 retval += "struct "+node_name+"_input_struct{\n";
4576                 retval += "\tint __lfta_id_fm_nic__;\n";
4577                 int vsi;
4578                 for(vsi=0;vsi<refd_cols.size();++vsi){
4579                 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
4580                         retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
4581                 }
4582                 retval+="};\n\n";
4583         }
4584
4585
4586 /////////////////////////////////////////////////////
4587 //                      Common stuff unpacked, do some generation
4588
4589
4590         if(is_aggr_query)
4591           retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
4592         if(is_fj)
4593                 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
4594         if(is_watch_tbl){
4595                 retval += "\n\n// watchtable code here \n\n";
4596                 watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
4597                 retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
4598                 retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
4599         }
4600         
4601         if(! is_watch_tbl){
4602                 retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
4603                 retval += generate_tuple_struct(node_name, sl_list) ;
4604
4605                 if(is_aggr_query)
4606                         retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
4607                 if(param_tbl->size() > 0)
4608                         retval += generate_fta_load_params(node_name) ;
4609                 retval += generate_fta_free(node_name, is_aggr_query) ;
4610                 retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;
4611                 retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;
4612
4613         /* extract the value of Time_Correlation from interface definition */
4614                 int e,v;
4615                 string es;
4616                 unsigned time_corr;
4617                 vector<tablevar_t *> tvec =  fs->get_input_tbls();
4618                 vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
4619                 if (time_corr_vec.empty())
4620                         time_corr = DEFAULT_TIME_CORR;
4621                 else
4622                         time_corr = atoi(time_corr_vec[0].c_str());
4623
4624                 retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
4625                 retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
4626         }
4627
4628   return(retval);
4629 }
4630
4631
4632
4633 int compute_snap_len(qp_node *fs, table_list *schema){
4634
4635 //              Initialize global vars
4636         gb_tbl = NULL;
4637         sl_list.clear(); where.clear();
4638
4639         
4640         if(fs->node_type() == "watch_tbl_qpn"){
4641                 return -1;
4642         }
4643
4644         if(fs->node_type() == "spx_qpn"){
4645                 spx_qpn *spx_node = (spx_qpn *)fs;
4646                 sl_list = spx_node->get_select_se_list();
4647                 where = spx_node->get_where_clause();
4648         }
4649         else if(fs->node_type() == "sgah_qpn"){
4650                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4651                 sl_list = sgah_node->get_select_se_list();
4652                 where = sgah_node->get_where_clause();
4653                 gb_tbl = sgah_node->get_gb_tbl();
4654         }
4655         else if(fs->node_type() == "filter_join"){
4656                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4657                 sl_list = fj_node->get_select_se_list();
4658                 where = fj_node->get_where_clause();
4659         }
4660         else if(fs->node_type() == "watch_join"){
4661                 watch_join_qpn *fj_node = (watch_join_qpn *)fs;
4662                 sl_list = fj_node->get_select_se_list();
4663                 where = fj_node->get_where_clause();
4664         } else{
4665                 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
4666                 exit(1);
4667         }
4668
4669 //                      Gather all column references, need to define unpacking variables.
4670   int w,s;
4671   col_id_set cid_set;
4672   col_id_set::iterator csi;
4673
4674   for(w=0;w<where.size();++w)
4675         gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
4676   for(s=0;s<sl_list.size();s++){
4677         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
4678   }
4679
4680   int g;
4681   if(gb_tbl != NULL){
4682         for(g=0;g<gb_tbl->size();g++)
4683           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
4684   }
4685
4686   //                    compute snap length
4687   int snap_len = -1;
4688   int n_snap=0;
4689   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4690     int schref = (*csi).schema_ref;
4691         int tblref = (*csi).tblvar_ref;
4692     string field = (*csi).field;
4693
4694         param_list *field_params = schema->get_modifier_list(schref, field);
4695         if(field_params->contains_key("snap_len")){
4696                 string fld_snap_str = field_params->val_of("snap_len");
4697                 int fld_snap;
4698                 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
4699                         if(fld_snap > snap_len) snap_len = fld_snap;
4700                         n_snap++;
4701                 }else{
4702                         fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
4703                 }
4704         }
4705   }
4706
4707   if(n_snap == cid_set.size()){
4708         return (snap_len);
4709   }else{
4710         return -1;
4711   }
4712
4713
4714 }
4715
4716 //              Function which computes an optimal
4717 //              set of unpacking functions.
4718
4719 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
4720         map<string, int> pfcn_count;
4721         map<string, int>::iterator msii;
4722         col_id_set::iterator cisi;
4723         set<string>::iterator ssi;
4724         string best_fcn;
4725
4726         while(ucol_fcn_map.size() < upref_cids.size()){
4727
4728 //                      Gather unpack functions referenced by unaccounted-for
4729 //                      columns, and increment their reference count.
4730                 pfcn_count.clear();
4731                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4732                         if(ucol_fcn_map.count((*cisi)) == 0){
4733                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4734                                 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
4735                                         pfcn_count[(*ssi)]++;
4736                         }
4737                 }
4738
4739 //              Get the lowest cost per field function.
4740                 float min_cost = 0.0;
4741                 string best_fcn = "";
4742                 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
4743                         int fcost = Schema->get_ufcn_cost((*msii).first);
4744                         if(fcost < 0){
4745                                 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
4746                                 exit(1);
4747                         }
4748                         float this_cost = (1.0*fcost)/(*msii).second;
4749                         if(msii == pfcn_count.begin() || this_cost < min_cost){
4750                                 min_cost = this_cost;
4751                                 best_fcn = (*msii).first;
4752                         }
4753                 }
4754                 if(best_fcn == ""){
4755                         fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4756                         exit(1);
4757                 }
4758
4759 //              Assign this function to the unassigned fcns which use it.
4760                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4761                         if(ucol_fcn_map.count((*cisi)) == 0){
4762                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4763                                 if(ufcns.count(best_fcn)>0)
4764                                         ucol_fcn_map[(*cisi)] = best_fcn;
4765                         }
4766                 }
4767         }
4768 }
4769
4770
4771
4772 //              Generate an initial test test for the lfta
4773 //              Assume that the predicate references no external functions,
4774 //              and especially no partial functions,
4775 //              aggregates, internal functions.
4776 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4777                 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4778                 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4779                 vector<int> &lfta_snap_lens, string iface){
4780   col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4781   col_id_set::iterator csi;
4782         int o,p,q;
4783         string ret;
4784
4785 //              Gather complex literals in the prefilter.
4786         cplx_lit_table *complex_literals = new cplx_lit_table();
4787         for(p=0;p<pred_list.size();++p){
4788                 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4789         }
4790
4791
4792 //              Find the combinable predicates
4793         vector<predicate_t *> pr_list;
4794         for(p=0;p<pred_list.size();++p){
4795         find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4796         }
4797
4798 //              Analyze the combinable predicates to find the predicate classes.
4799         pred_class.clear();             // idx to equiv pred in equiv_list
4800         pred_pos.clear();               // idx to returned bitmask.
4801         vector<predicate_t *> equiv_list;
4802         vector<int> num_equiv;
4803
4804
4805         for(p=0;p<pr_list.size();++p){
4806                 for(q=0;q<equiv_list.size();++q){
4807                         if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4808                                 break;
4809                 }
4810                 if(q == equiv_list.size()){             // no equiv : create new
4811                         pred_class.push_back(equiv_list.size());
4812                         equiv_list.push_back(pr_list[p]);
4813                         pred_pos.push_back(0);
4814                         num_equiv.push_back(1);
4815
4816                 }else{                  // pr_list[p] is equivalent to pred q
4817                         pred_class.push_back(q);
4818                         pred_pos.push_back(num_equiv[q]);
4819                         num_equiv[q]++;
4820                 }
4821         }
4822
4823 //              Generate the variables which hold the common pred handles
4824         ret += "/*\t\tprefilter global vars.\t*/\n";
4825         for(q=0;q<equiv_list.size();++q){
4826                 for(p=0;p<=(num_equiv[q]/32);++p){
4827                         ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4828                 }
4829         }
4830
4831 //              Struct to hold prefilter complex literals
4832         ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4833         if(complex_literals->size() == 0)
4834                 ret += "\tint no_variable;\n";
4835         int cl;
4836         for(cl=0;cl<complex_literals->size();cl++){
4837                 literal_t *l = complex_literals->get_literal(cl);
4838                 data_type *dtl = new data_type( l->get_type() );
4839                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4840                 ret += tmpstr;
4841         }
4842         ret += "} prefilter_complex_lits_"+iface+";\n\n";
4843
4844
4845 //              Generate the prefilter initialziation code
4846         ret += "void init_lfta_prefilter_"+iface+"(){\n";
4847
4848 //              First initialize complex literals, if any.
4849         ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4850         for(cl=0;cl<complex_literals->size();cl++){
4851                 literal_t *l = complex_literals->get_literal(cl);
4852                 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4853                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4854         }
4855
4856
4857         set<int> epred_seen;
4858         for(p=0;p<pr_list.size();++p){
4859                 int q = pred_class[p];
4860 //printf("\tq=%d\n",q);
4861                 if(epred_seen.count(q)>0){
4862                         ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4863                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4864                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4865                         for(o=0;o<op_list.size();++o){
4866                                 if(! cl_op[o]){
4867                                         ret += generate_se_code(op_list[o],Schema)+", ";
4868                                 }
4869                         }
4870                         ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4871                         epred_seen.insert(q);
4872                 }else{
4873                         ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4874                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4875                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4876                         for(o=0;o<op_list.size();++o){
4877                                 if(! cl_op[o]){
4878                                         ret += generate_se_code(op_list[o],Schema)+", ";
4879                                 }
4880                         }
4881                         ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4882                         epred_seen.insert(q);
4883                 }
4884         }
4885         ret += "}\n\n";
4886
4887
4888
4889 //              Start on main body code generation
4890   ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4891
4892
4893 ///--------------------------------------------------------------
4894 ///             Generate and store the prefilter body,
4895 ///             reuse it for the snap length calculator
4896 ///-------------------------------------------------------------
4897         string body;
4898
4899     body += "\tstruct packet *p = (struct packet *)pkt;\n";
4900
4901
4902
4903 //              Gather the colids to store unpacked variables.
4904         for(p=0;p<pred_list.size();++p){
4905         gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4906         }
4907
4908 //              make the col_ids refer to the base tables, and
4909 //              grab the col_ids with at least one unpacking function.
4910         for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4911                 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4912                 col_id tmp_col_id;
4913                 tmp_col_id.field = (*csi).field;
4914                 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4915                 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4916                 cid_set.insert(tmp_col_id);
4917                 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4918                 if(fe->get_unpack_fcns().size()>0)
4919                         upref_cids.insert(tmp_col_id);
4920
4921
4922         }
4923
4924 //              Find the set of unpacking programs needed for the
4925 //              prefilter fields.
4926         map<col_id, string,lt_col_id>  ucol_fcn_map;
4927         find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4928         set<string> pref_ufcns;
4929         map<col_id, string,lt_col_id>::iterator mcis;
4930         for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4931                 pref_ufcns.insert((*mcis).second);
4932         }
4933
4934
4935
4936 //                      Variables for unpacking attributes.
4937     body += "/*\t\tVariables for unpacking attributes\t*/\n";
4938     for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4939       int schref = (*csi).schema_ref;
4940           int tblref = (*csi).tblvar_ref;
4941       string field = (*csi).field;
4942       data_type dt(Schema->get_type_name(schref,field));
4943       sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4944         field.c_str(), tblref);
4945       body += tmpstr;
4946       sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4947       body += tmpstr;
4948     }
4949 //                      Variables for unpacking temporal attributes.
4950     body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4951     for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4952           if (cid_set.count(*csi) == 0) {
4953         int schref = (*csi).schema_ref;
4954                 int tblref = (*csi).tblvar_ref;
4955         string field = (*csi).field;
4956         data_type dt(Schema->get_type_name(schref,field));
4957         sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4958           field.c_str(), tblref);
4959         body += tmpstr;
4960
4961           }
4962     }
4963     body += "\n\n";
4964
4965 //              Variables for combinable predicate evaluation
4966         body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4967         for(q=0;q<equiv_list.size();++q){
4968                 for(p=0;p<=(num_equiv[q]/32);++p){
4969                         body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4970                 }
4971         }
4972
4973
4974 //                      Variables that are always needed
4975     body += "/*\t\tVariables which are always needed\t*/\n";
4976         body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4977         body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4978
4979 //              Call the unpacking functions for the prefilter fields
4980         if(pref_ufcns.size() > 0)
4981                 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4982         set<string>::iterator ssi;
4983         for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4984                 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4985         }
4986
4987
4988 //              Unpack the accessed attributes
4989         body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4990     for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4991           int tblref = (*csi).tblvar_ref;
4992       int schref = (*csi).schema_ref;
4993           string field = (*csi).field;
4994           sprintf(tmpstr,"\tret_%s_%d =  (%s(p, &unpack_var_%s_%d) == 0);\n",
4995                 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4996           body += tmpstr;
4997     }
4998
4999 //              next unpack the temporal attributes and ignore the errors
5000 //              We are assuming here that failed unpack of temporal attributes
5001 //              is not going to overwrite the last stored value
5002 //              Failed upacks are ignored
5003     for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
5004           int tblref = (*csi).tblvar_ref;
5005       int schref = (*csi).schema_ref;
5006           string field = (*csi).field;
5007           sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
5008                  Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5009           body += tmpstr;
5010     }
5011
5012 //              Evaluate the combinable predicates
5013         if(equiv_list.size()>0)
5014                 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
5015         for(q=0;q<equiv_list.size();++q){
5016                 for(p=0;p<=(num_equiv[q]/32);++p){
5017
5018 //              Only call the common eval fcn if all ref'd fields present.
5019                         col_id_set pred_cids;
5020                         col_id_set::iterator cpi;
5021                         gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
5022                         if(pred_cids.size()>0){
5023                         body += "\tif(";
5024                                 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5025                                         if(cpi != pred_cids.begin())
5026                                                 body += " && ";
5027                                 string field = (*cpi).field;
5028                                         int tblref = (*cpi).tblvar_ref;
5029                                         body += "ret_"+field+"_"+int_to_string(tblref);
5030                                 }
5031                                 body+=")\n";
5032                         }
5033
5034                         body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
5035                         vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
5036                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
5037                         for(o=0;o<op_list.size();++o){
5038                                 if(cl_op[o]){
5039                                         body += ","+generate_se_code(op_list[o],Schema);
5040                                 }
5041                         }
5042                         body += ");\n";
5043                 }
5044         }
5045
5046
5047         for(p=0;p<pred_list.size();++p){
5048                 col_id_set pred_cids;
5049                 col_id_set::iterator cpi;
5050                 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
5051                 if(pred_cids.size()>0){
5052                         body += "\tif(";
5053                         for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5054                                 if(cpi != pred_cids.begin())
5055                                         body += " && ";
5056                         string field = (*cpi).field;
5057                                 int tblref = (*cpi).tblvar_ref;
5058                                 body += "ret_"+field+"_"+int_to_string(tblref);
5059                         }
5060                         body+=")\n";
5061                 }
5062         body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
5063                 body+="\tbitpos = bitpos << 1;\n";
5064         }
5065
5066 // ---------------------------------------------------------------
5067 //              Finished with the body of the prefilter
5068 // --------------------------------------------------------------
5069
5070         ret += body;
5071
5072 //                      Collect fields referenced by an lfta but not
5073 //                      already unpacked for the prefilter.
5074
5075 //printf("upref_cids is:\n");
5076 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
5077 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5078 //printf("pref_ufcns is:\n");
5079 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
5080 //printf("\t%s\n",(*ssi).c_str());
5081
5082         int l;
5083         for(l=0;l<lfta_cols.size();++l){
5084                 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
5085                         string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
5086                         col_id tmp_col_id;
5087                         tmp_col_id.field = (*csi).field;
5088                         tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
5089                         tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
5090                         field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
5091                         set<string> fld_ufcns = fe->get_unpack_fcns();
5092 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(),  upref_cids.count(tmp_col_id));
5093                         if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
5094 //              Ensure that this field not already unpacked.
5095                                 bool found = false;
5096                                 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
5097 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
5098                                         if(pref_ufcns.count((*ssi))){
5099 //printf("Field already unpacked.\n");
5100                                                 found = true;;
5101                                         }
5102                                 }
5103                                 if(! found){
5104 //printf("\tadding to unpack list\n");
5105                                         upall_cids.insert(tmp_col_id);
5106                                 }
5107                         }
5108                 }
5109         }
5110
5111 //printf("upall_cids is:\n");
5112 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
5113 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5114
5115 //              Get the set of unpacking programs for these.
5116         map<col_id, string,lt_col_id>  uall_fcn_map;
5117         find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
5118         set<string> pall_ufcns;
5119         for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
5120 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
5121                 pall_ufcns.insert((*mcis).second);
5122         }
5123
5124 //              Iterate through the remaining set of unpacking function
5125         if(pall_ufcns.size() > 0)
5126                 ret += "//\t\tcall all remaining field unpacking functions.\n";
5127         for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
5128 //              gather the set of columns unpacked by this ufcn
5129                 col_id_set fcol_set;
5130                 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
5131                         if(uall_fcn_map[(*csi)] == (*ssi))
5132                                 fcol_set.insert((*csi));
5133                 }
5134
5135 //              gather the set of lftas which access a field unpacked by the fcn
5136                 set<long long int> clfta;
5137                 for(l=0;l<lfta_cols.size();l++){
5138                         for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
5139                                 if(lfta_cols[l].count((*csi)) > 0)
5140                                         break;
5141                         }
5142                         if(csi != fcol_set.end())
5143                                 clfta.insert(lfta_sigs[l]);
5144                 }
5145
5146 //              generate the unpacking code
5147                 ret += "\tif(";
5148                 set<long long int>::iterator sii;
5149                 for(sii=clfta.begin();sii!=clfta.end();++sii){
5150                         if(sii!=clfta.begin())
5151                                 ret += " || ";
5152                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
5153                         ret += tmpstr;
5154                 }
5155                 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5156         }
5157
5158
5159     ret += "\treturn(retval);\n\n";
5160   ret += "}\n\n";
5161
5162
5163 // --------------------------------------------------------
5164 //              reuse prefilter body for snaplen calculator
5165 //
5166 //      This is dummy code, so I'm commenting it out.
5167
5168 /*
5169   ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
5170
5171         ret += body;
5172
5173         int i;
5174         vector<int> s_snaps = lfta_snap_lens;
5175         sort(s_snaps.begin(), s_snaps.end());
5176
5177         if(s_snaps[0] == -1){
5178                 set<unsigned long long int> sigset;
5179                 for(i=0;i<lfta_snap_lens.size();++i){
5180                         if(lfta_snap_lens[i] == -1){
5181                                 sigset.insert(lfta_sigs[i]);
5182                         }
5183                 }
5184                 ret += "\tif( ";
5185                 set<unsigned long long int>::iterator sulli;
5186                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5187                         if(sulli!=sigset.begin())
5188                                 ret += " || ";
5189                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5190                         ret += tmpstr;
5191                 }
5192                 ret += ") return -1;\n";
5193         }
5194
5195         int nextpos = lfta_snap_lens.size()-1;
5196         int nextval = lfta_snap_lens[nextpos];
5197         while(nextval >= 0){
5198                 set<unsigned long long int> sigset;
5199                 for(i=0;i<lfta_snap_lens.size();++i){
5200                         if(lfta_snap_lens[i] == nextval){
5201                                 sigset.insert(lfta_sigs[i]);
5202                         }
5203                 }
5204                 ret += "\tif( ";
5205                 set<unsigned long long int>::iterator sulli;
5206                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5207                         if(sulli!=sigset.begin())
5208                                 ret += " || ";
5209                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5210                         ret += tmpstr;
5211                 }
5212                 ret += ") return "+int_to_string(nextval)+";\n";
5213
5214                 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
5215                 if(nextpos>0)
5216                         nextval = lfta_snap_lens[nextpos];
5217                 else
5218                         nextval = -1;
5219         }
5220         ret += "\treturn 0;\n";
5221         ret += "}\n\n";
5222 */
5223
5224
5225   return(ret);
5226 }
5227
5228
5229
5230
5231 //              Generate the struct which will store the the values of
5232 //              temporal attributesunpacked by prefilter
5233 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
5234
5235   col_id_set::iterator csi;
5236
5237 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
5238
5239   string ret="struct prefilter_unpacked_temp_vars {\n";
5240   ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
5241
5242   string init_code;
5243
5244   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
5245     int schref = (*csi).schema_ref;
5246     int tblref = (*csi).tblvar_ref;
5247     string field = (*csi).field;
5248         data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
5249     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
5250         field.c_str(), tblref);
5251     ret += tmpstr;
5252
5253         if (init_code != "")
5254                 init_code += ", ";
5255         if (dt.is_increasing())
5256                 init_code += dt.get_min_literal();
5257         else
5258                 init_code += dt.get_max_literal();
5259
5260   }
5261   ret += "};\n\n";
5262
5263   ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";
5264
5265   return(ret);
5266 }