Updates to example queries. Improve real-time behavior of LFTA aggregation
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include <string>
17 #include <stdio.h>
18 #include <stdlib.h>
19 //#include <algo.h>
20 #include<algorithm>
21
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
29
30 using namespace std;
31
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
33
34 // default value for correlation between the interface card and
35 // the system clock
36 #define DEFAULT_TIME_CORR 16
37
38
39 //      For fast hashing
40 //#define NRANDS 100
41 extern string hash_nums[NRANDS];
42 /*
43 = {
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
94 };
95 */
96
97
98 // ----------------------------------------------
99 //              Data extracted from the query plan node
100 //              for use by code generation.
101
102 static cplx_lit_table *complex_literals;  //Table of literals with constructors.
103 static vector<handle_param_tbl_entry *> param_handle_table;
104 static param_table *param_tbl;          // Table of all referenced parameters.
105
106 static vector<scalarexp_t *> sl_list;
107 static vector<cnf_elem *> where;
108
109 static gb_table *gb_tbl;                        // Table of all group-by attributes.
110 static aggregate_table *aggr_tbl;       // Table of all referenced aggregates.
111
112 static bool packed_return;              // unpack using structyure, not fcns
113 static nic_property *nicprop;   // nic properties for this interface.
114 static int global_id;
115
116
117 //              The partial_fcns vector can now refer to
118 //              partial functions, or expensive functions
119 //              which can be cached (if there are multiple refs).  A couple
120 //              of int vectors distinguish the cases.
121 static vector<scalarexp_t *> partial_fcns;
122 static vector<int> fcn_ref_cnt;
123 static vector<bool> is_partial_fcn;
124 int sl_fcns_start = 0, sl_fcns_end = 0;
125 int wh_fcns_start = 0, wh_fcns_end = 0;
126 int gb_fcns_start = 0, gb_fcns_end = 0;
127 int ag_fcns_start = 0, ag_fcns_end = 0;
128
129
130 //              These vectors are for combinable predicates.
131 static vector<int> pred_class;  //      identifies the group
132 static vector<int> pred_pos;    //  position in the group.
133
134
135
136 static char tmpstr[1000];
137
138 //////////////////////////////////////////////////////////////////////
139 ///                     Various utilities
140
141 string generate_fta_name(string node_name){
142         string ret = normalize_name(node_name);
143         if(ret == ""){
144                 ret = "default";
145         }
146         ret += "_fta";
147
148         return(ret);
149 }
150
151
152 string generate_aggr_struct_name(string node_name){
153         string ret = normalize_name(node_name);
154         if(ret == ""){
155                 ret = "default";
156         }
157         ret += "_aggr_struct";
158
159         return(ret);
160 }
161
162 string generate_fj_struct_name(string node_name){
163         string ret = normalize_name(node_name);
164         if(ret == ""){
165                 ret = "default";
166         }
167         ret += "_fj_struct";
168
169         return(ret);
170 }
171
172 string generate_watchlist_element_name(string node_name){
173         string ret = normalize_name(node_name);
174         if(ret == ""){
175                 ret = "default";
176         }
177         ret += "__wl_elem";
178
179         return(ret);
180 }
181
182 string generate_watchlist_struct_name(string node_name){
183         string ret = normalize_name(node_name);
184         if(ret == ""){
185                 ret = "default";
186         }
187         ret += "__wl_struct";
188
189         return(ret);
190 }
191
192 string generate_watchlist_name(string node_name){
193         string ret = normalize_name(node_name);
194         if(ret == ""){
195                 ret = "default";
196         }
197         ret += "__wl";
198
199         return(ret);
200 }
201
202 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
203         string ret;
204         if(! packed_return){
205                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
206                         schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
207         ret += tmpstr;
208         if(!schema->get_modifier_list(schref,field)->contains_key("required"))
209                 ret += "\tif(retval) goto "+end_goto+";\n";
210
211         }else{
212 //                      TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
213                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
214                 if(dt.is_buffer_type()){
215                         if(dt.get_type() != v_str_t){
216                           ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
217                           ret += "\t\tgoto "+end_goto+";\n";
218                           ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
219                           ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
220                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
221                         }else{
222                                 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
223                                 exit(1);
224                         }
225                 }else{
226                         ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
227                                 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
228                 }
229         }
230         return ret;
231 }
232
233 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
234   string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
235
236   int g;
237   for(g=0;g<gb_tbl->size();g++){
238           sprintf(tmpstr,"gb_var%d",g);
239           ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
240   }
241
242   int a;
243   for(a=0;a<aggr_tbl->size();a++){
244           ret += "\t";
245           sprintf(tmpstr,"aggr_var%d",a);
246           if(aggr_tbl->is_builtin(a))
247                 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
248           else
249                 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
250   }
251
252 /*
253   ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
254 */
255
256   ret += "};\n\n";
257
258   return(ret);
259 }
260
261
262 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
263   string ret;
264
265   if(fs->use_bloom == false){   // uses hash table instead
266         ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
267         int k;
268         for(k=0;k<fs->hash_eq.size();++k){
269                 sprintf(tmpstr,"key_var%d",k);
270                 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
271         }
272         ret += "\tlong long int ts;\n";
273     ret += "};\n\n";
274   }
275
276   return(ret);
277 }
278
279 string generate_watchlist_structs(string node_name, table_def *tbl,
280                 std::string filename, int refresh_interval){
281         string ret;
282
283         ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
284         vector<field_entry *> fields = tbl->get_fields();
285         for(int f=0;f<fields.size();++f){
286                 data_type dt(fields[f]->get_type());
287                 ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
288         }
289         ret += "\tgs_uint64_t hashval;\n";
290         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
291         ret += "};\n\n";
292
293         ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
294         ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
295         ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
296         ret += "\tgs_uint32_t ht_size;\n";
297         ret += "\tgs_uint32_t n_elem;\n";
298         ret += "\tgs_uint32_t refresh_interval;\n";
299         ret += "\ttime_t next_refresh;\n";
300         ret += "\ttime_t last_mtime;\n";        
301         ret += "\tchar *filename;\n";
302         ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
303
304         return ret;
305 }
306
307 string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
308         string ret;
309         string tgt = generate_watchlist_name(node_name);
310         vector<field_entry *> fields = tbl->get_fields();
311
312         ret += "void reload_watchlist__"+node_name+"(){\n";
313         ret += "\tgs_uint32_t i;\n";
314         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
315         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
316         ret += "\tFILE *fl;\n";
317         ret += "\tchar buf[10000];\n";
318         ret += "\tgs_uint32_t buflen = 10000;\n";
319         ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
320         ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
321         ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
322         ret += "\tgs_uint64_t hash, bucket;\n";
323         ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
324
325         ret += "//  make sure watchlist file has changed since the last time we loaded it\n";
326     ret += "\tstruct stat file_stat;\n";
327     ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
328     ret += "\tif (err) {\n";
329     ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
330     ret += "\t\treturn;\n";
331     ret += "\t}\n";
332         ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime)     // watchlist file hasn't changed since last time\n";
333         ret += "\t\treturn;\n";
334         ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
335
336         ret += "//  Delete old entries.\n";
337         ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
338         ret += "\t\tptr="+tgt+".ht[i];\n";
339         ret += "\t\twhile(ptr!=NULL){\n";
340         for(int f=0;f<fields.size();++f){
341                 data_type dt(fields[f]->get_type());
342                 if(dt.is_buffer_type()){
343                         ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
344                 }
345         }
346         ret += "\t\t\tnext = ptr->next;\n";
347         ret += "\t\t\tfree(ptr);\n";
348         ret += "\t\t\tptr = next;\n";
349         ret += "\t\t}\n";
350         ret += "\t}\n";
351         ret += "\n// prepare new table. \n";
352         ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
353         ret += "\t\tif("+tgt+".ht)\n";
354         ret += "\t\t\tfree("+tgt+".ht);\n";
355         ret += "\t\tif("+tgt+".ht_size == 0)\n";
356         ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
357         ret += "\t\telse\n";    
358         ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
359         ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
360         ret += "\t}\n";
361         ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
362         ret += "\t\t"+tgt+".ht[i] = NULL;\n";
363         ret += "\t}\n";
364         ret += "\n// load new table\n";
365         ret += "\t"+tgt+".n_elem = 0;\n";
366         ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
367         ret += "\tif(fl==NULL){\n";
368         ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
369         ret += "\t\treturn;\n";
370         ret += "\t}\n";
371         ret += "\tmalformed = 0;\n";
372         ret += "\tshort_lines = 0;\n";
373         ret += "\ttoolong_lines = 0;\n";
374         ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
375         ret += "\t\tlinelen = strlen(buf);\n";
376         ret += "\t\tbuf[linelen-1]='\\0';       // strip off trailing newline\n";
377         ret += "\t\tlinelen--;\n";
378
379         ret += "\t\tpos=0;\n";
380         ret += "\t\tmalformed=0;\n";
381         ret += "\t\tok=1;\n";
382         ret += "\t\tflds[0] = buf;\n";
383         ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
384         ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
385         ret += "\t\t\tif(pos >= linelen){\n";
386         ret += "\t\t\t\tmalformed = 1;\n";
387         ret += "\t\t\t\tbreak;\n";
388         ret += "\t\t\t}\n";
389         ret += "\t\t\tbuf[pos]='\\0';\n";
390         ret += "\t\t\tpos++;\n";
391         ret += "\t\t\tflds[f]=buf+pos;\n";
392         ret += "\t\t}\n";
393         ret += "\t\tif(malformed){\n";
394         ret += "\t\t\tok=0;\n";
395         ret += "\t\t\tn_malformed++;\n";
396         ret += "\t\t}\n";
397         ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
398         ret += "\t\t\tok=0;\n";
399         ret += "\t\t\tshort_lines++;\n";
400         ret += "\t\t}\n";
401         ret += "\t\tif(pos && (pos<linelen)){\n";
402         ret += "\t\t\tok=0;\n";
403         ret += "\t\t\ttoolong_lines++;\n";
404         ret += "\t\t}\n";
405         ret += "\t\tif(f>="+std::to_string(fields.size())+"){\n";
406         ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
407 //              Extract fields
408         for(int f=0;f<fields.size();++f){
409                 data_type dt(fields[f]->get_type());
410                 ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
411         }
412 //              Compute the hash value
413         ret += "\t\t\thash=0;\n";
414         for(int k=0;k<keys.size();++k){
415                 string key_fld = keys[k];
416                 int f;
417                 for(f=0;f<fields.size();++f){
418                         if(fields[f]->get_name() == key_fld)
419                                 break;
420                 }
421                 data_type dt(fields[f]->get_type());
422
423                 ret +=
424 "\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
425                         dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
426                 }
427         ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
428         ret += "\t\t\trec->hashval = hash;\n";
429         ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
430         ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
431         ret += "\t\t\t"+tgt+".n_elem++;\n";
432
433         ret += "\t\t}\n";
434         ret += "\t}\n";
435         ret += "\tif(n_malformed+toolong_lines > 0){\n";
436         ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n";
437         ret += "\t}\n";
438         ret += "}\n\n";
439
440         return ret;
441 }
442
443
444
445                 
446 string generate_fta_struct(string node_name, gb_table *gb_tbl,
447                 aggregate_table *aggr_tbl, param_table *param_tbl,
448                 cplx_lit_table *complex_literals,
449                 vector<handle_param_tbl_entry *> &param_handle_table,
450                 bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
451                 table_list *schema){
452
453         string ret = "struct " + generate_fta_name(node_name) + "{\n";
454         ret += "\tstruct FTA f;\n";
455
456 //-------------------------------------------------------------
457 //              Aggregate-specific fields
458
459         if(is_aggr_query){
460 /*
461                 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
462 */
463                 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
464                 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
465 //              ret+="\tint bitmap_size;\n";
466                 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
467                 ret += "\tgs_int32_t n_ticks; // for limiting slow flush\n";
468                 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
469                 ret += "\tint max_windows; // max number of open windows.\n";
470                 ret += "\tunsigned int generation; // initially zero, increment on\n";
471                 ret += "\t     // every hash table flush - whether regular or induced.\n";
472                 ret += "\t     // Old groups are identified by a generation mismatch.\n";
473                 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
474                 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
475
476
477
478                 int g;
479                 bool uses_temporal_flush = false;
480                 for(g=0;g<gb_tbl->size();g++){
481                         data_type *dt = gb_tbl->get_data_type(g);
482                         if(dt->is_temporal()){
483 /*
484                                 fprintf(stderr,"group by attribute %s is temporal, ",
485                                                 gb_tbl->get_name(g).c_str());
486                                 if(dt->is_increasing()){
487                                         fprintf(stderr,"increasing.\n");
488                                 }else{
489                                         fprintf(stderr,"decreasing.\n");
490                                 }
491 */
492                                 data_type *gdt = gb_tbl->get_data_type(g);
493                                 if(gdt->is_buffer_type()){
494                                         fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
495                                 }else{
496                                         sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
497                                         ret += tmpstr;
498                                         sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
499                                         ret += tmpstr;
500                                         sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
501                                         ret += tmpstr;
502                                         uses_temporal_flush = true;
503                                 }
504
505                         }
506
507                 }
508                 if(! uses_temporal_flush){
509                         fprintf(stderr,"Warning: no temporal flush.\n");
510                 }
511         }
512
513 // ---------------------------------------------------------
514 //                      Filter-join specific fields
515
516         if(is_fj){
517                 if(uses_bloom){
518                         ret +=
519 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
520 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
521 "\tint first_exec;\n"
522 "\tlong long int last_bin;\n"
523 "\tint last_bloom_pos;\n"
524 "\n"
525 ;
526                 }else{          // limited hash table
527                         ret +=
528 "  struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
529 "\n"
530 ;
531                 }
532
533         }
534
535 // --------------------------------------------
536 //              watchlist-join specific
537         if(is_wj){
538                 ret += "\ttime_t ux_time;\n";
539         }
540
541 //--------------------------------------------------------
542 //                      Common fields
543
544 //                      Create places to hold the parameters.
545         int p;
546         vector<string> param_vec = param_tbl->get_param_names();
547         for(p=0;p<param_vec.size();p++){
548                 data_type *dt = param_tbl->get_data_type(param_vec[p]);
549                 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
550                                 param_vec[p].c_str());
551                 ret += tmpstr;
552                 if(param_tbl->handle_access(param_vec[p])){
553                         ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
554                 }
555         }
556
557 //                      Create places to hold complex literals.
558         int cl;
559         for(cl=0;cl<complex_literals->size();cl++){
560                 literal_t *l = complex_literals->get_literal(cl);
561                 data_type *dtl = new data_type( l->get_type() );
562                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
563                 ret += tmpstr;
564         }
565
566 //                      Create places to hold the pass-by-handle parameters.
567         for(p=0;p<param_handle_table.size();++p){
568                 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
569                 ret += tmpstr;
570         }
571
572 //                      Create places to hold the last values of temporal
573 //                      attributes referenced in select clause
574 //                      we also need to store values of the temoral attributed
575 //                      of last flushed tuple in aggr queries
576 //                      to make sure we generate the cirrect temporal tuple
577 //                      in the presense of slow flushes
578
579
580         col_id_set temp_cids;           //      col ids of temp attributes in select clause
581
582         int s;
583         col_id_set::iterator csi;
584
585         for(s=0;s<sl_list.size();s++){
586                 data_type *sdt = sl_list[s]->get_data_type();
587                 if (sdt->is_temporal()) {
588                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
589                 }
590         }
591
592         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
593                 int tblref = (*csi).tblvar_ref;
594                 int schref = (*csi).schema_ref;
595                 string field = (*csi).field;
596                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
597                 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
598                 ret += tmpstr;
599         }
600
601         ret += "\tgs_uint64_t trace_id;\n\n";
602
603 //      Fields to store the runtime stats
604
605         ret += "\tgs_uint32_t in_tuple_cnt;\n";
606         ret += "\tgs_uint32_t out_tuple_cnt;\n";
607         ret += "\tgs_uint32_t out_tuple_sz;\n";
608         ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
609         ret += "\tgs_uint64_t cycle_cnt;\n";
610         ret += "\tgs_uint32_t collision_cnt;\n";
611         ret += "\tgs_uint32_t eviction_cnt;\n";
612         ret += "\tgs_float_t sampling_rate;\n";
613
614
615
616         ret += "};\n\n";
617
618         return(ret);
619 }
620
621 //------------------------------------------------------------
622 //              Set colref tblvars to 0..
623 //              (special processing for join-like operators in an lfta).
624
625 void reset_se_col_ids_tblvars(scalarexp_t *se,  gb_table *gtbl){
626         vector<scalarexp_t *> operands;
627         int o;
628
629         if(! se)
630                 return;
631
632         switch(se->get_operator_type()){
633         case SE_LITERAL:
634         case SE_PARAM:
635         case SE_IFACE_PARAM:
636                 return;
637         case SE_UNARY_OP:
638                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
639                 return;
640         case SE_BINARY_OP:
641                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
642                 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
643                 return;
644         case SE_COLREF:
645                 if(! se->is_gb() ){
646                         se->get_colref()->set_tablevar_ref(0);
647                 }else{
648                         if(gtbl==NULL){
649                                 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
650                                 exit(1);
651                         }
652                         reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
653                 }
654                 return;
655         case SE_AGGR_STAR:
656                 return;
657         case SE_AGGR_SE:
658                 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
659                 return;
660         case SE_FUNC:
661                 operands = se->get_operands();
662                 for(o=0;o<operands.size();o++){
663                         reset_se_col_ids_tblvars(operands[o], gtbl);
664                 }
665                 return;
666         default:
667                 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
668                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
669                 exit(1);
670         }
671 }
672
673
674 //              reset  column tblvars accessed in this pr.
675
676 void reset_pr_col_ids_tblvars(predicate_t *pr,  gb_table *gtbl){
677         vector<scalarexp_t *> op_list;
678         int o;
679
680         switch(pr->get_operator_type()){
681         case PRED_IN:
682                 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
683                 return;
684         case PRED_COMPARE:
685                 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
686                 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
687                 return;
688         case PRED_UNARY_OP:
689                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
690                 return;
691         case PRED_BINARY_OP:
692                 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
693                 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
694                 return;
695         case PRED_FUNC:
696                 op_list = pr->get_op_list();
697                 for(o=0;o<op_list.size();++o){
698                         reset_se_col_ids_tblvars(op_list[o],gtbl) ;
699                 }
700                 return;
701         default:
702                 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
703                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
704         }
705 }
706
707
708
709
710 //                      Generate code that makes reference
711 //                      to the tuple, and not to any aggregates.
712 static string generate_se_code(scalarexp_t *se,table_list *schema){
713         string ret;
714     data_type *ldt, *rdt;
715         int o;
716         vector<scalarexp_t *> operands;
717
718
719         switch(se->get_operator_type()){
720         case SE_LITERAL:
721                 if(se->is_handle_ref()){
722                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
723                         ret = tmpstr;
724                         return(ret);
725                 }
726                 if(se->get_literal()->is_cpx_lit()){
727                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
728                         ret = tmpstr;
729                         return(ret);
730                 }
731                 return(se->get_literal()->to_C_code("")); // not complex, no constructor
732         case SE_PARAM:
733                 if(se->is_handle_ref()){
734                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
735                         ret = tmpstr;
736                         return(ret);
737                 }
738                 ret += "t->param_";
739                 ret += se->get_param_name();
740                 return(ret);
741         case SE_UNARY_OP:
742         ldt = se->get_left_se()->get_data_type();
743         if(ldt->complex_operator(se->get_op()) ){
744                         ret +=  ldt->get_complex_operator(se->get_op());
745                         ret += "(";
746                         ret += generate_se_code(se->get_left_se(),schema);
747             ret += ")";
748                 }else{
749                         ret += "(";
750                         ret += se->get_op();
751                         ret += generate_se_code(se->get_left_se(),schema);
752                         ret += ")";
753                 }
754                 return(ret);
755         case SE_BINARY_OP:
756         ldt = se->get_left_se()->get_data_type();
757         rdt = se->get_right_se()->get_data_type();
758
759         if(ldt->complex_operator(rdt, se->get_op()) ){
760                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
761                         ret += "(";
762                         ret += generate_se_code(se->get_left_se(),schema);
763                         ret += ", ";
764                         ret += generate_se_code(se->get_right_se(),schema);
765                         ret += ")";
766                 }else{
767                         ret += "(";
768                         ret += generate_se_code(se->get_left_se(),schema);
769                         ret += se->get_op();
770                         ret += generate_se_code(se->get_right_se(),schema);
771                         ret += ")";
772                 }
773                 return(ret);
774         case SE_COLREF:
775                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet unpacked ...
776                                                         // so return the defining code.
777                         ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
778
779                 }else{
780                 sprintf(tmpstr,"unpack_var_%s_%d",
781                   se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
782                 ret = tmpstr;
783                 }
784                 return(ret);
785         case SE_FUNC:
786 //                              Should not be ref'ing any aggr here.
787                 if(se->get_aggr_ref() >= 0){
788                         fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
789                         return("ERROR in generate_se_code");
790                 }
791
792                 if(se->is_partial()){
793                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
794                         ret = tmpstr;
795                 }else{
796                         ret += se->op + "(";
797                         operands = se->get_operands();
798                         for(o=0;o<operands.size();o++){
799                                 if(o>0) ret += ", ";
800                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
801                                         ret += "&";
802                                 ret += generate_se_code(operands[o], schema);
803                         }
804                         ret += ")";
805                 }
806                 return(ret);
807         default:
808                 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
809                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
810                 return("ERROR in generate_se_code");
811         }
812 }
813
814 //              generate code that refers only to aggregate data and constants.
815 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
816
817         string ret;
818     data_type *ldt, *rdt;
819         int o;
820         vector<scalarexp_t *> operands;
821
822
823         switch(se->get_operator_type()){
824         case SE_LITERAL:
825                 if(se->is_handle_ref()){
826                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
827                         ret = tmpstr;
828                         return(ret);
829                 }
830                 if(se->get_literal()->is_cpx_lit()){
831                         sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
832                         ret = tmpstr;
833                         return(ret);
834                 }
835                 return(se->get_literal()->to_C_code("")); // not complex no constructor
836         case SE_PARAM:
837                 if(se->is_handle_ref()){
838                         sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
839                         ret = tmpstr;
840                         return(ret);
841                 }
842                 ret += "t->param_";
843                 ret += se->get_param_name();
844                 return(ret);
845         case SE_UNARY_OP:
846         ldt = se->get_left_se()->get_data_type();
847         if(ldt->complex_operator(se->get_op()) ){
848                         ret +=  ldt->get_complex_operator(se->get_op());
849                         ret += "(";
850                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
851             ret += ")";
852                 }else{
853                         ret += "(";
854                         ret += se->get_op();
855                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
856                         ret += ")";
857                 }
858                 return(ret);
859         case SE_BINARY_OP:
860         ldt = se->get_left_se()->get_data_type();
861         rdt = se->get_right_se()->get_data_type();
862
863         if(ldt->complex_operator(rdt, se->get_op()) ){
864                         ret +=  ldt->get_complex_operator(rdt, se->get_op());
865                         ret += "(";
866                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
867                         ret += ", ";
868                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
869                         ret += ")";
870                 }else{
871                         ret += "(";
872                         ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
873                         ret += se->get_op();
874                         ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
875                         ret += ")";
876                 }
877                 return(ret);
878         case SE_COLREF:
879                 if(se->is_gb()){                // OK to ref gb attrs, but they're not yet
880                                                         // unpacked ... so return the defining code.
881                         sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
882                         ret = tmpstr;
883
884                 }else{
885                 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
886                                 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
887                                 se->get_lineno(), se->get_charno());
888                 ret = tmpstr;
889                 }
890                 return(ret);
891         case SE_AGGR_STAR:
892         case SE_AGGR_SE:
893                 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
894                 ret = tmpstr;
895                 return(ret);
896         case SE_FUNC:
897 //                              Is it a UDAF?
898                 if(se->get_aggr_ref() >= 0){
899                         sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
900                         ret = tmpstr;
901                         return(ret);
902                 }
903
904                 if(se->is_partial()){
905                         sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
906                         ret = tmpstr;
907                 }else{
908                         ret += se->op + "(";
909                         operands = se->get_operands();
910                         for(o=0;o<operands.size();o++){
911                                 if(o>0) ret += ", ";
912                                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
913                                         ret += "&";
914                                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
915                         }
916                         ret += ")";
917                 }
918                 return(ret);
919         default:
920                 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
921                                 se->get_lineno(), se->get_charno(),se->get_operator_type());
922                 return("ERROR in generate_se_code");
923         }
924
925 }
926
927
928 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
929         string ret;
930         int o;
931         vector<scalarexp_t *> operands;
932
933
934         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
935                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
936                                 se->get_lineno(), se->get_charno());
937                 return("ERROR in generate_se_code");
938         }
939
940         ret = "\tretval = " + se->get_op() + "( ";
941         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
942         ret += tmpstr;
943
944         operands = se->get_operands();
945         for(o=0;o<operands.size();o++){
946                 ret += ", ";
947                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
948                         ret += "&";
949                 ret += generate_se_code_fm_aggr(operands[o], var, schema);
950         }
951         ret += ");\n";
952
953         return(ret);
954 }
955
956 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
957         string ret;
958         int o;
959         vector<scalarexp_t *> operands;
960
961         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
962                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
963                                 se->get_lineno(), se->get_charno());
964                 return("ERROR in generate_se_code");
965         }
966
967         ret = se->get_op() + "( ";
968
969         operands = se->get_operands();
970         for(o=0;o<operands.size();o++){
971                 if(o) ret += ", ";
972                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
973                         ret += "&";
974                 ret += generate_se_code(operands[o], schema);
975         }
976         ret += ");\n";
977
978         return(ret);
979 }
980
981
982
983 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
984         string ret;
985         int o;
986         vector<scalarexp_t *> operands;
987
988
989         if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
990                 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
991                                 se->get_lineno(), se->get_charno());
992                 return("ERROR in generate_se_code");
993         }
994
995         ret = "\tretval = " + se->get_op() + "( ",
996         sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
997         ret += tmpstr;
998
999         operands = se->get_operands();
1000         for(o=0;o<operands.size();o++){
1001                 ret += ", ";
1002                 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
1003                         ret += "&";
1004                 ret += generate_se_code(operands[o], schema);
1005         }
1006         ret += ");\n";
1007
1008         return(ret);
1009 }
1010
1011
1012
1013
1014
//		Map a query-language comparison operator to its C spelling.
//		Only "=" and "<>" differ; every other operator is returned unchanged.
static string generate_C_comparison_op(string op){
	return (op == "=") ? string("==")
	     : (op == "<>") ? string("!=")
	     : op;
}
1020
//		Map a query-language boolean operator to its C spelling.
//		Each operator is recognized in exactly three spellings: all upper
//		case, capitalized, and all lower case.  Anything else is reported on
//		stderr, and an error marker is returned in place of an operator.
static string generate_C_boolean_op(string op){
	//	Columns 0-2 are the accepted spellings; column 3 is the C operator.
	static const char *const spellings[][4] = {
		{"AND", "And", "and", "&&"},
		{"OR",  "Or",  "or",  "||"},
		{"NOT", "Not", "not", "!"}
	};
	const size_t n_ops = sizeof(spellings) / sizeof(spellings[0]);

	for(size_t row = 0; row < n_ops; ++row){
		for(int col = 0; col < 3; ++col){
			if(op == spellings[row][col])
				return spellings[row][3];
		}
	}

	fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
	return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
}
1035
1036
1037 static string generate_predicate_code(predicate_t *pr,table_list *schema){
1038         string ret;
1039         vector<literal_t *>  litv;
1040         int i;
1041     data_type *ldt, *rdt;
1042         vector<scalarexp_t *> op_list;
1043         int o,cref,ppos;
1044         unsigned int bitmask;
1045
1046         switch(pr->get_operator_type()){
1047         case PRED_IN:
1048         ldt = pr->get_left_se()->get_data_type();
1049
1050                 ret += "( ";
1051                 litv = pr->get_lit_vec();
1052                 for(i=0;i<litv.size();i++){
1053                         if(i>0) ret += " || ";
1054                         ret += "( ";
1055
1056                 if(ldt->complex_comparison(ldt) ){
1057                                 ret +=  ldt->get_equals_fcn(ldt) ;
1058                                 ret += "( ";
1059                                 if(ldt->is_buffer_type() ) ret += "&";
1060                                 ret += generate_se_code(pr->get_left_se(), schema);
1061                                 ret += ", ";
1062                                 if(ldt->is_buffer_type() ) ret += "&";
1063                                 if(litv[i]->is_cpx_lit()){
1064                                         sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
1065                                         ret += tmpstr;
1066                                 }else{
1067                                         ret += litv[i]->to_C_code("");
1068                                 }
1069                                 ret += ") == 0";
1070                         }else{
1071                                 ret += generate_se_code(pr->get_left_se(), schema);
1072                                 ret += " == ";
1073                                 ret += litv[i]->to_C_code("");
1074                         }
1075
1076                         ret += " )";
1077                 }
1078                 ret += " )";
1079                 return(ret);
1080
1081         case PRED_COMPARE:
1082         ldt = pr->get_left_se()->get_data_type();
1083         rdt = pr->get_right_se()->get_data_type();
1084
1085                 ret += "( ";
1086         if(ldt->complex_comparison(rdt) ){
1087 // TODO can use get_equals_fcn if op is "=" ?
1088                         ret += ldt->get_comparison_fcn(rdt);
1089                         ret += "(";
1090                         if(ldt->is_buffer_type() ) ret += "&";
1091                         ret += generate_se_code(pr->get_left_se(),schema);
1092                         ret += ", ";
1093                         if(rdt->is_buffer_type() ) ret += "&";
1094                         ret += generate_se_code(pr->get_right_se(),schema);
1095                         ret += ") ";
1096                         ret +=  generate_C_comparison_op(pr->get_op());
1097                         ret += "0";
1098                 }else{
1099                         ret += generate_se_code(pr->get_left_se(),schema);
1100                         ret +=  generate_C_comparison_op(pr->get_op());
1101                         ret += generate_se_code(pr->get_right_se(),schema);
1102                 }
1103                 ret += " )";
1104                 return(ret);
1105         case PRED_UNARY_OP:
1106                 ret += "( ";
1107                 ret +=  generate_C_boolean_op(pr->get_op());
1108                 ret += generate_predicate_code(pr->get_left_pr(),schema);
1109                 ret += " )";
1110                 return(ret);
1111         case PRED_BINARY_OP:
1112                 ret += "( ";
1113                 ret += generate_predicate_code(pr->get_left_pr(),schema);
1114                 ret +=  generate_C_boolean_op(pr->get_op());
1115                 ret += generate_predicate_code(pr->get_right_pr(),schema);
1116                 ret += " )";
1117                 return(ret);
1118         case PRED_FUNC:
1119                 op_list = pr->get_op_list();
1120                 cref = pr->get_combinable_ref();
1121                 if(cref >= 0){  // predicate is a combinable pred reference
1122                         //              Trust, but verify
1123                         if(pred_class.size() >= cref && pred_class[cref] >= 0){
1124                                 ppos = pred_pos[cref];
1125                                 bitmask = 1 << ppos % 32;
1126                                 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
1127                                 ret = tmpstr;
1128                                 return ret;
1129                         }
1130                 }
1131
1132                 ret =  pr->get_op() + "(";
1133                 if (pr->is_sampling_fcn) {
1134                         ret += "t->sampling_rate";
1135                         if (!op_list.empty())
1136                                 ret += ", ";
1137                 }
1138                 for(o=0;o<op_list.size();++o){
1139                         if(o>0) ret += ", ";
1140                         if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
1141                                         ret += "&";
1142                         ret += generate_se_code(op_list[o],schema);
1143                 }
1144                 ret += " )";
1145                 return(ret);
1146         default:
1147                 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
1148                         pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
1149                 return("ERROR in generate_predicate_code");
1150         }
1151 }
1152
1153
1154 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
1155         string ret;
1156
1157     if(dt->complex_comparison(dt) ){
1158                 ret += dt->get_equals_fcn(dt);
1159                 ret += "(";
1160                         if(dt->is_buffer_type() ) ret += "&";
1161                 ret += lhs_op;
1162                 ret += ", ";
1163                         if(dt->is_buffer_type() ) ret += "&";
1164                 ret += rhs_op;
1165                 ret += ") == 0";
1166         }else{
1167                 ret += lhs_op;
1168                 ret += " == ";
1169                 ret += rhs_op;
1170         }
1171
1172         return(ret);
1173 }
1174
1175 //static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
1176 //      string ret;
1177 //
1178  //   if(dt->complex_comparison(dt) ){
1179 //              ret += dt->get_equals_fcn(dt);
1180 //              ret += "(";
1181 //                      if(dt->is_buffer_type() ) ret += "&";
1182 //              ret += lhs_op;
1183 //              ret += ", ";
1184 //                      if(dt->is_buffer_type() ) ret += "&";
1185 //              ret += rhs_op;
1186 //              ret += ") == 0";
1187 //      }else{
1188 //              ret += lhs_op;
1189 //              ret += " == ";
1190 //              ret += rhs_op;
1191 //      }
1192 //
1193 //      return(ret);
1194 //}
1195
1196 //              Here I assume that only MIN and MAX aggregates can be computed
1197 //              over BUFFER data types.
1198
1199 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
1200         string retval = "\t\t";
1201         string op = atbl->get_op(aidx);
1202
1203 //              Is it a UDAF
1204         if(! atbl->is_builtin(aidx)) {
1205                 int o;
1206                 retval += op+"_LFTA_AGGR_UPDATE_(";
1207                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1208                 retval+="("+var+")";
1209                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1210                 for(o=0;o<opl.size();++o){
1211                         retval += ",";
1212                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1213                                         retval.append("&");
1214                         retval += generate_se_code(opl[o], schema);
1215                 }
1216                 retval += ");\n";
1217
1218                 return retval;
1219         }
1220
1221 //              Built-in aggregate processing.
1222
1223         data_type *dt = atbl->get_data_type(aidx);
1224
1225         if(op == "COUNT"){
1226                 retval.append(var);
1227                 retval.append("++;\n");
1228                 return(retval);
1229         }
1230         if(op == "SUM"){
1231                 retval.append(var);
1232                 retval.append(" += ");
1233                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1234                 retval.append(";\n");
1235                 return(retval);
1236         }
1237         if(op == "MIN"){
1238                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1239                 retval.append(tmpstr);
1240                 if(dt->complex_comparison(dt)){
1241                         if(dt->is_buffer_type())
1242                           sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1243                         else
1244                           sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1245                 }else{
1246                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1247                 }
1248                 retval.append(tmpstr);
1249                 if(dt->is_buffer_type()){
1250                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1251                 }else{
1252                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1253                 }
1254                 retval.append(tmpstr);
1255
1256                 return(retval);
1257         }
1258         if(op == "MAX"){
1259                 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1260                 retval.append(tmpstr);
1261                 if(dt->complex_comparison(dt)){
1262                         if(dt->is_buffer_type())
1263                          sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1264                         else
1265                          sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1266                 }else{
1267                         sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1268                 }
1269                 retval.append(tmpstr);
1270                 if(dt->is_buffer_type()){
1271                         sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1272                 }else{
1273                         sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1274                 }
1275                 retval.append(tmpstr);
1276
1277                 return(retval);
1278
1279         }
1280         if(op == "AND_AGGR"){
1281                 retval.append(var);
1282                 retval.append(" &= ");
1283                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1284                 retval.append(";\n");
1285                 return(retval);
1286         }
1287         if(op == "OR_AGGR"){
1288                 retval.append(var);
1289                 retval.append(" |= ");
1290                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1291                 retval.append(";\n");
1292                 return(retval);
1293         }
1294         if(op == "XOR_AGGR"){
1295                 retval.append(var);
1296                 retval.append(" ^= ");
1297                 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1298                 retval.append(";\n");
1299                 return(retval);
1300         }
1301         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1302         return("ERROR: aggregate not recognized: "+op);
1303
1304 }
1305
1306
1307
1308 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1309         string retval;
1310         string op = atbl->get_op(aidx);
1311
1312 //              Is it a UDAF
1313         if(! atbl->is_builtin(aidx)) {
1314                 int o;
1315                 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1316                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1317                 retval+="("+var+"));\n";
1318 //                      Add 1st tupl
1319                 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1320                 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1321                 retval+="("+var+")";
1322                 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1323                 for(o=0;o<opl.size();++o){
1324                         retval += ",";
1325                         if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1326                                         retval.append("&");
1327                         retval += generate_se_code(opl[o],schema);
1328                 }
1329                 retval += ");\n";
1330                 return(retval);
1331         }
1332
1333 //              Built-in aggregate processing.
1334
1335
1336         data_type *dt = atbl->get_data_type(aidx);
1337
1338         if(op == "COUNT"){
1339                 retval = "\t\t"+var;
1340                 retval.append(" = 1;\n");
1341                 return(retval);
1342         }
1343
1344         if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1345                                                                         op == "OR_AGGR" || op == "XOR_AGGR"){
1346                 if(dt->is_buffer_type()){
1347                         sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1348                         retval.append(tmpstr);
1349                         sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1350                         retval.append(tmpstr);
1351                 }else{
1352                         retval = "\t\t"+var;
1353                         retval += " = ";
1354                         retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1355                         retval.append(";\n");
1356                 }
1357                 return(retval);
1358         }
1359
1360         fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1361         return("ERROR: aggregate not recognized: "+op);
1362 }
1363
1364
1365 ////////////////////////////////////////////////////////////
1366
1367
1368 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1369         std::string &node_name, std::string &schema_embed_str){
1370 //                      Include these only once, not once per lfta
1371 //      string ret = "#include \"rts.h\"\n";
1372 //      ret +=  "#include \"fta.h\"\n\n");
1373
1374         string ret = "#ifndef LFTA_IN_NIC\n";
1375         ret += "const char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1376         ret += "#include<stdio.h>\n";
1377         ret += "#include <limits.h>\n";
1378         ret += "#include <float.h>\n";
1379         ret += "#include <sys/stat.h>\n";
1380         ret += "#include \"rdtsc.h\"\n";
1381         ret += "#endif\n";
1382
1383
1384
1385         return(ret);
1386 }
1387
1388
1389 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1390         int a,p,s;
1391 //                       need to create and output the tuple.
1392         string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1393 //                      Check for any UDAFs with LFTA_BAILOUT
1394         ret += "\tlfta_bailout = 0;\n";
1395         for(a=0;a<aggr_tbl->size();a++){
1396                 if(aggr_tbl->has_bailout(a)){
1397                         ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
1398                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1399                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1400                 }
1401         }
1402         ret += "\tif(! lfta_bailout){\n";
1403
1404 //                      First, compute the size of the tuple.
1405
1406 //                      Unpack UDAF return values
1407         for(a=0;a<aggr_tbl->size();a++){
1408                 if(! aggr_tbl->is_builtin(a)){
1409                         ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1410                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1411                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1412
1413                 }
1414         }
1415
1416
1417 //                      Unpack partial fcns ref'd by the select clause.
1418   if(sl_fcns_start != sl_fcns_end){
1419             ret += "\t\tunpack_failed = 0;\n";
1420     for(p=sl_fcns_start;p<sl_fcns_end;p++){
1421           if(is_partial_fcn[p]){
1422                 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1423                          "t->aggr_table["+idx+"].",schema);
1424                 ret += "\t\tif(retval) unpack_failed = 1;\n";
1425           }
1426     }
1427                                                                 // BEGIN don't allocate tuple if
1428         ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1429   }
1430
1431 //                      Unpack any BUFFER type selections into temporaries
1432 //                      so that I can compute their size and not have
1433 //                      to recompute their value during tuple packing.
1434 //                      I can use regular assignment here because
1435 //                      these temporaries are non-persistent.
1436
1437           for(s=0;s<sl_list.size();s++){
1438                 data_type *sdt = sl_list[s]->get_data_type();
1439                 if(sdt->is_buffer_type()){
1440                         sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1441                         ret += tmpstr;
1442                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1443                         ret += ";\n";
1444                 }
1445           }
1446
1447
1448 //              The size of the tuple is the size of the tuple struct plus the
1449 //              size of the buffers to be copied in.
1450
1451           ret += "\t\t\ttuple_size = sizeof( struct ";
1452           ret +=  generate_tuple_name(node_name);
1453           ret += ")";
1454           for(s=0;s<sl_list.size();s++){
1455                 data_type *sdt = sl_list[s]->get_data_type();
1456                 if(sdt->is_buffer_type()){
1457                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1458                         ret += tmpstr;
1459                 }
1460           }
1461           ret += ";\n";
1462
1463
1464           ret += "\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
1465           ret += "\t\t\tif( tuple != NULL){\n";
1466
1467
1468 //                      Test passed, make assignments to the tuple.
1469
1470           ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1471           ret +=  generate_tuple_name(node_name) ;
1472           ret += ");\n";
1473
1474 //                      Mark tuple as REGULAR_TUPLE
1475           ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1476
1477           for(s=0;s<sl_list.size();s++){
1478                 data_type *sdt = sl_list[s]->get_data_type();
1479                 if(sdt->is_buffer_type()){
1480                         sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1481                         ret += tmpstr;
1482                         sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1483                         ret += tmpstr;
1484                 }else{
1485                         sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1486                         ret += tmpstr;
1487 //                      if(sdt->needs_hn_translation())
1488 //                              ret += sdt->hton_translation() +"( ";
1489                         ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1490 //                      if(sdt->needs_hn_translation())
1491 //                              ret += ") ";
1492                         ret += ";\n";
1493                 }
1494           }
1495
1496 //              Generate output.
1497           ret += "\t\t\t\tpost_tuple(tuple);\n";
1498           ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1499           ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1500           ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1501           ret += "\t\t\t\t#endif\n\n";
1502           ret += "\t\t\t}\n";
1503
1504           if(sl_fcns_start != sl_fcns_end)      // END don't allocate tuple if
1505                 ret += "\t\t}\n";                               // unpack failed.
1506           ret += "\t}\n";
1507
1508 //                      Need to release memory held by BUFFER types.
1509                 int g;
1510
1511           for(g=0;g<gb_tbl->size();g++){
1512                 data_type *gdt = gb_tbl->get_data_type(g);
1513                 if(gdt->is_buffer_type()){
1514                         sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1515                         ret += tmpstr;
1516                 }
1517           }
1518           for(a=0;a<aggr_tbl->size();a++){
1519                 if(aggr_tbl->is_builtin(a)){
1520                         data_type *adt = aggr_tbl->get_data_type(a);
1521                         if(adt->is_buffer_type()){
1522                                 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
1523                                 ret += tmpstr;
1524                         }
1525                 }else{
1526                         ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1527                         if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1528                         ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1529                 }
1530           }
1531
1532         ret += "\t\tt->n_aggrs--;\n";
1533
1534         return(ret);
1535
1536 }
1537
1538 string generate_gb_match_test(string idx){
1539         int g;
1540         string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") &&  IS_NEW(t->aggr_table_bitmap,"+idx+")";
1541         if(gb_tbl->size()>0){
1542                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1543                 ret+="\t\t";
1544
1545 //                      Next, scan list for a match on the group-by attributes.
1546           string rhs_op, lhs_op;
1547           for(g=0;g<gb_tbl->size();g++){
1548                   ret += " && ";
1549                   ret += "(";
1550                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1551                   sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1552                   ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
1553                   ret += ")";
1554           }
1555          }
1556
1557           ret += "){\n";
1558
1559         return ret;
1560 }
1561
1562 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1563         int g;
1564         string ret;
1565
1566           ret += "/*\t\tMatch found : update in place.\t*/\n";
1567           int a;
1568           has_udaf = false;
1569           for(a=0;a<aggr_tbl->size();a++){
1570                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1571                   ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1572                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1573           }
1574
1575 //                      garbage collect copied buffer type gb attrs.
1576   for(g=0;g<gb_tbl->size();g++){
1577           data_type *gdt = gb_tbl->get_data_type(g);
1578           if(gdt->is_buffer_type()){
1579                 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
1580                 ret+=tmpstr;
1581           }
1582         }
1583
1584
1585
1586           bool first_udaf = true;
1587           if(has_udaf){
1588                 ret += "\t\tif(";
1589                 for(a=0;a<aggr_tbl->size();a++){
1590                         if(! aggr_tbl->is_builtin(a)){
1591                                 if(! first_udaf)ret += " || ";
1592                                 else first_udaf = false;
1593                                 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1594                                 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1595                                 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1596                         }
1597                 }
1598                 ret+="){\n";
1599                 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1600                 ret += generate_tuple_from_aggr(node_name,schema,idx);
1601                 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
1602                 ret+="\t\t}\n";
1603         }
1604         return ret;
1605 }
1606
1607
1608 string generate_init_group( table_list *schema, string idx){
1609           int g,a;
1610           string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1611 //                      Fill up the aggregate block.
1612           for(g=0;g<gb_tbl->size();g++){
1613                   sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1614                   ret += tmpstr;
1615           }
1616           for(a=0;a<aggr_tbl->size();a++){
1617                   sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1618                   ret += generate_aggr_init(tmpstr, aggr_tbl,a,  schema);
1619           }
1620           ret+="\t\tt->n_aggrs++;\n";
1621         return ret;
1622 }
1623
1624
1625 string generate_fta_flush(string node_name, table_list *schema,
1626                 ext_fcn_list *Ext_fcns){
1627
1628    string ret;
1629    string select_var_defs ;
1630         int s, p;
1631
1632 //              Flush from previous epoch
1633
1634         ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1635
1636         ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1637     ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1638         ret += "\tint i, lfta_bailout;\n";
1639         ret += "\tunsigned int gen_val;\n";
1640
1641         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1642         ret += generate_fta_name(node_name)+" *) f;\n";
1643
1644         ret += "\n";
1645
1646
1647 //              Variables needed to store selected attributes of BUFFER type
1648 //              temporarily, in order to compute their size for storage
1649 //              in an output tuple.
1650
1651   select_var_defs = "";
1652   for(s=0;s<sl_list.size();s++){
1653         data_type *sdt = sl_list[s]->get_data_type();
1654         if(sdt->is_buffer_type()){
1655           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1656           select_var_defs.append(tmpstr);
1657         }
1658   }
1659   if(select_var_defs != ""){
1660         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1661     ret += select_var_defs;
1662   }
1663
1664
1665 //              Variables to store results of partial functions.
1666   if(sl_fcns_start != sl_fcns_end){
1667         ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1668         for(p=sl_fcns_start;p<sl_fcns_end;p++){
1669                 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1670                         partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
1671                 ret += tmpstr;
1672         }
1673         ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1674   }
1675
1676 //              Variables for udaf output temporaries
1677         bool no_udaf = true;
1678         int a;
1679         for(a=0;a<aggr_tbl->size();a++){
1680                 if(! aggr_tbl->is_builtin(a)){
1681                         if(no_udaf){
1682                                 ret+="/*\t\tUDAF output vars.\t*/\n";
1683                                 no_udaf = false;
1684                         }
1685                         int afcn_id = aggr_tbl->get_fcn_id(a);
1686                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1687                         sprintf(tmpstr,"udaf_ret%d", a);
1688                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1689                 }
1690         }
1691
1692
1693 //  ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1694   ret+="\n";
1695   ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1696   ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1697   ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1698                 bool first_g=true;
1699                 int g;
1700                 for(g=0;g<gb_tbl->size();g++){
1701                   data_type *gdt = gb_tbl->get_data_type(g);
1702                   if(gdt->is_temporal()){
1703                         if(first_g) first_g=false; else ret+=" || ";
1704                         ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1705                   }
1706                 }
1707   ret += "))) {\n";
1708   ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1709   ret+=
1710 "#ifdef LFTA_STATS\n"
1711 "\t\t\tt->eviction_cnt++;\n"
1712 "#endif\n"
1713 ;
1714
1715
1716   ret+=generate_tuple_from_aggr(node_name,schema,"i");
1717
1718 //  ret+="\t\t\tt->n_aggrs--;\n";  // done in generate_tuple_from_aggr
1719   ret+="\t\t\tnflush--;\n";
1720   ret+="\t\t}\n";
1721   ret+="\t}\n";
1722   ret+="\tt->flush_pos=i;\n";
1723   ret+="\tif(t->n_aggrs == 0) {\n";
1724   ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1725   ret += "\t}\n\n";
1726
1727   ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1728
1729   for(int g=0;g<gb_tbl->size();g++){
1730         data_type *dt = gb_tbl->get_data_type(g);
1731         if(dt->is_temporal()){
1732                 data_type *gdt = gb_tbl->get_data_type(g);
1733                 if(!gdt->is_buffer_type()){
1734                         sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1735                         ret += tmpstr;
1736                 }
1737         }
1738   }
1739   ret += "\t}\n}\n\n";
1740
1741   return(ret);
1742 }
1743
1744 // TODO Remove sprintf to perform string catenation
1745 string generate_fta_load_params(string node_name){
1746         int p;
1747         vector<string> param_names = param_tbl->get_param_names();
1748
1749         string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1750                 ret += " *t, int sz, void *value, int initial_call){\n";
1751     ret += "\tint pos=0;\n";
1752     ret += "\tint data_pos;\n";
1753
1754         for(p=0;p<param_names.size();p++){
1755                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1756                 if(dt->is_buffer_type()){
1757                         sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1758                         ret += tmpstr;
1759                         sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
1760                         ret += tmpstr;
1761                 }
1762         }
1763
1764
1765
1766         ret += "\n\tdata_pos = ";
1767         for(p=0;p<param_names.size();p++){
1768                 if(p>0) ret += " + ";
1769                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1770                 ret += "sizeof( ";
1771                 ret +=  dt->get_tuple_cvar_type();
1772                 ret += " )";
1773         }
1774         ret += ";\n";
1775         ret += "\tif(data_pos > sz) return 1;\n\n";
1776
1777
1778         for(p=0;p<param_names.size();p++){
1779                 data_type *dt = param_tbl->get_data_type(param_names[p]);
1780                 if(dt->is_buffer_type()){
1781                         sprintf(tmpstr,"\taccess_var_%s =  *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1782                         ret += tmpstr;
1783                         switch( dt->get_type() ){
1784                         case v_str_t:
1785 //                              ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n";               // ntoh conversion
1786 //                              ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n";   // ntoh conversion
1787                                 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1788                                 ret += tmpstr;
1789                                 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1790                                 ret += tmpstr;
1791                                 sprintf(tmpstr,"\ttmp_var_%s.length =  access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1792                                 ret += tmpstr;
1793                         break;
1794                         default:
1795                                 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1796                                 exit(1);
1797                         break;
1798                         }
1799 //                                      First, destroy the old
1800                         ret += "\tif(! initial_call)\n";
1801                         sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1802                         ret += tmpstr;
1803 //                                      Next, create the new.
1804                         sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1805                         ret += tmpstr;
1806                 }else{
1807 //                      if(dt->needs_hn_translation()){
1808 //                              sprintf(tmpstr,"\tt->param_%s =  %s( *( (%s *)( (char *)value+pos) ) );\n",
1809 //                                param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1810 //                      }else{
1811                                 sprintf(tmpstr,"\tt->param_%s =  *( (%s *)( (char *)value+pos) );\n",
1812                                   param_names[p].c_str(), dt->get_cvar_type().c_str() );
1813 //                      }
1814                         ret += tmpstr;
1815                 }
1816                 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1817                 ret += tmpstr;
1818         }
1819
1820 //                      Register the pass-by-handle parameters
1821
1822         ret += "/* register and de-register the pass-by-handle parameters */\n";
1823
1824     int ph;
1825     for(ph=0;ph<param_handle_table.size();++ph){
1826                 data_type pdt(param_handle_table[ph]->type_name);
1827                 switch(param_handle_table[ph]->val_type){
1828                 case cplx_lit_e:
1829                         break;
1830                 case litval_e:
1831                         break;
1832                 case param_e:
1833                         ret += "\tif(! initial_call)\n";
1834                         sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1835                                 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1836                         ret += tmpstr;
1837                         sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
1838                         ret += tmpstr;
1839
1840                         if(pdt.is_buffer_type()) ret += "&(";
1841                         ret += "t->param_"+param_handle_table[ph]->param_name;
1842                         if(pdt.is_buffer_type()) ret += ")";
1843                         ret += ");\n";
1844                         break;
1845                 default:
1846                         sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1847                         fprintf(stderr,"%s\n",tmpstr);
1848                         ret+=tmpstr;
1849                 }
1850         }
1851
1852         ret+="\treturn 0;\n";
1853         ret += "}\n\n";
1854
1855         return(ret);
1856 }
1857
1858
1859
1860
1861 string generate_fta_free(string node_name, bool is_aggr_query){
1862
1863         string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1864         ret+= "\tstruct "+generate_fta_name(node_name)+
1865                 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1866         ret += "\tint i;\n";
1867
1868         if(is_aggr_query){
1869                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1870                 ret+="\t/* \t\tmark all groups as old */\n";
1871                 ret+="\tt->generation++;\n";
1872                 ret+="\tt->flush_pos = 0;\n";
1873                 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1874         }
1875
1876 //                      Deregister the pass-by-handle parameters
1877         ret += "/* de-register the pass-by-handle parameters */\n";
1878     int ph;
1879     for(ph=0;ph<param_handle_table.size();++ph){
1880                 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1881                         param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1882                 ret += tmpstr;
1883         }
1884
1885
1886         ret += "\treturn 0;\n}\n\n";
1887         return(ret);
1888 }
1889
1890
1891 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1892         string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f,  gs_int32_t command, gs_int32_t sz, void *value){\n";
1893         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1894                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1895         ret+="\tint i;\n";
1896
1897
1898         ret += "\t/* temp status tuple */\n";
1899         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1900         ret += "\tgs_int32_t tuple_size;\n";
1901
1902
1903         if(is_aggr_query){
1904                 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1905
1906                 ret+="\t\tif (!t->n_aggrs) {\n";
1907                 ret+="\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, 0);\n";
1908                 ret+="\t\t\tif( tuple != NULL)\n";
1909                 ret+="\t\t\t\tpost_tuple(tuple);\n";
1910
1911                 ret+="\t\t}else{\n";
1912
1913                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1914                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1915                 ret +="\t\tt->generation++;\n";
1916                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1917                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1918                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1919                 ret+="\t\t\tt->flush_pos = 0;\n";
1920                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1921                 ret+="\t\t}\n";
1922
1923                 ret+="\t}\n";
1924         }
1925         if(param_tbl->size() > 0){
1926                 ret+=
1927 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1928 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1929 "#ifndef LFTA_IN_NIC\n"
1930 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1931 "#else\n"
1932 "\t\t{}\n"
1933 "#endif\n"
1934 "\t}\n";
1935         }
1936         ret+=
1937 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1938 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1939 "\t}\n\n";
1940
1941
1942         ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1943
1944         if(is_aggr_query){
1945                 ret+="\t\tif (t->n_aggrs) {\n";
1946                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1947                 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1948                 ret +="\t\tt->generation++;\n";
1949                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1950                 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1951                 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1952                 ret+="\t\t\tt->flush_pos = 0;\n";
1953                 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1954                 ret+="\t\t}\n";
1955         }
1956
1957         ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1958         ret += "\t\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
1959         ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1960
1961         /* mark tuple as EOF_TUPLE */
1962         ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1963         ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1964         ret += "\t\tpost_tuple(tuple);\n";
1965         ret += "\t}\n";
1966
1967         ret += "\treturn 0;\n}\n\n";
1968
1969         return(ret);
1970 }
1971
1972 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
1973         string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1974         ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1975                 ret += generate_fta_name(node_name)+" *) f;\n\n";
1976
1977         ret += "\t/* Create a temp status tuple */\n";
1978         ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1979         ret += "\tgs_int32_t tuple_size;\n";
1980         ret += "\tunsigned int i;\n";
1981         ret += "\ttime_t cur_time;\n";
1982         ret += "\tint time_advanced;\n";
1983         ret += "\tstruct fta_stat stats;\n";
1984
1985
1986
1987         /* copy the last seen values of temporal attributes */
1988         col_id_set temp_cids;           //      col ids of temp attributes in select clause
1989
1990
1991         /* HACK: in order to reuse the SE generation code, we need to copy
1992          * the last values of the temp attributes into new variables
1993          * which have names unpack_var_XXX_XXX
1994          */
1995
1996         int s, g;
1997         col_id_set::iterator csi;
1998
1999         for(s=0;s<sl_list.size();s++){
2000                 data_type *sdt = sl_list[s]->get_data_type();
2001                 if (sdt->is_temporal()) {
2002                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2003                 }
2004         }
2005
2006         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2007                 int tblref = (*csi).tblvar_ref;
2008                 int schref = (*csi).schema_ref;
2009                 string field = (*csi).field;
2010                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2011                 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
2012                 ret += tmpstr;
2013         }
2014
2015         if (is_aggr_query) {
2016                 for(g=0;g<gb_tbl->size();g++){
2017                         data_type *gdt = gb_tbl->get_data_type(g);
2018                         if(gdt->is_temporal()){
2019                                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2020                                 ret += tmpstr;
2021                                 data_type *gdt = gb_tbl->get_data_type(g);
2022                                 if(gdt->is_buffer_type()){
2023                                 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2024                                 ret += tmpstr;
2025                                 }
2026                         }
2027                 }
2028         }
2029         ret += "\n";
2030
2031         ret += "\ttime_advanced = 0;\n";
2032
2033         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2034                 int tblref = (*csi).tblvar_ref;
2035                 int schref = (*csi).schema_ref;
2036                 string field = (*csi).field;
2037                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2038
2039                 // update last seen value with the value seen
2040                 ret += "\t#ifdef PREFILTER_DEFINED\n";
2041                 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
2042                         field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
2043                 ret += tmpstr;
2044                 ret += "\t\ttime_advanced = 1;\n\t}\n";
2045                 ret += "\t#endif\n";
2046
2047                 // we need to pay special attention to time fields
2048                 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
2049                         ret += "\tcur_time = time(&cur_time);\n";
2050
2051                         if (field == "time") {
2052                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
2053                                         tblref, time_corr);
2054                                 ret += tmpstr;
2055                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
2056                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2057                         } else if (field == "timestamp_ms") {
2058                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
2059                                         tblref, time_corr);
2060                                 ret += tmpstr;
2061                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
2062                                 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2063                         }else{
2064                                 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
2065                                         field.c_str(), tblref, time_corr);
2066                                 ret += tmpstr;
2067                                 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
2068                                         field.c_str(), tblref, field.c_str(), tblref, time_corr);
2069                         }
2070                         ret += tmpstr;
2071
2072                         ret += "\t\ttime_advanced = 1;\n";
2073                         ret += "\t}\n";
2074
2075                         sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
2076                         field.c_str(), tblref, field.c_str(), tblref);
2077                         ret += tmpstr;
2078                 } else {
2079                         sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
2080                                 field.c_str(), tblref, field.c_str(), tblref);
2081                         ret += tmpstr;
2082                 }
2083         }
2084
2085
2086         if(advance_uxtime){
2087                 ret += "\tt->ux_time = time(&(t->ux_time));\n";
2088         }
2089
2090         // for aggregation lftas we need to check if the time was advanced beyond the current epoch
2091         if (is_aggr_query) {
2092
2093                 string change_test;
2094                 bool first_one = true;
2095                 for(g=0;g<gb_tbl->size();g++){
2096                   data_type *gdt = gb_tbl->get_data_type(g);
2097                   if(gdt->is_temporal()){
2098 //                                      To perform the test, first need to compute the value
2099 //                                      of the temporal gb attrs.
2100                           if(gdt->is_buffer_type()){
2101         //                              NOTE : if the SE defining the gb is anything
2102         //                              other than a ref to a variable, this will generate
2103         //                              illegal code.  To be resolved with Spatch.
2104                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2105                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2106                                 ret+=tmpstr;
2107                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2108                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2109                           }else{
2110                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2111                           }
2112                           ret += tmpstr;
2113
2114                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2115                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2116                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2117                           change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
2118                   }
2119                 }
2120
2121                 ret += "\n\tif( time_advanced && !( (";
2122                 ret += change_test;
2123                 ret += ") ) ){\n";
2124
2125                 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2126                 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
2127                 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2128
2129                 ret += "\t\t/* \t\tmark all groups as old */\n";
2130                 ret +="\t\tt->generation++;\n";
2131                 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
2132                 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
2133                 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
2134                 ret += "\t\tt->flush_pos = 0;\n";
2135
2136                 for(g=0;g<gb_tbl->size();g++){
2137                      data_type *gdt = gb_tbl->get_data_type(g);
2138                      if(gdt->is_temporal()){
2139                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);                 ret += tmpstr;
2140                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
2141                      }
2142                 }
2143                 ret += "\t\tt->n_ticks = 0; // reset clock tick counter, limit slow flush\n";
2144                 ret += "\t}else{\n";
2145                 ret += "//\tLimit slow flush, do a full flush at two clock ticks past the change in generation.\n";
2146                 ret += "\t\tt->n_ticks++;\n";
2147                 ret += "\t\tif(t->n_ticks == 2){\n";
2148                 ret += "\t\t\tif(t->flush_pos<t->max_aggrs) \n";
2149                 ret += "\t\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2150                 ret += "\t\t}\n";
2151                 ret += "\t}\n\n";
2152                 
2153
2154         }
2155
2156         ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
2157         ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+"*)allocate_tuple(f, tuple_size );\n";
2158         ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
2159
2160
2161         for(s=0;s<sl_list.size();s++){
2162                 data_type *sdt = sl_list[s]->get_data_type();
2163                 if(sdt->is_temporal()){
2164
2165                         if (sl_list[s]->is_gb()) {
2166                                 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
2167                                 ret += tmpstr;
2168                         }
2169
2170                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2171                         ret += tmpstr;
2172 //                      if(sdt->needs_hn_translation())
2173 //                              ret += sdt->hton_translation() +"( ";
2174                         if (sl_list[s]->is_gb()) {
2175                                 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
2176                                 ret += tmpstr;
2177                         } else{
2178                                 ret += generate_se_code(sl_list[s],schema);
2179                         }
2180 //                      if(sdt->needs_hn_translation())
2181 //                              ret += " )";
2182                         ret += ";\n";
2183                 }
2184         }
2185
2186         /* mark tuple as temporal */
2187         ret += "\n\t/* Mark tuple as temporal */\n";
2188         ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
2189
2190         ret += "\n\t/* Copy trace id */\n";
2191         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
2192
2193         ret += "\n\t/* Populate runtime stats */\n";
2194         ret += "\tstats.ftaid = f->ftaid;\n";
2195         ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
2196         ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
2197         ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
2198         ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
2199         ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
2200         ret += "\tstats.collision_cnt = t->collision_cnt;\n";
2201         ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
2202         ret += "\tstats.sampling_rate    = t->sampling_rate;\n";
2203
2204         ret += "\n#ifdef LFTA_PROFILE\n";
2205         ret += "\n\t/* Print stats */\n";
2206         ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
2207         ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
2208         ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
2209         ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
2210         ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
2211         ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
2212         ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
2213         ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
2214         ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
2215         ret += "\n#endif\n";
2216
2217
2218         ret += "\n\t/* Copy stats */\n";
2219         ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2220         ret+="\tpost_tuple(tuple);\n";
2221
2222         ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2223         ret += "\n\t/* Disable heartbeats for now to avoid overloading clearinghouse */\n";     
2224         ret += "\t/* fta_heartbeat(f->ftaid, t->trace_id++, 1, &stats); */\n";
2225
2226         ret += "\n\t/* Reset runtime stats */\n";
2227         ret += "\tt->in_tuple_cnt = 0;\n";
2228         ret += "\tt->out_tuple_cnt = 0;\n";
2229         ret += "\tt->out_tuple_sz = 0;\n";
2230         ret += "\tt->accepted_tuple_cnt = 0;\n";
2231         ret += "\tt->cycle_cnt = 0;\n";
2232         ret += "\tt->collision_cnt = 0;\n";
2233         ret += "\tt->eviction_cnt = 0;\n";
2234
2235         ret += "\treturn 0;\n}\n\n";
2236
2237         return(ret);
2238 }
2239
2240
2241 //              accept processing before the where clause,
2242 //              do flush processwing.
2243 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema,   col_id_set &unpacked_cids, string &temporal_flush){
2244         int s;
2245
2246 //              Slow flush
2247   string ret="\n/*\tslow flush\t*/\n";
2248   string slow_flush_str = fs->get_val_of_def("slow_flush");
2249   int n_slow_flush = atoi(slow_flush_str.c_str());
2250   if(n_slow_flush <= 0) n_slow_flush = 2;
2251   if(n_slow_flush > 1){
2252         ret += "\tt->flush_ctr++;\n";
2253         ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2254         ret += "\t\tt->flush_ctr = 0;\n";
2255     ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
2256         ret += "\t}\n\n";
2257   }else{
2258     ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2259   }
2260
2261
2262         string change_test;
2263         bool first_one = true;
2264         int g;
2265     col_id_set flush_cids;              //      col ids accessed when computing flush variables.
2266                                                         //  unpack them at temporal flush test time.
2267     temporal_flush = "";
2268
2269
2270         for(g=0;g<gb_tbl->size();g++){
2271                   data_type *gdt = gb_tbl->get_data_type(g);
2272                   if(gdt->is_temporal()){
2273                           gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2274
2275 //                                      To perform the test, first need to compute the value
2276 //                                      of the temporal gb attrs.
2277                           if(gdt->is_buffer_type()){
2278         //                              NOTE : if the SE defining the gb is anything
2279         //                              other than a ref to a variable, this will generate
2280         //                              illegal code.  To be resolved with Spatch.
2281                                 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2282                                         g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2283                                 temporal_flush += tmpstr;
2284                                 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2285                                         gdt->get_buffer_assign_copy().c_str(), g, g);
2286                           }else{
2287                                 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2288                           }
2289                           temporal_flush += tmpstr;
2290 //                                      END computing the value of the temporal GB attr.
2291
2292
2293                           sprintf(tmpstr,"t->last_gb_%d",g);   string lhs_op = tmpstr;
2294                           sprintf(tmpstr,"gb_attr_%d",g);   string rhs_op = tmpstr;
2295                           if(first_one){first_one = false;} else {change_test.append(") && (");}
2296                           change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2297                   }
2298         }
2299         if(!first_one){         // will be false iff. there is a temporal GB attribute
2300                   temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2301                   temporal_flush += "\tif( !( (";
2302                   temporal_flush += change_test;
2303                   temporal_flush += ") ) ){\n";
2304
2305 //                temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2306                   temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2307                   temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2308                   temporal_flush+="\t\t}\n";
2309                   temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2310                   temporal_flush+="\t\tt->generation++;\n";
2311                   temporal_flush+="\t\tt->flush_pos = 0;\n";
2312                   temporal_flush+="\t\tt->n_ticks = 0; // reset clock tick counter, to limit slow flush\n";
2313
2314
2315 //                              Now set the saved temporal value of the gb to the
2316 //                              current value of the gb.  Only for simple types,
2317 //                              not for buffer types -- but the strings are not
2318 //                              temporal in any case.
2319
2320                         for(g=0;g<gb_tbl->size();g++){
2321                           data_type *gdt = gb_tbl->get_data_type(g);
2322                           if(gdt->is_temporal()){
2323                                   if(gdt->is_buffer_type()){
2324
2325                                         fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2326                                   }else{
2327                                         sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2328                                         temporal_flush += tmpstr;
2329                                         sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2330                                         temporal_flush += tmpstr;
2331                                   }
2332                                 }
2333                         }
2334                   temporal_flush += "\t}\n\n";
2335         }
2336
2337 // Unpack all the temporal attributes referenced in select clause
2338 // and update the last value of the attribute
2339         col_id_set temp_cids;           //      col ids of temp attributes in select clause
2340         col_id_set::iterator csi;
2341
2342         for(s=0;s<sl_list.size();s++){
2343                 data_type *sdt = sl_list[s]->get_data_type();
2344                 if (sdt->is_temporal()) {
2345                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2346                 }
2347         }
2348
2349         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2350                 if(unpacked_cids.count((*csi)) == 0){
2351                         int tblref = (*csi).tblvar_ref;
2352                         int schref = (*csi).schema_ref;
2353                         string field = (*csi).field;
2354                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2355 /*
2356                         data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2357                         sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2358                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2359                         ret += tmpstr;
2360                         ret += "\tif(retval) return 1;\n";
2361 */
2362                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2363                         ret += tmpstr;
2364
2365                         unpacked_cids.insert( (*csi) );
2366                 }
2367         }
2368
2369
2370 //                              Do the flush here if this is a real_time query
2371         string rt_level = fs->get_val_of_def("real_time");
2372         if(rt_level != "" && temporal_flush != ""){
2373                 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2374                         if(unpacked_cids.count((*csi)) == 0){
2375                         int tblref = (*csi).tblvar_ref;
2376                         int schref = (*csi).schema_ref;
2377                         string field = (*csi).field;
2378                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2379 /*
2380                                 sprintf(tmpstr,"\tretval =  %s(p, &unpack_var_%s_%d);\n",
2381                                 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2382                         ret += tmpstr;
2383                         ret += "\tif(retval) return 1;\n";
2384 */
2385                                 unpacked_cids.insert( (*csi) );
2386                         }
2387                 }
2388                 ret += temporal_flush;
2389         }
2390
2391         return ret;
2392  }
2393
2394 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2395
2396 int p,s;
2397 string ret;
2398
2399 ///////////////                 Processing for filter-only query
2400
2401 //                      test passed : create the tuple, then assign to it.
2402           ret += "/*\t\tCreate and post the tuple\t*/\n";
2403
2404 //                      Unpack partial fcns ref'd by the select clause.
2405 //                      Its a kind of a WHERE clause ...
2406   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2407         if(fcn_ref_cnt[p] > 1){
2408                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2409         }
2410         if(is_partial_fcn[p]){
2411                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2412                 ret += "\tif(retval) goto end;\n";
2413         }
2414         if(fcn_ref_cnt[p] > 1){
2415                 if(!is_partial_fcn[p]){
2416                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2417                 }
2418                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2419                 ret += "\t}\n";
2420         }
2421   }
2422
2423   // increment the counter of accepted tuples
2424   ret += "\n\t#ifdef LFTA_STATS\n";
2425   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2426   ret += "\t#endif\n\n";
2427
2428 //                      First, compute the size of the tuple.
2429
2430 //                      Unpack any BUFFER type selections into temporaries
2431 //                      so that I can compute their size and not have
2432 //                      to recompute their value during tuple packing.
2433 //                      I can use regular assignment here because
2434 //                      these temporaries are non-persistent.
2435
2436           for(s=0;s<sl_list.size();s++){
2437                 data_type *sdt = sl_list[s]->get_data_type();
2438                 if(sdt->is_buffer_type()){
2439                         sprintf(tmpstr,"\tselvar_%d = ",s);
2440                         ret += tmpstr;
2441                         ret += generate_se_code(sl_list[s],schema);
2442                         ret += ";\n";
2443                 }
2444           }
2445
2446
2447 //              The size of the tuple is the size of the tuple struct plus the
2448 //              size of the buffers to be copied in.
2449
2450           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2451           for(s=0;s<sl_list.size();s++){
2452                 data_type *sdt = sl_list[s]->get_data_type();
2453                 if(sdt->is_buffer_type()){
2454                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2455                         ret += tmpstr;
2456                 }
2457           }
2458           ret += ";\n";
2459
2460
2461           ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
2462           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2463
2464 //                      Test passed, make assignments to the tuple.
2465
2466           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2467
2468 //                      Mark tuple as REGULAR_TUPLE
2469           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2470
2471
2472           for(s=0;s<sl_list.size();s++){
2473                 data_type *sdt = sl_list[s]->get_data_type();
2474                 if(sdt->is_buffer_type()){
2475                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2476                         ret += tmpstr;
2477                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2478                         ret += tmpstr;
2479                 }else{
2480                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2481                         ret += tmpstr;
2482 //                      if(sdt->needs_hn_translation())
2483 //                              ret += sdt->hton_translation() +"( ";
2484                         ret += generate_se_code(sl_list[s],schema);
2485 //                      if(sdt->needs_hn_translation())
2486 //                              ret += ") ";
2487                         ret += ";\n";
2488                 }
2489           }
2490
2491 //              Generate output.
2492
2493           ret += "\tpost_tuple(tuple);\n";
2494
2495 //              Increment the counter of posted tuples
2496   ret += "\n\t#ifdef LFTA_STATS\n";
2497   ret += "\tt->out_tuple_cnt++;\n";
2498   ret+="\tt->out_tuple_sz+=tuple_size;\n";
2499   ret += "\t#endif\n\n";
2500
2501
2502
2503         return ret;
2504 }
2505
2506 //      TODO Ensure that postfilter predicates are being generated
2507 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
2508
2509 int p,s,w;
2510 string ret;
2511
2512 //                      Get parameters
2513         unsigned int window_len = fs->temporal_range;
2514         unsigned int n_bloom = 11;
2515         string n_bloom_str = fs->get_val_of_def("num_bloom");
2516         int tmp_n_bloom = atoi(n_bloom_str.c_str());
2517         if(tmp_n_bloom>0)
2518                 n_bloom = tmp_n_bloom+1;
2519         float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2520         sprintf(tmpstr,"%f",bloom_width);
2521         string bloom_width_str = tmpstr;
2522
2523         if(window_len < n_bloom){
2524                 n_bloom = window_len+1;
2525                 bloom_width_str = "1";
2526         }
2527
2528
2529 //              Grab the current window time
2530         scalarexp_t winvar(fs->temporal_var);
2531         ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2532
2533         int bf_exp_size = 12;  // base-2 log of number of bits
2534         string bloom_len_str = fs->get_val_of_def("bloom_size");
2535         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2536         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2537                 bf_exp_size = tmp_bf_exp_size;
2538         }
2539         int bf_bit_size = 1 << bf_exp_size;
2540         int bf_byte_size = bf_bit_size / (8*sizeof(char));
2541
2542         unsigned int ht_size = 4096;
2543         string ht_size_s = fs->get_val_of_def("aggregate_slots");
2544         int tmp_ht_size = atoi(ht_size_s.c_str());
2545         if(tmp_ht_size > 1024){
2546                 unsigned int hs = 1;            // make it power of 2
2547                 while(tmp_ht_size){
2548                         hs =hs << 1;
2549                         tmp_ht_size = tmp_ht_size >> 1;
2550                 }
2551                 ht_size = hs;
2552         }
2553
2554         int i, bf_mask = 0;
2555         if(fs->use_bloom){
2556                 for(i=0;i<bf_exp_size;i++)
2557                         bf_mask = (bf_mask << 1) | 1;
2558         }else{
2559                 for(i=ht_size;i>1;i=i>>1)
2560                         bf_mask = (bf_mask << 1) | 1;
2561         }
2562
2563 /*
2564 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2565         n_bloom,
2566         window_len,
2567         bloom_width_str.c_str(),
2568         bf_exp_size,
2569         bf_bit_size,
2570         bf_byte_size,
2571         ht_size,
2572         ht_size_s.c_str(),
2573         bf_mask);
2574 */
2575
2576
2577
2578
2579 //              If this is a bloom-filter fj, first test if the
2580 //              bloom filter needs to be advanced.
2581 //              SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2582 //              t->bf_size : number of bits in bloom filter
2583 //              TODO: vectorize?
2584 //              TODO: Don't iterate more than n_bloom times!
2585 //                      As written, its possible to wrap around many times.
2586         if(fs->use_bloom){
2587                 ret +=
2588 "//                     Clean out old bloom filters if needed.\n"
2589 "//                     TODO vectorize this ? \n"
2590 "       if(t->first_exec){\n"
2591 "               t->first_exec = 0;\n"
2592 "               t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2593 "               t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2594 "       }else{\n"
2595 "               curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2596 "               if(curr_bin != t->last_bin){\n"
2597 "                       for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2598 "                               t->last_bloom_pos++;\n"
2599 "                               if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2600 "                                       t->last_bloom_pos = 0;\n"
2601 "                               tmp_i = t->last_bloom_pos;\n"
2602 "                               for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2603 "                                       SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2604 "                               }\n"
2605 "                       }\n"
2606 "               }\n"
2607 "               t->last_bin = curr_bin;\n"
2608 "       }\n"
2609 ;
2610         }
2611
2612
2613 //-----------------------------------------------------------------
2614 //              First, determine whether to do S (filter stream) processing.
2615
2616         ret +=
2617 "//             S (filtering stream) predicate, should it be processed?\n"
2618 "\n"
2619 ;
2620 // Sort S preds based on cost.
2621         vector<cnf_elem *> s_filt = fs->pred_t1;
2622         col_id_set::iterator csi;
2623   if(s_filt.size() > 0){
2624
2625 //                      Unpack fields ref'd in the S pred
2626         for(w=0;w<s_filt.size();++w){
2627                 col_id_set this_pred_cids;
2628                 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2629                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2630                         if(unpacked_cids.count( (*csi) ) == 0){
2631                                 int tblref = (*csi).tblvar_ref;
2632                                 int schref = (*csi).schema_ref;
2633                                 string field = (*csi).field;
2634                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2635                                 unpacked_cids.insert( (*csi) );
2636                         }
2637                 }
2638         }
2639
2640
2641 //              Sort by evaluation cost.
2642 //              First, estimate evaluation costs
2643 //              Eliminate predicates covered by the prefilter (those in s_pids).
2644 //              I need to do it before the sort becuase the indices refer
2645 //              to the position in the unsorted list.
2646         vector<cnf_elem *> tmp_wh;
2647         for(w=0;w<s_filt.size();++w){
2648                 compute_cnf_cost(s_filt[w],Ext_fcns);
2649                 tmp_wh.push_back(s_filt[w]);
2650         }
2651         s_filt = tmp_wh;
2652
2653         sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2654
2655 //              Now generate the predicates.
2656         for(w=0;w<s_filt.size();++w){
2657                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2658                 ret += tmpstr;
2659
2660 //                      Find partial fcns ref'd in this cnf element
2661                 set<int> pfcn_refs;
2662                 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2663 //                      Since set<..> is a "Sorted Associative Container",
2664 //                      we can walk through it in sorted order by walking from
2665 //                      begin() to end().  (and the partial fcns must be
2666 //                      evaluated in this order).
2667                 set<int>::iterator si;
2668                 string pf_preds;
2669                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2670                         if(fcn_ref_cnt[(*si)] > 1){
2671                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2672                         }
2673                         if(is_partial_fcn[(*si)]){
2674                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2675                                 ret += "\t\tif(retval) goto end_s;\n";
2676                         }
2677                         if(fcn_ref_cnt[(*si)] > 1){
2678                                 if(!is_partial_fcn[(*si)]){
2679                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2680 //              Testing for S is a side branch.
2681 //              I don't want a cacheable partial function to be
2682 //              marked as evaluated.  Therefore I mark the function
2683 //              as evalauted ONLY IF it is not partial.
2684                                         ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2685                                 }
2686                                 ret += "\t}\n";
2687                         }
2688                 }
2689
2690                 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2691                                 ") ) goto end_s;\n";
2692         }
2693   }else{
2694           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2695   }
2696
2697         for(p=0;p<fs->hash_eq.size();++p)
2698                 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2699
2700         if(fs->use_bloom){
2701 //                      First, generate the S scalar expressions in the hash_eq
2702
2703 //                      Iterate over the bloom filters
2704                 for(i=0;i<3;i++){
2705                         ret += "\t\tbucket=0;\n";
2706                         for(p=0;p<fs->hash_eq.size();++p){
2707                                 ret +=
2708 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2709         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2710         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2711                         }
2712 //              SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2713                                 ret +=
2714 "               bucket &= "+int_to_string(bf_mask)+";\n"
2715 "               SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2716 "\n"
2717 ;
2718                 }
2719         }else{
2720                 ret += "// Add the S record to the hash table, choose a position\n";
2721                 ret += "\t\tbucket=0;\n";
2722                 for(p=0;p<fs->hash_eq.size();++p){
2723                         ret +=
2724 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2725         fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2726         +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2727                 }
2728                 ret +=
2729 "               bucket &= "+int_to_string(bf_mask)+";\n"
2730 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2731 ;
2732 //                      Try the first bucket
2733                 ret += "\t\tif(";
2734                 for(p=0;p<fs->hash_eq.size();++p){
2735                         if(p>0) ret += " && ";
2736 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2737 //                                      " == s_equijoin_"+int_to_string(p);
2738                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2739                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2740                         string rhs_op = "s_equijoin_"+int_to_string(p);
2741                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2742                 }
2743                 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2744                 ret += "\t\t}else{\n\t\t\tif(";
2745                 for(p=0;p<fs->hash_eq.size();++p){
2746                         if(p>0) ret += " && ";
2747 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2748 //                                      " == s_equijoin_"+int_to_string(p);
2749                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2750                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2751                         string rhs_op = "s_equijoin_"+int_to_string(p);
2752                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2753                 }
2754                 ret +=  "){\n\t\t\t\tthe_bucket = bucket1;\n";
2755                 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2756                 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2757                 ret += "\t\t\t}\n\t\t}\n";
2758                 for(p=0;p<fs->hash_eq.size();++p){
2759                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2760                         if(hdt->is_buffer_type()){
2761                                 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2762                                 ret += tmpstr;
2763                         }else{
2764                                 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2765                                         " = s_equijoin_"+int_to_string(p)+";\n";
2766                         }
2767                 }
2768                 ret+="\t\tt->join_table[the_bucket].ts =  curr_fj_ts;\n";
2769         }
2770   ret += "\tend_s:\n";
2771
2772 //      ------------------------------------------------------------
2773 //              Next, determine if the R record should be processed.
2774
2775
2776         ret +=
2777 "//             R (main stream) cheap predicate\n"
2778 "\n"
2779 ;
2780
2781 //              Unpack r_filt fields
2782         vector<cnf_elem *> r_filt = fs->pred_t0;
2783         for(w=0;w<r_filt.size();++w){
2784                 col_id_set this_pred_cids;
2785                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2786                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2787                         if(unpacked_cids.count( (*csi) ) == 0){
2788                                 int tblref = (*csi).tblvar_ref;
2789                                 int schref = (*csi).schema_ref;
2790                                 string field = (*csi).field;
2791                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2792                                 unpacked_cids.insert( (*csi) );
2793                         }
2794                 }
2795         }
2796
2797 // Sort R preds based on cost.
2798
2799         vector<cnf_elem *> tmp_wh;
2800         for(w=0;w<r_filt.size();++w){
2801                 compute_cnf_cost(r_filt[w],Ext_fcns);
2802                 tmp_wh.push_back(r_filt[w]);
2803         }
2804         r_filt = tmp_wh;
2805
2806         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2807
2808 //              WARNING! the constant 20 below is a wild-ass guess.
2809         int cheap_rpos;
2810         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
2811
2812 //              Test the cheap filters on R.
2813   if(cheap_rpos >0){
2814
2815 //              Now generate the predicates.
2816         for(w=0;w<cheap_rpos;++w){
2817                 sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
2818                 ret += tmpstr;
2819
2820 //                      Find partial fcns ref'd in this cnf element
2821                 set<int> pfcn_refs;
2822                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2823 //                      Since set<..> is a "Sorted Associative Container",
2824 //                      we can walk through it in sorted order by walking from
2825 //                      begin() to end().  (and the partial fcns must be
2826 //                      evaluated in this order).
2827                 set<int>::iterator si;
2828                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2829                         if(fcn_ref_cnt[(*si)] > 1){
2830                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2831                         }
2832                         if(is_partial_fcn[(*si)]){
2833                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2834                                 ret += "\t\tif(retval) goto end;\n";
2835                         }
2836                         if(fcn_ref_cnt[(*si)] > 1){
2837                                 if(!is_partial_fcn[(*si)]){
2838                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2839                                 }
2840                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2841                                 ret += "\t}\n";
2842                         }
2843                 }
2844
2845                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2846                                 ") ) goto end;\n";
2847         }
2848   }else{
2849           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2850   }
2851
2852         ret += "\n//    Do the join\n\n";
2853         for(p=0;p<fs->hash_eq.size();++p)
2854                 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2855
2856
2857 //                      Passed the cheap pred, now test the join with S.
2858         if(fs->use_bloom){
2859                 for(i=0;i<3;i++){
2860                         ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2861                         for(p=0;p<fs->hash_eq.size();++p){
2862                                 ret +=
2863 "       bucket"+int_to_string(i)+
2864         " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2865         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2866         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2867                         }
2868                                 ret +=
2869 "       bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2870                 }
2871                 ret += "\tfound = 0;\n";
2872                 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2873                 ret +=
2874 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2875 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2876 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
2877 "\t\t\tfound=1;\n"
2878 "\t}\n"
2879 ;
2880                 ret +=
2881 "       if(!found)\n"
2882 "               goto end;\n"
2883 ;
2884         }else{
2885                 ret += "\tfound = 0;\n";
2886                 ret += "\t\tbucket=0;\n";
2887                 for(p=0;p<fs->hash_eq.size();++p){
2888                         ret +=
2889 "               bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2890         fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2891         +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2892                 }
2893                 ret +=
2894 "               bucket &= "+int_to_string(bf_mask)+";\n"
2895 "               bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2896 ;
2897 //                      Try the first bucket
2898                 ret += "\t\tif(";
2899                 for(p=0;p<fs->hash_eq.size();++p){
2900                         if(p>0) ret += " && ";
2901 //                      ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2902 //                                      " == r_equijoin_"+int_to_string(p);
2903                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2904                         string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2905                         string rhs_op = "s_equijoin_"+int_to_string(p);
2906                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2907                 }
2908                 if(p>0) ret += " && ";
2909                 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2910                 ret += "){\n\t\t\tfound = 1;\n";
2911                 ret += "\t\t}else {if(";
2912                 for(p=0;p<fs->hash_eq.size();++p){
2913                         if(p>0) ret += " && ";
2914 //                      ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2915 //                                      " == r_equijoin_"+int_to_string(p);
2916                         data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2917                         string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2918                         string rhs_op = "s_equijoin_"+int_to_string(p);
2919                         ret += generate_equality_test(lhs_op,rhs_op,hdt);
2920                 }
2921                 if(p>0) ret += " && ";
2922                 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <=  curr_fj_ts";
2923                 ret +=  ")\n\t\t\tfound=1;\n";
2924                 ret+="\t\t}\n";
2925                 ret +=
2926 "       if(!found)\n"
2927 "               goto end;\n"
2928 ;
2929         }
2930
2931
2932 //              Test the expensive filters on R.
2933   if(cheap_rpos < r_filt.size()){
2934
2935 //              Now generate the predicates.
2936         for(w=cheap_rpos;w<r_filt.size();++w){
2937                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2938                 ret += tmpstr;
2939
2940 //                      Find partial fcns ref'd in this cnf element
2941                 set<int> pfcn_refs;
2942                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2943 //                      Since set<..> is a "Sorted Associative Container",
2944 //                      we can walk through it in sorted order by walking from
2945 //                      begin() to end().  (and the partial fcns must be
2946 //                      evaluated in this order).
2947                 set<int>::iterator si;
2948                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2949                         if(fcn_ref_cnt[(*si)] > 1){
2950                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2951                         }
2952                         if(is_partial_fcn[(*si)]){
2953                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2954                                 ret += "\t\tif(retval) goto end;\n";
2955                         }
2956                         if(fcn_ref_cnt[(*si)] > 1){
2957                                 if(!is_partial_fcn[(*si)]){
2958                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2959                                 }
2960                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2961                                 ret += "\t}\n";
2962                         }
2963                 }
2964
2965                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2966                                 ") ) goto end;\n";
2967         }
2968   }else{
2969           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2970   }
2971
2972
2973
2974 ///////////////                 post the tuple
2975
2976 //                      test passed : create the tuple, then assign to it.
2977           ret += "/*\t\tCreate and post the tuple\t*/\n";
2978
2979 //              Unpack r_filt fields
2980         for(s=0;s<sl_list.size();++s){
2981                 col_id_set this_se_cids;
2982                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2983                 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2984                         if(unpacked_cids.count( (*csi) ) == 0){
2985                                 int tblref = (*csi).tblvar_ref;
2986                                 int schref = (*csi).schema_ref;
2987                                 string field = (*csi).field;
2988                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2989                                 unpacked_cids.insert( (*csi) );
2990                         }
2991                 }
2992         }
2993
2994
2995 //                      Unpack partial fcns ref'd by the select clause.
2996 //                      Its a kind of a WHERE clause ...
2997   for(p=sl_fcns_start;p<sl_fcns_end;p++){
2998         if(fcn_ref_cnt[p] > 1){
2999                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3000         }
3001         if(is_partial_fcn[p]){
3002                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3003                 ret += "\tif(retval) goto end;\n";
3004         }
3005         if(fcn_ref_cnt[p] > 1){
3006                 if(!is_partial_fcn[p]){
3007                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3008                 }
3009                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3010                 ret += "\t}\n";
3011         }
3012   }
3013
3014   // increment the counter of accepted tuples
3015   ret += "\n\t#ifdef LFTA_STATS\n";
3016   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3017   ret += "\t#endif\n\n";
3018
3019 //                      First, compute the size of the tuple.
3020
3021 //                      Unpack any BUFFER type selections into temporaries
3022 //                      so that I can compute their size and not have
3023 //                      to recompute their value during tuple packing.
3024 //                      I can use regular assignment here because
3025 //                      these temporaries are non-persistent.
3026
3027           for(s=0;s<sl_list.size();s++){
3028                 data_type *sdt = sl_list[s]->get_data_type();
3029                 if(sdt->is_buffer_type()){
3030                         sprintf(tmpstr,"\tselvar_%d = ",s);
3031                         ret += tmpstr;
3032                         ret += generate_se_code(sl_list[s],schema);
3033                         ret += ";\n";
3034                 }
3035           }
3036
3037
3038 //              The size of the tuple is the size of the tuple struct plus the
3039 //              size of the buffers to be copied in.
3040
3041           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3042           for(s=0;s<sl_list.size();s++){
3043                 data_type *sdt = sl_list[s]->get_data_type();
3044                 if(sdt->is_buffer_type()){
3045                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3046                         ret += tmpstr;
3047                 }
3048           }
3049           ret += ";\n";
3050
3051
3052           ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
3053           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3054
3055 //                      Test passed, make assignments to the tuple.
3056
3057           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3058
3059 //                      Mark tuple as REGULAR_TUPLE
3060           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3061
3062
3063           for(s=0;s<sl_list.size();s++){
3064                 data_type *sdt = sl_list[s]->get_data_type();
3065                 if(sdt->is_buffer_type()){
3066                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3067                         ret += tmpstr;
3068                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3069                         ret += tmpstr;
3070                 }else{
3071                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3072                         ret += tmpstr;
3073 //                      if(sdt->needs_hn_translation())
3074 //                              ret += sdt->hton_translation() +"( ";
3075                         ret += generate_se_code(sl_list[s],schema);
3076 //                      if(sdt->needs_hn_translation())
3077 //                              ret += ") ";
3078                         ret += ";\n";
3079                 }
3080           }
3081
3082 //              Generate output.
3083
3084           ret += "\tpost_tuple(tuple);\n";
3085
3086 //              Increment the counter of posted tuples
3087   ret += "\n\t#ifdef LFTA_STATS\n";
3088   ret += "\n\tt->out_tuple_cnt++;\n\n";
3089   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3090   ret += "\t#endif\n\n";
3091
3092
3093         return ret;
3094 }
3095
3096
3097 string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
3098
3099 int p,s,w;
3100 string ret;
3101
3102
3103         string wl_schema = fs->from[1]->get_schema_name();
3104         string wl_elem_str = generate_watchlist_element_name(wl_schema);
3105         string wl_node_str = generate_watchlist_struct_name(wl_schema);
3106         string tgt = generate_watchlist_name(wl_schema);
3107
3108         ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
3109
3110
3111
3112
3113
3114 //      ------------------------------------------------------------
3115 //              Determine if the R record should be processed.
3116
3117
3118         ret +=
3119 "//             R (main stream) cheap predicate\n"
3120 "\n"
3121 ;
3122
3123 //              Unpack r_filt fields
3124         vector<cnf_elem *> r_filt = fs->pred_t0;
3125         for(w=0;w<r_filt.size();++w){
3126                 col_id_set this_pred_cids;
3127                 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
3128                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3129                         if(unpacked_cids.count( (*csi) ) == 0){
3130                                 int tblref = (*csi).tblvar_ref;
3131                                 int schref = (*csi).schema_ref;
3132                                 string field = (*csi).field;
3133                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3134                                 unpacked_cids.insert( (*csi) );
3135                         }
3136                 }
3137         }
3138
3139 // Sort R preds based on cost.
3140
3141         vector<cnf_elem *> tmp_wh;
3142         for(w=0;w<r_filt.size();++w){
3143                 compute_cnf_cost(r_filt[w],Ext_fcns);
3144                 tmp_wh.push_back(r_filt[w]);
3145         }
3146         r_filt = tmp_wh;
3147
3148         sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
3149
3150 //              WARNING! the constant 20 below is a wild-ass guess.
3151         int cheap_rpos;
3152         for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
3153
3154 //              Test the cheap filters on R.
3155   if(cheap_rpos >0){
3156
3157 //              Now generate the predicates.
3158         for(w=0;w<cheap_rpos;++w){
3159                 sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3160                 ret += tmpstr;
3161
3162 //                      Find partial fcns ref'd in this cnf element
3163                 set<int> pfcn_refs;
3164                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3165 //                      Since set<..> is a "Sorted Associative Container",
3166 //                      we can walk through it in sorted order by walking from
3167 //                      begin() to end().  (and the partial fcns must be
3168 //                      evaluated in this order).
3169                 set<int>::iterator si;
3170                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3171                         if(fcn_ref_cnt[(*si)] > 1){
3172                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3173                         }
3174                         if(is_partial_fcn[(*si)]){
3175                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3176                                 ret += "\t\tif(retval) goto end;\n";
3177                         }
3178                         if(fcn_ref_cnt[(*si)] > 1){
3179                                 if(!is_partial_fcn[(*si)]){
3180                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3181                                 }
3182                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3183                                 ret += "\t}\n";
3184                         }
3185                 }
3186
3187                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3188                                 ") ) goto end;\n";
3189         }
3190   }else{
3191           ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
3192   }
3193
3194         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3195         map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
3196         vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
3197         for(w=0;w<kflds.size();++w){
3198                 string kfld = kflds[w];
3199                 col_id_set this_pred_cids;
3200                 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
3201                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3202                         if(unpacked_cids.count( (*csi) ) == 0){
3203                                 int tblref = (*csi).tblvar_ref;
3204                                 int schref = (*csi).schema_ref;
3205                                 string field = (*csi).field;
3206                                 if(tblref==0) // LHS from packet, don't unpack the RHS
3207                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3208                                 unpacked_cids.insert( (*csi) );
3209                         }
3210                 }
3211         }
3212
3213
3214         ret += "\n//    Do the join\n\n";
3215         ret += "\n//            (ensure that the watchtable is fresh)\n";
3216         ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
3217         ret += "\t\treload_watchlist__"+wl_schema+"();\n";
3218         ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
3219         ret += "\t}\n\n";
3220
3221
3222         for(p=0;p<fs->key_flds.size();++p){
3223                 string kfld = fs->key_flds[p];
3224                 ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
3225         }
3226
3227
3228 //                      Passed the cheap pred, now test the join with S.
3229         ret += "\tbucket=0;\n";
3230         ret += "\thash=0;\n";
3231         for(p=0;p<fs->key_flds.size();++p){
3232                 string kfld = fs->key_flds[p];
3233                 ret +=
3234 "               hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
3235         fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
3236         +"_to_hash(r_equijoin_"+kfld+")));\n";
3237         }
3238         ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
3239
3240         ret += "\t\trec = "+tgt+".ht[bucket];\n";
3241         ret += "\t\twhile(rec!=NULL){\n";
3242         ret += "\t\t\tif(hash==rec->hashval){\n";
3243         ret += "\t\t\t\tif(";
3244         for(p=0;p<fs->key_flds.size();++p){
3245                 string kfld = fs->key_flds[p];
3246                 if(p>0) ret += " && ";
3247                 data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
3248                 string lhs_op = "r_equijoin_"+kfld;
3249                 string rhs_op = "rec->"+kfld;
3250                 ret += generate_equality_test(lhs_op,rhs_op,hdt);
3251         }
3252         ret += ")\n";
3253         ret += "\t\t\t\t\tbreak;\n";
3254         ret += "\t\t\t}\n";
3255         ret += "\t\t\trec=rec->next;\n";
3256         ret += "\t\t}\n";
3257         ret += "\t\tif(rec==NULL)\n";
3258         ret += "\t\t\tgoto end;\n";
3259                 
3260         ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
3261         for(w=0;w<where.size();++w){
3262                 col_id_set this_pred_cids;
3263                 gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
3264                 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3265                         if(unpacked_cids.count( (*csi) ) == 0){
3266                                 int tblref = (*csi).tblvar_ref;
3267                                 int schref = (*csi).schema_ref;
3268                                 string field = (*csi).field;
3269                                 if(tblref==0) // LHS from packet
3270                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3271                                 else    // RHS from hash bucket
3272                                         ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3273                                 unpacked_cids.insert( (*csi) );
3274                         }
3275                 }
3276         }
3277
3278
3279 //              Test the expensive filters on R.
3280 //                      TODO Should merge this with other predicates and eval in order
3281 //                              of cost - see the fj code.
3282 //                      TODO join and postfilter predicates haven't been costed yet.
3283   if(cheap_rpos < r_filt.size()){
3284
3285 //              Now generate the predicates.
3286         for(w=cheap_rpos;w<r_filt.size();++w){
3287                 sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3288                 ret += tmpstr;
3289
3290 //                      Find partial fcns ref'd in this cnf element
3291                 set<int> pfcn_refs;
3292                 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3293 //                      Since set<..> is a "Sorted Associative Container",
3294 //                      we can walk through it in sorted order by walking from
3295 //                      begin() to end().  (and the partial fcns must be
3296 //                      evaluated in this order).
3297                 set<int>::iterator si;
3298                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3299                         if(fcn_ref_cnt[(*si)] > 1){
3300                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3301                         }
3302                         if(is_partial_fcn[(*si)]){
3303                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3304                                 ret += "\t\tif(retval) goto end;\n";
3305                         }
3306                         if(fcn_ref_cnt[(*si)] > 1){
3307                                 if(!is_partial_fcn[(*si)]){
3308                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3309                                 }
3310                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3311                                 ret += "\t}\n";
3312                         }
3313                 }
3314
3315                 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3316                                 ") ) goto end;\n";
3317         }
3318   }else{
3319           ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
3320   }
3321
3322 //              TODO sort the additional predicates by cost
3323
3324 //              S-only
3325         for(w=0;w<fs->pred_t1.size();++w){
3326                 sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
3327                 ret += tmpstr;
3328
3329 //                      Find partial fcns ref'd in this cnf element
3330                 set<int> pfcn_refs;
3331                 collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
3332 //                      Since set<..> is a "Sorted Associative Container",
3333 //                      we can walk through it in sorted order by walking from
3334 //                      begin() to end().  (and the partial fcns must be
3335 //                      evaluated in this order).
3336                 set<int>::iterator si;
3337                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3338                         if(fcn_ref_cnt[(*si)] > 1){
3339                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3340                         }
3341                         if(is_partial_fcn[(*si)]){
3342                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3343                                 ret += "\t\tif(retval) goto end;\n";
3344                         }
3345                         if(fcn_ref_cnt[(*si)] > 1){
3346                                 if(!is_partial_fcn[(*si)]){
3347                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3348                                 }
3349                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3350                                 ret += "\t}\n";
3351                         }
3352                 }
3353
3354                 ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
3355                                 ") ) goto end;\n";
3356         }
3357
3358 //              non hash-eq join 
3359         for(w=0;w<fs->join_filter.size();++w){
3360                 sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
3361                 ret += tmpstr;
3362
3363 //                      Find partial fcns ref'd in this cnf element
3364                 set<int> pfcn_refs;
3365                 collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
3366 //                      Since set<..> is a "Sorted Associative Container",
3367 //                      we can walk through it in sorted order by walking from
3368 //                      begin() to end().  (and the partial fcns must be
3369 //                      evaluated in this order).
3370                 set<int>::iterator si;
3371                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3372                         if(fcn_ref_cnt[(*si)] > 1){
3373                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3374                         }
3375                         if(is_partial_fcn[(*si)]){
3376                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3377                                 ret += "\t\tif(retval) goto end;\n";
3378                         }
3379                         if(fcn_ref_cnt[(*si)] > 1){
3380                                 if(!is_partial_fcn[(*si)]){
3381                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3382                                 }
3383                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3384                                 ret += "\t}\n";
3385                         }
3386                 }
3387
3388                 ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
3389                                 ") ) goto end;\n";
3390         }
3391
3392 //              postfilter
3393         for(w=0;w<fs->postfilter.size();++w){
3394                 sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
3395                 ret += tmpstr;
3396
3397 //                      Find partial fcns ref'd in this cnf element
3398                 set<int> pfcn_refs;
3399                 collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
3400 //                      Since set<..> is a "Sorted Associative Container",
3401 //                      we can walk through it in sorted order by walking from
3402 //                      begin() to end().  (and the partial fcns must be
3403 //                      evaluated in this order).
3404                 set<int>::iterator si;
3405                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3406                         if(fcn_ref_cnt[(*si)] > 1){
3407                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3408                         }
3409                         if(is_partial_fcn[(*si)]){
3410                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3411                                 ret += "\t\tif(retval) goto end;\n";
3412                         }
3413                         if(fcn_ref_cnt[(*si)] > 1){
3414                                 if(!is_partial_fcn[(*si)]){
3415                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3416                                 }
3417                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3418                                 ret += "\t}\n";
3419                         }
3420                 }
3421
3422                 ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
3423                                 ") ) goto end;\n";
3424         }
3425
3426
3427
3428 ///////////////                 post the tuple
3429
3430 //                      test passed : create the tuple, then assign to it.
3431           ret += "/*\t\tCreate and post the tuple\t*/\n";
3432
3433 //              Unpack R fields
3434         for(s=0;s<sl_list.size();++s){
3435                 col_id_set this_se_cids;
3436                 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
3437                 for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
3438                         if(unpacked_cids.count( (*csi) ) == 0){
3439                                 int tblref = (*csi).tblvar_ref;
3440                                 int schref = (*csi).schema_ref;
3441                                 string field = (*csi).field;
3442                                 if(tblref==0) // LHS from packet
3443                                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3444                                 else    // RHS from hash bucket
3445                                         ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3446                                 unpacked_cids.insert( (*csi) );
3447                         }
3448                 }
3449         }
3450
3451
3452 //                      Unpack partial fcns ref'd by the select clause.
3453 //                      Its a kind of a WHERE clause ...
3454   for(p=sl_fcns_start;p<sl_fcns_end;p++){
3455         if(fcn_ref_cnt[p] > 1){
3456                 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3457         }
3458         if(is_partial_fcn[p]){
3459                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3460                 ret += "\tif(retval) goto end;\n";
3461         }
3462         if(fcn_ref_cnt[p] > 1){
3463                 if(!is_partial_fcn[p]){
3464                         ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3465                 }
3466                 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3467                 ret += "\t}\n";
3468         }
3469   }
3470
3471   // increment the counter of accepted tuples
3472   ret += "\n\t#ifdef LFTA_STATS\n";
3473   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3474   ret += "\t#endif\n\n";
3475
3476 //                      First, compute the size of the tuple.
3477
3478 //                      Unpack any BUFFER type selections into temporaries
3479 //                      so that I can compute their size and not have
3480 //                      to recompute their value during tuple packing.
3481 //                      I can use regular assignment here because
3482 //                      these temporaries are non-persistent.
3483
3484           for(s=0;s<sl_list.size();s++){
3485                 data_type *sdt = sl_list[s]->get_data_type();
3486                 if(sdt->is_buffer_type()){
3487                         sprintf(tmpstr,"\tselvar_%d = ",s);
3488                         ret += tmpstr;
3489                         ret += generate_se_code(sl_list[s],schema);
3490                         ret += ";\n";
3491                 }
3492           }
3493
3494
3495 //              The size of the tuple is the size of the tuple struct plus the
3496 //              size of the buffers to be copied in.
3497
3498           ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3499           for(s=0;s<sl_list.size();s++){
3500                 data_type *sdt = sl_list[s]->get_data_type();
3501                 if(sdt->is_buffer_type()){
3502                         sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3503                         ret += tmpstr;
3504                 }
3505           }
3506           ret += ";\n";
3507
3508
3509           ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
3510           ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3511
3512 //                      Test passed, make assignments to the tuple.
3513
3514           ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3515
3516 //                      Mark tuple as REGULAR_TUPLE
3517           ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3518
3519
3520           for(s=0;s<sl_list.size();s++){
3521                 data_type *sdt = sl_list[s]->get_data_type();
3522                 if(sdt->is_buffer_type()){
3523                         sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3524                         ret += tmpstr;
3525                         sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3526                         ret += tmpstr;
3527                 }else{
3528                         sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3529                         ret += tmpstr;
3530 //                      if(sdt->needs_hn_translation())
3531 //                              ret += sdt->hton_translation() +"( ";
3532                         ret += generate_se_code(sl_list[s],schema);
3533 //                      if(sdt->needs_hn_translation())
3534 //                              ret += ") ";
3535                         ret += ";\n";
3536                 }
3537           }
3538
3539 //              Generate output.
3540
3541           ret += "\tpost_tuple(tuple);\n";
3542
3543 //              Increment the counter of posted tuples
3544   ret += "\n\t#ifdef LFTA_STATS\n";
3545   ret += "\n\tt->out_tuple_cnt++;\n\n";
3546   ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3547   ret += "\t#endif\n\n";
3548
3549
3550         return ret;
3551 }
3552
3553 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
3554         string ret;
3555         int a,p,g;
3556
3557 //////////////          Processing for aggregtion query
3558
3559 //              First, search for a match.  Start by unpacking the group-by attributes.
3560
3561 //                      One complication : if a real-time aggregate flush occurs,
3562 //                      the GB attr has already been calculated.  So don't compute
3563 //                      it again if 1) its temporal and 2) it will be computed in the
3564 //                      agggregate flush code.
3565
3566 //              Unpack the partial fcns ref'd by the gb's and the aggr defs.
3567   for(p=gb_fcns_start;p<gb_fcns_end;p++){
3568     if(is_partial_fcn[p]){
3569                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3570                 ret += "\tif(retval) goto end;\n";
3571         }
3572   }
3573   for(p=ag_fcns_start;p<ag_fcns_end;p++){
3574     if(is_partial_fcn[p]){
3575                 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3576                 ret += "\tif(retval) goto end;\n";
3577         }
3578   }
3579
3580   // increment the counter of accepted tuples
3581   ret += "\n\t#ifdef LFTA_STATS\n";
3582   ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3583   ret += "\t#endif\n\n";
3584
3585   ret += "/*\t\tTest if the group is in the hash table \t*/\n";
3586 //                      Compute the values of the group-by variables.
3587   for(g=0;g<gb_tbl->size();g++){
3588           data_type *gdt = gb_tbl->get_data_type(g);
3589           if((! gdt->is_temporal()) || temporal_flush == ""){
3590
3591                   if(gdt->is_buffer_type()){
3592         //                              NOTE : if the SE defining the gb is anything
3593         //                              other than a ref to a variable, this will generate
3594         //                              illegal code.  To be resolved with Spatch.
3595                         sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
3596                                 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
3597                         ret += tmpstr;
3598                         sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
3599                                 gdt->get_buffer_assign_copy().c_str(), g, g);
3600                   }else{
3601                         sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
3602                   }
3603                   ret += tmpstr;
3604           }
3605   }
3606   ret += "\n";
3607
3608 //                      A quick aside : if any of the GB attrs are temporal,
3609 //                      test for change and flush if any change occurred.
3610 //                      We've already computed the flush code,
3611 //                      Put it here if this is not a real time query.
3612 //                      We've already unpacked all column refs, so no need to
3613 //                      do it again here.
3614
3615         string rt_level = fs->get_val_of_def("real_time");
3616         if(rt_level == "" && temporal_flush != ""){
3617                 ret += temporal_flush;
3618         }
3619
3620 //                      Compute the hash bucket
3621         if(gb_tbl->size() > 0){
3622                 ret += "\thashval = ";\
3623                 for(g=0;g<gb_tbl->size();g++){
3624                   if(g>0) ret += " ^ ";
3625                   data_type *gdt = gb_tbl->get_data_type(g);
3626                   if(gdt->is_buffer_type()){
3627                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3628                                 gdt->get_type_str().c_str(), g);
3629                   }else{
3630                         sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3631                                 gdt->get_type_str().c_str(), g);
3632                   }
3633                   ret += tmpstr;
3634                 }
3635                 ret += ";\n";
3636                 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
3637         ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
3638         }else{
3639                 ret+="\tprobe = 0;\n";
3640                 ret+="\thash2 = 0;\n\n";
3641         }
3642
3643 //              Does the lfta reference a udaf?
3644           bool has_udaf = false;
3645           for(a=0;a<aggr_tbl->size();a++){
3646                   if(! aggr_tbl->is_builtin(a)) has_udaf = true;
3647           }
3648
3649 //              Scan for a match, or alternatively the best slot.
3650 //              Currently, hardcode 5 tests.
3651         ret +=
3652 "       gen_val = t->generation & SLOT_GEN_BITS;\n"
3653 "       match_found = 0;\n"
3654 "       best_slot = probe;\n"
3655 "       for(i=0;i<5 && match_found == 0;i++){\n"
3656 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
3657 ;
3658         if(gb_tbl->size()>0){
3659                 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
3660                 ret+="\t\tif(";
3661                 string rhs_op, lhs_op;
3662                 for(g=0;g<gb_tbl->size();g++){
3663                   if(g>0) ret += " && ";
3664                   ret += "(";
3665                   sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
3666                   sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
3667                   ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
3668                   ret += ")";
3669                 }
3670          }
3671          ret += "){\n"
3672 "                       match_found = 1;\n"
3673 "                       best_slot = probe;\n"
3674 "               }\n"
3675 "       }\n"
3676 "//             Rate slots in case no match found: prefer empty, then full but old slots\n"
3677 "       if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3678 "               if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3679 "                       best_slot = probe;\n"
3680 "               }else{\n"
3681 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3682 "                               best_slot = probe;\n"
3683 "                       }\n"
3684 "               }\n"
3685 "               probe++;\n"
3686 "               if(probe >= t->max_aggrs)\n"
3687 "                       probe=0;\n"
3688 "       }\n"
3689 "       if(match_found){\n"
3690 ;
3691         ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3692         ret +=
3693 "       }else{\n"
3694 "               if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3695 ;
3696 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3697         if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3698                 ret +=
3699 "                       if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3700 "                               if((";
3701                 bool first_g = true;
3702                 for(int g=0;g<gb_tbl->size();g++){
3703                         data_type *gdt = gb_tbl->get_data_type(g);
3704                         if(gdt->is_temporal()){
3705                                 if(first_g) first_g = false; else ret+=" + ";
3706                                 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3707                         }
3708                 }
3709                 ret += ") == 0 ){\n";
3710
3711                 ret +=
3712 "                                       fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3713 "                               }\n"
3714 "                       }\n"
3715 ;
3716         }
3717
3718         ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3719         ret +=
3720 "\t\t\t#ifdef LFTA_STATS\n"
3721 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3722 "\t\t\t\tt->collision_cnt++;\n\n"
3723 "\t\t\t#endif\n\n"
3724 "\t\t}\n"
3725 ;
3726         ret += generate_init_group(schema,"best_slot");
3727
3728
3729           ret += "\t}\n";
3730
3731         return ret;
3732 }
3733
3734
3735
3736 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
3737
3738         string ret="static gs_retval_t accept_packet_"+node_name+
3739                 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3740     ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3741
3742   int a;
3743
3744 //                      Define all of the variables needed by this
3745 //                      procedure.
3746
3747
3748 //                      Gather all column references, need to define unpacking variables.
3749   int w,s;
3750   col_id_set cid_set;
3751   col_id_set::iterator csi;
3752
3753 //              If its a filter join, rebind all colrefs
3754 //              to the first range var, to avoid double unpacking.
3755
3756   if(is_fj){
3757     for(w=0;w<where.size();++w)
3758                 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3759     for(s=0;s<sl_list.size();s++)
3760                 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3761   }
3762
3763   for(w=0;w<where.size();++w){
3764         if(is_wj || is_fj || s_pids.count(w) == 0)
3765                 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3766   }
3767   for(s=0;s<sl_list.size();s++){
3768         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3769   }
3770
3771   int g;
3772   if(gb_tbl != NULL){
3773         for(g=0;g<gb_tbl->size();g++)
3774           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3775   }
3776
3777   //                    Variables for unpacking attributes.
3778   ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3779   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3780     int schref = (*csi).schema_ref;
3781         int tblref = (*csi).tblvar_ref;
3782     string field = (*csi).field;
3783     data_type dt(schema->get_type_name(schref,field));
3784     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3785         field.c_str(), tblref);
3786     ret += tmpstr;
3787   }
3788
3789   ret += "\n\n";
3790
3791 //                      Variables that are always needed
3792   ret += "/*\t\tVariables which are always needed\t*/\n";
3793   ret += "\tgs_retval_t retval;\n";
3794   ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3795   ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3796
3797   ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3798
3799
3800 //                      Variables needed for aggregation queries.
3801   if(is_aggr_query){
3802           ret += "\n/*\t\tVariables for aggregation\t*/\n";
3803           ret+="\tunsigned int i, probe;\n";
3804           ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3805           ret+="\tgs_uint64_t hashval, hash2;\n";
3806 //                      Variables for storing group-by attribute values.
3807           if(gb_tbl->size() > 0)
3808                 ret += "/*\t\tGroup-by attributes\t*/\n";
3809           for(g=0;g<gb_tbl->size();g++){
3810                 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3811                 ret += tmpstr;
3812                 data_type *gdt = gb_tbl->get_data_type(g);
3813                 if(gdt->is_buffer_type()){
3814                   sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3815                   ret += tmpstr;
3816                 }
3817           }
3818           ret += "\n";
3819 //                      Temporaries for min/max
3820           string aggr_tmp_str = "";
3821           for(a=0;a<aggr_tbl->size();a++){
3822                 string aggr_op = aggr_tbl->get_op(a);
3823                 if(aggr_op == "MIN" || aggr_op == "MAX"){
3824                         sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3825                         aggr_tmp_str.append(tmpstr);
3826                 }
3827           }
3828           if(aggr_tmp_str != ""){
3829                 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3830                 ret += aggr_tmp_str;
3831                 ret += "\n";
3832           }
3833 //              Variables for udaf output temporaries
3834         bool no_udaf = true;
3835         for(a=0;a<aggr_tbl->size();a++){
3836                 if(! aggr_tbl->is_builtin(a)){
3837                         if(no_udaf){
3838                                 ret+="/*\t\tUDAF output vars.\t*/\n";
3839                                 no_udaf = false;
3840                         }
3841                         int afcn_id = aggr_tbl->get_fcn_id(a);
3842                         data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3843                         sprintf(tmpstr,"udaf_ret%d", a);
3844                         ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3845                 }
3846         }
3847   }
3848
3849 //                      Variables needed for a filter join query
3850   if(fs->node_type() == "filter_join"){
3851         filter_join_qpn *fjq = (filter_join_qpn *)fs;
3852         bool uses_bloom = fjq->use_bloom;
3853         ret += "/*\t\tJoin fields\t*/\n";
3854         for(g=0;g<fjq->hash_eq.size();g++){
3855                 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3856                 ret += tmpstr;
3857                 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3858                 ret += tmpstr;
3859           }
3860         if(uses_bloom){
3861                 ret +=
3862 "  /*           Variables for fj bloom filter   */ \n"
3863 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3864 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3865 "\tlong long int curr_fj_ts;\n"
3866 "\tlong long int curr_bin, the_bin;\n"
3867 "\n"
3868 ;
3869         }else{
3870                 ret +=
3871 "  /*           Variables for fj join table     */ \n"
3872 "\tunsigned int i, bucket, found; \n"
3873 "\tunsigned int bucket1, the_bucket;\n"
3874 "       long long int curr_fj_ts;\n"
3875 "\n"
3876 ;
3877         }
3878   }
3879
3880
3881   if(fs->node_type() == "watch_join"){
3882         watch_join_qpn *wlq = (watch_join_qpn *)fs;
3883         ret += "/*\t\tJoin fields\t*/\n";
3884         for(int k=0;k<wlq->key_flds.size(); ++k){
3885                 string kfld = wlq->key_flds[k];
3886                 ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
3887                 ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
3888           }
3889         ret +=
3890 "  /*           Variables for wl join table     */ \n"
3891 "\tunsigned int i, bucket;\n"
3892 "\tunsigned long long int hash; \n";
3893         string wl_schema = wlq->from[1]->get_schema_name();
3894         string wl_elem_str = generate_watchlist_element_name(wl_schema);
3895         ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n";
3896 "\n"
3897 ;
3898   }
3899
3900
3901 //              Variables needed to store selected attributes of BUFFER type
3902 //              temporarily, in order to compute their size for storage
3903 //              in an output tuple.
3904
3905   string select_var_defs = "";
3906   for(int s=0;s<sl_list.size();s++){
3907         data_type *sdt = sl_list[s]->get_data_type();
3908         if(sdt->is_buffer_type()){
3909           sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3910           select_var_defs.append(tmpstr);
3911         }
3912   }
3913   if(select_var_defs != ""){
3914         ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3915     ret += select_var_defs;
3916   }
3917
3918 //              Variables to store results of partial functions.
3919   int p;
3920   if(partial_fcns.size()>0){
3921           ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3922           for(p=0;p<partial_fcns.size();++p){
3923                 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3924                   sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3925                     partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3926                   ret += tmpstr;
3927                   if(!is_aggr_query && fcn_ref_cnt[p] >1){
3928                         ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3929                   }
3930                 }
3931           }
3932
3933           if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3934           ret += "\n";
3935   }
3936
3937 //              variable to hold packet struct  //
3938         if(packed_return){
3939                 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3940         }
3941
3942
3943   ret += "\t#ifdef LFTA_STATS\n";
3944 // variable to store counter of cpu cycles spend in accept_tuple
3945         ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3946 // increment counter of received tuples
3947         ret += "\tt->in_tuple_cnt++;\n";
3948   ret += "\t#endif\n";
3949
3950
3951 //      -------------------------------------------------
3952 //              If the packet is "packet", test if its for this lfta,
3953 //              and if so load it into its struct
3954
3955         if(packed_return){
3956                 ret+="\n/*  packed tuple : test and load. \t*/\n";
3957                 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3958                 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3959                 ret+="\t\tgoto end;\n\n";
3960         }
3961
3962
3963
3964   col_id_set unpacked_cids;     //      Keep track of the cols that have been unpacked.
3965
3966   string temporal_flush;
3967   if(is_aggr_query)
3968         ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3969   else {        // non-aggregation operators
3970
3971 // Unpack all the temporal attributes referenced in select clause
3972 // and update the last value of the attribute
3973         col_id_set temp_cids;           //      col ids of temp attributes in select clause
3974
3975         for(s=0;s<sl_list.size();s++){
3976                 data_type *sdt = sl_list[s]->get_data_type();
3977                 if (sdt->is_temporal()) {
3978                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3979                 }
3980         }
3981 //                      If this is a filter join,
3982 //                      ensure that the temporal range field is unpacked.
3983         if(is_fj){
3984                 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3985                 if(temp_cids.count(window_var_cid)==0)
3986                         temp_cids.insert(window_var_cid);
3987         }
3988
3989         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3990                 if(unpacked_cids.count((*csi)) == 0){
3991                         int tblref = (*csi).tblvar_ref;
3992                         int schref = (*csi).schema_ref;
3993                         string field = (*csi).field;
3994                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3995                         sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3996                         ret += tmpstr;
3997
3998                         unpacked_cids.insert( (*csi) );
3999                 }
4000         }
4001
4002   }
4003
4004   vector<cnf_elem *> filter = fs->get_filter_clause();
4005 //              Test the filter predicate (some query types have additional preds).
4006   if(filter.size() > 0 && !is_wj){      // watchlist join does specialized processing
4007
4008 //              Sort by evaluation cost.
4009 //              First, estimate evaluation costs
4010 //              Eliminate predicates covered by the prefilter (those in s_pids).
4011 //              I need to do it before the sort becuase the indices refer
4012 //              to the position in the unsorted list./
4013         vector<cnf_elem *> tmp_wh;
4014         for(w=0;w<filter.size();++w){
4015                 if(s_pids.count(w) == 0){
4016                         compute_cnf_cost(filter[w],Ext_fcns);
4017                         tmp_wh.push_back(filter[w]);
4018                 }
4019         }
4020         filter = tmp_wh;
4021
4022         sort(filter.begin(), filter.end(), compare_cnf_cost());
4023
4024 //              Now generate the predicates.
4025         for(w=0;w<filter.size();++w){
4026                 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
4027                 ret += tmpstr;
4028 //                      Find the set of variables accessed in this CNF elem,
4029 //                      but in no previous element.
4030                 col_id_set this_pred_cids;
4031                 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
4032                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4033                         if(unpacked_cids.count( (*csi) ) == 0){
4034                         int tblref = (*csi).tblvar_ref;
4035                         int schref = (*csi).schema_ref;
4036                         string field = (*csi).field;
4037                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4038                                 unpacked_cids.insert( (*csi) );
4039                         }
4040                 }
4041 //                      Find partial fcns ref'd in this cnf element
4042                 set<int> pfcn_refs;
4043                 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
4044 //                      Since set<..> is a "Sorted Associative Container",
4045 //                      we can walk through it in sorted order by walking from
4046 //                      begin() to end().  (and the partial fcns must be
4047 //                      evaluated in this order).
4048                 set<int>::iterator si;
4049                 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
4050                         if(fcn_ref_cnt[(*si)] > 1){
4051                                 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
4052                         }
4053                         if(is_partial_fcn[(*si)]){
4054                                 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
4055                                 ret += "\t\tif(retval) goto end;\n";
4056                         }
4057                         if(fcn_ref_cnt[(*si)] > 1){
4058                                 if(!is_partial_fcn[(*si)]){
4059                                         ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
4060                                 }
4061                                 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
4062                                 ret += "\t}\n";
4063                         }
4064                 }
4065
4066                 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
4067                                 ") ) goto end;\n";
4068         }
4069   }else{
4070           ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
4071   }
4072
4073
4074 //                      We've passed the WHERE clause,
4075 //                      unpack the remainder of the accessed fields.
4076   if(is_fj){
4077         ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
4078         vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
4079         for(w=0;w<h_eq.size();++w){
4080                 col_id_set this_pred_cids;
4081                 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
4082                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4083                         if(unpacked_cids.count( (*csi) ) == 0){
4084                                 int tblref = (*csi).tblvar_ref;
4085                                 int schref = (*csi).schema_ref;
4086                                 string field = (*csi).field;
4087                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4088                                 unpacked_cids.insert( (*csi) );
4089                         }
4090                 }
4091         }
4092   }else if(is_wj){              // STOPPED HERE move this to wj main body
4093 /*
4094         ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
4095         map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
4096         vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
4097         for(w=0;w<kflds.size();++w){
4098                 string kfld = kflds[w];
4099                 col_id_set this_pred_cids;
4100                 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
4101                 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4102                         if(unpacked_cids.count( (*csi) ) == 0){
4103                                 int tblref = (*csi).tblvar_ref;
4104                                 int schref = (*csi).schema_ref;
4105                                 string field = (*csi).field;
4106                                 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4107                                 unpacked_cids.insert( (*csi) );
4108                         }
4109                 }
4110         }
4111 */
4112   }else{
4113           ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
4114
4115           for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4116                 if(unpacked_cids.count( (*csi) ) == 0){
4117                         int schref = (*csi).schema_ref;
4118                         int tblref = (*csi).tblvar_ref;
4119                         string field = (*csi).field;
4120                         ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4121                         unpacked_cids.insert( (*csi) );
4122                 }
4123           }
4124   }
4125
4126
4127 //////////////////
4128 //////////////////      After this, the query types
4129 //////////////////      are processed differently.
4130
4131   if(!is_aggr_query && !is_fj & !is_wj)
4132         ret += generate_sel_accept_body(fs, node_name, schema);
4133   else if(is_aggr_query)
4134         ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
4135   else{
4136         if(is_fj)
4137                 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4138         else
4139                 ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4140   }
4141         
4142
4143
4144 //              Finish up.
4145
4146    ret += "\n\tend:\n";
4147   ret += "\t#ifdef LFTA_STATS\n";
4148         ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
4149   ret += "\t#endif\n";
4150    ret += "\n\treturn 1;\n}\n\n";
4151
4152         return(ret);
4153 }
4154
4155
4156 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
4157         int g, cl;
4158
4159         string ret = "struct FTA * "+generate_alloc_name(node_name) +
4160            "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command,  gs_int32_t sz, void *value){\n";
4161
4162         ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
4163         ret+="\tint i;\n";
4164         ret += "\n";
4165         ret+="\tif((f=(struct "+generate_fta_name(node_name)+" *)fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
4166
4167 //                              assign a streamid to fta instance
4168         ret+="\t/* assign a streamid */\n";
4169         ret+="\tf->f.ftaid = ftaid;\n";
4170         ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
4171         ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
4172
4173         if(is_aggr_query){
4174                 ret += "\tf->n_aggrs = 0;\n";
4175                 ret += "\tf->n_ticks = 0; // for limiting slow flush\n";
4176
4177                 ret += "\tf->max_aggrs = ";
4178
4179 //                              Computing the number of aggregate blocks is a little
4180 //                              tricky.  If there are no GB attrs, or if all GB attrs
4181 //                              are temporal, then use a single aggregate block, else
4182 //                              use a default value (10).  A user specification overrides
4183 //                              this logic.
4184                 bool single_group = true;
4185                 for(g=0;g<gb_tbl->size();g++){
4186                         data_type *gdt = gb_tbl->get_data_type(g);
4187                         if(! gdt->is_temporal() ){
4188                                 single_group = false;
4189                         }
4190                 }
4191                 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
4192                 int max_aggr_i = atoi(max_aggr_str.c_str());
4193                 if(max_aggr_i <= 0){
4194                         if(single_group)
4195                                 ret += "2";
4196                         else
4197                                 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
4198                 }else{
4199                         unsigned int naggrs = 1;                // make it power of 2
4200                         unsigned int nones = 0;
4201                         while(max_aggr_i){
4202                                 if(max_aggr_i&1)
4203                                         nones++;
4204                                 naggrs = naggrs << 1;
4205                                 max_aggr_i = max_aggr_i >> 1;
4206                         }
4207                         if(nones==1)            // in case it was already a power of 2.
4208                                 naggrs/=2;
4209                         ret += int_to_string(naggrs);
4210                 }
4211                 ret += ";\n";
4212
4213                 ret+="\tif ((f->aggr_table = (struct "+generate_aggr_struct_name(node_name)+" *)sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
4214                 ret+="\t\treturn(0);\n";
4215                 ret+="\t}\n\n";
4216 //              ret+="/* compute how many integers we need to store the hashmap */\n";
4217 //              ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
4218                 ret+="\tif ((f->aggr_table_hashmap = (gs_uint32_t *)sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
4219                 ret+="\t\treturn(0);\n";
4220                 ret+="\t}\n";
4221                 ret+="/*\t\tfill bitmap with zero \t*/\n";
4222                 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
4223                 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
4224                 ret+="\tf->generation=0;\n";
4225                 ret+="\tf->flush_pos = f->max_aggrs;\n";
4226
4227                 ret += "\tf->flush_ctr = 0;\n";
4228
4229         }
4230
4231         if(is_fj){
4232                 if(uses_bloom){
4233                         ret+="\tf->first_exec = 1;\n";
4234                         unsigned int n_bloom = 11;
4235                         string n_bloom_str = fs->get_val_of_def("num_bloom");
4236                         int tmp_n_bloom = atoi(n_bloom_str.c_str());
4237                         if(tmp_n_bloom>0)
4238                                 n_bloom = tmp_n_bloom+1;
4239
4240                         unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
4241                         if(window_len < n_bloom){
4242                                 n_bloom = window_len+1;
4243                         }
4244
4245                         int bf_exp_size = 12;  // base-2 log of number of bits
4246                         string bloom_len_str = fs->get_val_of_def("bloom_size");
4247                         int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
4248                         if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
4249                                 bf_exp_size = tmp_bf_exp_size;
4250                         }
4251                         int bf_bit_size = 1 << 12;
4252                         int bf_byte_size = bf_bit_size / (8*sizeof(char));
4253
4254                         int bf_tot = n_bloom*bf_byte_size;
4255                         ret+="\tif ((f->bf_table = (unsigned char *)sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
4256                         ret+="\t\treturn(0);\n";
4257                         ret+="\t}\n";
4258                         ret +=
4259 "       for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
4260 "               f->bf_table[i] = 0;\n"
4261 ;
4262                 }else{
4263                         unsigned int ht_size = 4096;
4264                         string ht_size_s = fs->get_val_of_def("aggregate_slots");
4265                         int tmp_ht_size = atoi(ht_size_s.c_str());
4266                         if(tmp_ht_size > 1024){
4267                                 unsigned int hs = 1;            // make it power of 2
4268                                 while(tmp_ht_size){
4269                                         hs =hs << 1;
4270                                         tmp_ht_size = tmp_ht_size >> 1;
4271                                 }
4272                                 ht_size = hs;
4273                         }
4274                         ret+="\tif ((f->join_table = (struct "+generate_fj_struct_name(node_name)+" *) sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
4275                         ret+="\t\treturn(0);\n";
4276                         ret+="\t}\n\n";
4277                         ret +=
4278 "       for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
4279 "               f->join_table[i].ts = 0;\n"
4280 ;
4281                 }
4282         }
4283
4284 //                      Initialize the complex literals (which might be handles).
4285
4286         for(cl=0;cl<complex_literals->size();cl++){
4287                 literal_t *l = complex_literals->get_literal(cl);
4288 //              sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
4289 //              ret += tmpstr + l->to_C_code() + ";\n";
4290                 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
4291                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4292         }
4293
4294         ret += "\n";
4295
4296 //                      Initialize the last seen values of temporal attributes to min(max) value of
4297 //                      their respective type
4298 //                      Create places to hold the last values of temporal attributes referenced in select clause
4299
4300
4301         col_id_set temp_cids;           //      col ids of temp attributes in select clause
4302
4303         int s;
4304         col_id_set::iterator csi;
4305
4306         for(s=0;s<sl_list.size();s++){
4307                 data_type *sdt = sl_list[s]->get_data_type();
4308                 if (sdt->is_temporal()) {
4309                         gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
4310                 }
4311         }
4312
4313         for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
4314                 int tblref = (*csi).tblvar_ref;
4315                 int schref = (*csi).schema_ref;
4316                 string field = (*csi).field;
4317                 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
4318                 if (dt.is_increasing()) {
4319                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
4320                         ret += tmpstr;
4321                 } else if (dt.is_decreasing()) {
4322                         sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
4323                         ret += tmpstr;
4324                 }
4325         }
4326
4327 //      initialize last seen values of temporal groubpy variables
4328         if(is_aggr_query){
4329                 for(g=0;g<gb_tbl->size();g++){
4330                         data_type *dt = gb_tbl->get_data_type(g);
4331                         if(dt->is_temporal()){
4332 /*
4333                                 fprintf(stderr,"group by attribute %s is temporal, ",
4334                                                 gb_tbl->get_name(g).c_str());
4335 */
4336                                 if(dt->is_increasing()){
4337                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
4338                                 }else{
4339                                         sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
4340                                 }
4341                                 ret += tmpstr;
4342                         }
4343                 }
4344         }
4345
4346         ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
4347         ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
4348         ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
4349         ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
4350         ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
4351
4352 //                      Initialize runtime stats
4353         ret+="\tf->in_tuple_cnt = 0;\n";
4354         ret+="\tf->out_tuple_cnt = 0;\n";
4355         ret+="\tf->out_tuple_sz = 0;\n";
4356         ret+="\tf->accepted_tuple_cnt = 0;\n";
4357         ret+="\tf->cycle_cnt = 0;\n";
4358         ret+="\tf->collision_cnt = 0;\n";
4359         ret+="\tf->eviction_cnt = 0;\n";
4360         ret+="\tf->sampling_rate = 1.0;\n";
4361
4362         ret+="\tf->trace_id = 0;\n\n";
4363     if(param_tbl->size() > 0){
4364         ret+=
4365 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
4366 "#ifndef LFTA_IN_NIC\n"
4367 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
4368 "#else\n"
4369 "\t\t}\n"
4370 "#endif\n"
4371 "\t\t\treturn 0;\n"
4372 "\t\t}\n";
4373         }
4374
4375 //                      Register the pass-by-handle parameters
4376     int ph;
4377     for(ph=0;ph<param_handle_table.size();++ph){
4378                 data_type pdt(param_handle_table[ph]->type_name);
4379                 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
4380                 switch(param_handle_table[ph]->val_type){
4381                 case cplx_lit_e:
4382                         ret += tmpstr;
4383                         if(pdt.is_buffer_type()) ret += "&(";
4384                         sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
4385                         ret += tmpstr ;
4386                         if(pdt.is_buffer_type()) ret += ")";
4387                         ret +=  ");\n";
4388                         break;
4389                 case litval_e:
4390 //                              not complex, no constructor
4391                         ret += tmpstr;
4392                         ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
4393                         break;
4394                 case param_e:
4395 //                              query parameter handles are regstered/deregistered in the
4396 //                              load_params function.
4397 //                      ret += "t->param_"+param_handle_table[ph]->param_name;
4398                         break;
4399                 default:
4400                         fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
4401                         exit(1);
4402                 }
4403         }
4404
4405         ret += "\treturn (struct FTA *) f;\n";
4406         ret += "}\n\n";
4407
4408         return(ret);
4409 }
4410
4411
4412
4413
4414 //////////////////////////////////////////////////////////////////
4415
//      generate_lfta_block: generate the complete C source for one lfta
//      query node.
//
//      First resets, then repopulates, the file-level generator state from
//      fs.  That state is gb_tbl, aggr_tbl, sl_list, where, partial_fcns,
//      fcn_ref_cnt, is_partial_fcn, the *_fcns_start/end markers,
//      complex_literals, param_handle_table, param_tbl, global_id, nicprop
//      and packed_return.  The generate_* helpers called below read it.
//      Statement order matters: the partial-function index ranges
//      (sl/wh/gb/ag _fcns_start/_end) depend on the order in which the
//      clauses are scanned.
//
//      For watch_tbl_qpn nodes only the watchlist structs and loader are
//      emitted.  Every other node type gets the FTA struct, tuple struct,
//      (flush,) (load_params,) free, control, accept, clock and alloc code.
//
//      fs               : the query node.  It must be one of spx_qpn,
//                         sgah_qpn, filter_join, watch_join or
//                         watch_tbl_qpn.  Any other node type prints an
//                         INTERNAL ERROR and calls exit(1).
//      schema           : table schemas.
//      gid              : stored in the global global_id.
//      Ext_fcns         : external function table, used to gather complex
//                         literals, handle parameters and partial functions.
//      schema_embed_str : passed through to generate_preamble.
//      ifdb             : interface database, queried for the
//                         "Time_Correlation" value of the first input table.
//      nicp             : NIC properties (may be NULL).  The option
//                         "Return=Packed" makes the lfta read packed input
//                         structs.  Any other Return value only warns.
//      s_pids           : indices of WHERE predicates already covered by the
//                         prefilter.  Forwarded to generate_fta_accept.
//      Returns the generated C text.
string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
//              map<string,string> &int_fcn_defs,
                ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
        bool is_aggr_query;
        int s,p,g;
        string retval;

/////////////////////////////////////////////////////////////
///             Do operator-generic processing, such as
///             gathering the set of referenced columns,
///             generating structures, etc.

//              Initialize globals to empty.
        gb_tbl = NULL; aggr_tbl = NULL;
        global_id = -1; nicprop = NULL;
        param_tbl = fs->get_param_tbl();
        sl_list.clear(); where.clear();
        partial_fcns.clear();
        fcn_ref_cnt.clear(); is_partial_fcn.clear();
        pred_class.clear(); pred_pos.clear();
        sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
        gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;


//              Does the lfta read packed results from the NIC?
        nicprop = nicp;                 // load into global
        global_id = gid;
    packed_return = false;
        if(nicp && nicp->option_exists("Return")){
                if(nicp->option_value("Return") == "Packed"){
                        packed_return = true;
                }else{
                        fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
                }
        }


//                      Extract data which defines the query.
//                              complex literals gathered now.
        complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
        param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
        string node_name = fs->get_node_name();
    bool is_fj = false, uses_bloom = false;
        bool is_wj = false;
        bool is_watch_tbl = false;


//              Dispatch on the node type.  Each branch loads the select
//              list, WHERE clause, and group-by / aggregate tables (NULL
//              when not applicable) into the globals, and sets the
//              operator-kind flags.
        if(fs->node_type() == "spx_qpn"){
                is_aggr_query = false;
                spx_qpn *spx_node = (spx_qpn *)fs;
                sl_list = spx_node->get_select_se_list();
                where = spx_node->get_where_clause();
                gb_tbl = NULL;
                aggr_tbl = NULL;
        } else
        if(fs->node_type() == "sgah_qpn"){
                is_aggr_query = true;
                sgah_qpn *sgah_node = (sgah_qpn *)fs;
                sl_list = sgah_node->get_select_se_list();
                where = sgah_node->get_where_clause();
                gb_tbl = sgah_node->get_gb_tbl();
                aggr_tbl = sgah_node->get_aggr_tbl();

//                      lftas do not evaluate HAVING; it is only reported.
                if((sgah_node->get_having_clause()).size() > 0){
                        fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
                }
        } else
        if(fs->node_type() == "filter_join"){
                is_aggr_query = false;
        is_fj = true;
                filter_join_qpn *fj_node = (filter_join_qpn *)fs;
                sl_list = fj_node->get_select_se_list();
                where = fj_node->get_where_clause();
                uses_bloom = fj_node->use_bloom;
                gb_tbl = NULL;
                aggr_tbl = NULL;
        }else
        if(fs->node_type() == "watch_join"){
                is_aggr_query = false;
        is_wj = true;
                watch_join_qpn *wl_node = (watch_join_qpn *)fs;
                sl_list = wl_node->get_select_se_list();
                where = wl_node->get_where_clause();
                gb_tbl = NULL;
                aggr_tbl = NULL;
        }else
        if(fs->node_type() == "watch_tbl_qpn"){
//                      A watch table has no select list or predicates.
                is_aggr_query = false;
        is_watch_tbl = true;
                vector<scalarexp_t *> empty_sl_list;
                vector<cnf_elem *> empty_where;
                sl_list = empty_sl_list;
                where = empty_where;
                gb_tbl = NULL;
                aggr_tbl = NULL;
        } else {
                fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
                exit(1);
        }

//                      Build list of "partial functions", by clause.
//                      NOTE : partial fcns are not handled well.
//                      The act of searching for them associates the fcn call
//                      in the SE with an index to an array.  Refs to the
//                      fcn value are replaced with refs to the variable they are
//                      unpacked into.  A more general tagging mechanism would be better.
//                      The *_fcns_start / *_fcns_end globals record which index
//                      range of partial_fcns came from which clause.

        int i;
        vector<bool> *pfunc_ptr = NULL;
        vector<int> *ref_cnt_ptr = NULL;
        if(!is_aggr_query){             // don't collect cacheable fcns on aggr query.
                ref_cnt_ptr = &fcn_ref_cnt;
                pfunc_ptr = &is_partial_fcn;
        }

        sl_fcns_start = 0;
        for(i=0;i<sl_list.size();i++){
                find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
        }
        wh_fcns_start = sl_fcns_end = partial_fcns.size();
        for(i=0;i<where.size();i++){
                find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
        }
        gb_fcns_start = wh_fcns_end = partial_fcns.size();
        if(gb_tbl != NULL){
                for(i=0;i<gb_tbl->size();i++){
                        find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
                }
        }
        ag_fcns_start = gb_fcns_end = partial_fcns.size();
        if(aggr_tbl != NULL){
                for(i=0;i<aggr_tbl->size();i++){
                        find_partial_fcns(aggr_tbl->get_aggr_se(i), &partial_fcns, NULL, &is_partial_fcn, Ext_fcns);
                }
        }
        ag_fcns_end = partial_fcns.size();

//              Fill up the is_partial_fcn and fcn_ref_cnt arrays.
//              For aggregation queries every partial fcn gets ref count 1 and
//              is treated as partial.
//              NOTE(review): the aggr_tbl scan above also passes
//              &is_partial_fcn to find_partial_fcns.  If that call appends
//              entries, is_partial_fcn ends up longer than partial_fcns, and
//              its indices may not line up with partial_fcns -- confirm.
        if(is_aggr_query){
                for(i=0; i<partial_fcns.size();i++){
                        fcn_ref_cnt.push_back(1);
                        is_partial_fcn.push_back(true);
                }
        }

//              Unmark non-partial expensive functions referenced only once.
        for(i=0; i<partial_fcns.size();i++){
                if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
                        partial_fcns[i]->set_partial_ref(-1);
                }
        }

        node_name = normalize_name(node_name);

        retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);

//              For packed NIC results, emit <node_name>_input_struct.  It holds
//              an int __lfta_id_fm_nic__ followed by one unpack_var_<col>
//              field per NIC-safe column referenced in the select list, WHERE
//              clause or group-by definitions, sorted by column name.  Column
//              types come from the schema of the first input table.
//              NOTE(review): refd_cols is sorted but not de-duplicated here;
//              presumably gather_nicsafe_cols avoids duplicates -- confirm.
        if(packed_return){              // generate unpack struct
                vector<tablevar_t *> input_tbls = fs->get_input_tbls();
                int schref = input_tbls[0]->get_schema_ref();
                vector<string> refd_cols;
                for(s=0;s<sl_list.size();++s){
                        gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
                }
                for(p=0;p<where.size();++p){
//                              I'm not disabling these preds ...
                        gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
                }
                if(gb_tbl){
                        for(g=0;g<gb_tbl->size();++g){
                          gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
                        }
                }
                sort(refd_cols.begin(), refd_cols.end());
                retval += "struct "+node_name+"_input_struct{\n";
                retval += "\tint __lfta_id_fm_nic__;\n";
                int vsi;
                for(vsi=0;vsi<refd_cols.size();++vsi){
                data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
                        retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
                }
                retval+="};\n\n";
        }


/////////////////////////////////////////////////////
//                      Common stuff unpacked, do some generation


        if(is_aggr_query)
          retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
        if(is_fj)
                retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
        if(is_watch_tbl){
                retval += "\n\n// watchtable code here \n\n";
                watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
                retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
                retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
        }

//              Everything below is the FTA proper; watch tables stop here.
        if(! is_watch_tbl){
                retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
                retval += generate_tuple_struct(node_name, sl_list) ;

                if(is_aggr_query)
                        retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
                if(param_tbl->size() > 0)
                        retval += generate_fta_load_params(node_name) ;
                retval += generate_fta_free(node_name, is_aggr_query) ;
                retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;
                retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;

        /* extract the value of Time_Correlation from interface definition */
        //      Falls back to DEFAULT_TIME_CORR when the interface defines no
        //      value.  The error outputs e/es of get_iface_vals are not
        //      checked, and a negative atoi result wraps in the unsigned
        //      time_corr.
                int e,v;
                string es;
                unsigned time_corr;
                vector<tablevar_t *> tvec =  fs->get_input_tbls();
                vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
                if (time_corr_vec.empty())
                        time_corr = DEFAULT_TIME_CORR;
                else
                        time_corr = atoi(time_corr_vec[0].c_str());

                retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
                retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
        }

  return(retval);
}
4644
4645
4646
4647 int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
4648
4649 //              Initialize global vars
4650         gb_tbl = NULL;
4651         sl_list.clear(); where.clear();
4652
4653         
4654         if(fs->node_type() == "watch_tbl_qpn"){
4655                 return -1;
4656         }
4657
4658         if(fs->node_type() == "spx_qpn"){
4659                 spx_qpn *spx_node = (spx_qpn *)fs;
4660                 sl_list = spx_node->get_select_se_list();
4661                 where = spx_node->get_where_clause();
4662         }
4663         else if(fs->node_type() == "sgah_qpn"){
4664                 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4665                 sl_list = sgah_node->get_select_se_list();
4666                 where = sgah_node->get_where_clause();
4667                 gb_tbl = sgah_node->get_gb_tbl();
4668         }
4669         else if(fs->node_type() == "filter_join"){
4670                 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4671                 sl_list = fj_node->get_select_se_list();
4672                 where = fj_node->get_where_clause();
4673         }
4674         else if(fs->node_type() == "watch_join"){
4675                 watch_join_qpn *fj_node = (watch_join_qpn *)fs;
4676                 sl_list = fj_node->get_select_se_list();
4677                 where = fj_node->get_where_clause();
4678         } else{
4679                 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
4680                 exit(1);
4681         }
4682
4683 //                      Gather all column references, need to define unpacking variables.
4684   int w,s;
4685   col_id_set cid_set;
4686   col_id_set::iterator csi;
4687
4688   for(w=0;w<where.size();++w)
4689         gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
4690   for(s=0;s<sl_list.size();s++){
4691         gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
4692   }
4693
4694   int g;
4695   if(gb_tbl != NULL){
4696         for(g=0;g<gb_tbl->size();g++)
4697           gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
4698   }
4699
4700   //                    compute snap length
4701   int snap_len = -1;
4702   int n_snap=0;
4703   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4704     int schref = (*csi).schema_ref;
4705         int tblref = (*csi).tblvar_ref;
4706     string field = (*csi).field;
4707
4708         if(snap_type == "index"){
4709                 int pos = schema->get_field_idx(schref, field);
4710                 if(pos>snap_len) snap_len = pos;
4711                 n_snap++;
4712         }else{
4713                 param_list *field_params = schema->get_modifier_list(schref, field);
4714                 if(field_params->contains_key("snap_len")){
4715                         string fld_snap_str = field_params->val_of("snap_len");
4716                         int fld_snap;
4717                         if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
4718                                 if(fld_snap > snap_len) snap_len = fld_snap;
4719                                 n_snap++;
4720                         }else{
4721                                 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
4722                         }
4723                 }
4724         }
4725   }
4726
4727   if(n_snap == cid_set.size()){
4728         return (snap_len);
4729   }else{
4730         return -1;
4731   }
4732
4733
4734 }
4735
4736 //              Function which computes an optimal
4737 //              set of unpacking functions.
4738
4739 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
4740         map<string, int> pfcn_count;
4741         map<string, int>::iterator msii;
4742         col_id_set::iterator cisi;
4743         set<string>::iterator ssi;
4744         string best_fcn;
4745
4746         while(ucol_fcn_map.size() < upref_cids.size()){
4747
4748 //                      Gather unpack functions referenced by unaccounted-for
4749 //                      columns, and increment their reference count.
4750                 pfcn_count.clear();
4751                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4752                         if(ucol_fcn_map.count((*cisi)) == 0){
4753                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4754                                 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
4755                                         pfcn_count[(*ssi)]++;
4756                         }
4757                 }
4758
4759 //              Get the lowest cost per field function.
4760                 float min_cost = 0.0;
4761                 string best_fcn = "";
4762                 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
4763                         int fcost = Schema->get_ufcn_cost((*msii).first);
4764                         if(fcost < 0){
4765                                 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
4766                                 exit(1);
4767                         }
4768                         float this_cost = (1.0*fcost)/(*msii).second;
4769                         if(msii == pfcn_count.begin() || this_cost < min_cost){
4770                                 min_cost = this_cost;
4771                                 best_fcn = (*msii).first;
4772                         }
4773                 }
4774                 if(best_fcn == ""){
4775                         fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4776                         exit(1);
4777                 }
4778
4779 //              Assign this function to the unassigned fcns which use it.
4780                 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4781                         if(ucol_fcn_map.count((*cisi)) == 0){
4782                                 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4783                                 if(ufcns.count(best_fcn)>0)
4784                                         ucol_fcn_map[(*cisi)] = best_fcn;
4785                         }
4786                 }
4787         }
4788 }
4789
4790
4791
4792 //              Generate an initial test test for the lfta
4793 //              Assume that the predicate references no external functions,
4794 //              and especially no partial functions,
4795 //              aggregates, internal functions.
4796 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4797                 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4798                 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4799                 vector<int> &lfta_snap_lens, string iface){
4800   col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4801   col_id_set::iterator csi;
4802         int o,p,q;
4803         string ret;
4804
4805 //              Gather complex literals in the prefilter.
4806         cplx_lit_table *complex_literals = new cplx_lit_table();
4807         for(p=0;p<pred_list.size();++p){
4808                 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4809         }
4810
4811
4812 //              Find the combinable predicates
4813         vector<predicate_t *> pr_list;
4814         for(p=0;p<pred_list.size();++p){
4815         find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4816         }
4817
4818 //              Analyze the combinable predicates to find the predicate classes.
4819         pred_class.clear();             // idx to equiv pred in equiv_list
4820         pred_pos.clear();               // idx to returned bitmask.
4821         vector<predicate_t *> equiv_list;
4822         vector<int> num_equiv;
4823
4824
4825         for(p=0;p<pr_list.size();++p){
4826                 for(q=0;q<equiv_list.size();++q){
4827                         if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4828                                 break;
4829                 }
4830                 if(q == equiv_list.size()){             // no equiv : create new
4831                         pred_class.push_back(equiv_list.size());
4832                         equiv_list.push_back(pr_list[p]);
4833                         pred_pos.push_back(0);
4834                         num_equiv.push_back(1);
4835
4836                 }else{                  // pr_list[p] is equivalent to pred q
4837                         pred_class.push_back(q);
4838                         pred_pos.push_back(num_equiv[q]);
4839                         num_equiv[q]++;
4840                 }
4841         }
4842
4843 //              Generate the variables which hold the common pred handles
4844         ret += "/*\t\tprefilter global vars.\t*/\n";
4845         for(q=0;q<equiv_list.size();++q){
4846                 for(p=0;p<=(num_equiv[q]/32);++p){
4847                         ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4848                 }
4849         }
4850
4851 //              Struct to hold prefilter complex literals
4852         ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4853         if(complex_literals->size() == 0)
4854                 ret += "\tint no_variable;\n";
4855         int cl;
4856         for(cl=0;cl<complex_literals->size();cl++){
4857                 literal_t *l = complex_literals->get_literal(cl);
4858                 data_type *dtl = new data_type( l->get_type() );
4859                 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4860                 ret += tmpstr;
4861         }
4862         ret += "} prefilter_complex_lits_"+iface+";\n\n";
4863
4864
4865 //              Generate the prefilter initialziation code
4866         ret += "void init_lfta_prefilter_"+iface+"(){\n";
4867
4868 //              First initialize complex literals, if any.
4869         ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4870         for(cl=0;cl<complex_literals->size();cl++){
4871                 literal_t *l = complex_literals->get_literal(cl);
4872                 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4873                 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4874         }
4875
4876
4877         set<int> epred_seen;
4878         for(p=0;p<pr_list.size();++p){
4879                 int q = pred_class[p];
4880 //printf("\tq=%d\n",q);
4881                 if(epred_seen.count(q)>0){
4882                         ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4883                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4884                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4885                         for(o=0;o<op_list.size();++o){
4886                                 if(! cl_op[o]){
4887                                         ret += generate_se_code(op_list[o],Schema)+", ";
4888                                 }
4889                         }
4890                         ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4891                         epred_seen.insert(q);
4892                 }else{
4893                         ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4894                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4895                         vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4896                         for(o=0;o<op_list.size();++o){
4897                                 if(! cl_op[o]){
4898                                         ret += generate_se_code(op_list[o],Schema)+", ";
4899                                 }
4900                         }
4901                         ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4902                         epred_seen.insert(q);
4903                 }
4904         }
4905         ret += "}\n\n";
4906
4907
4908
4909 //              Start on main body code generation
4910   ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4911
4912
4913 ///--------------------------------------------------------------
4914 ///             Generate and store the prefilter body,
4915 ///             reuse it for the snap length calculator
4916 ///-------------------------------------------------------------
4917         string body;
4918
4919     body += "\tstruct packet *p = (struct packet *)pkt;\n";
4920
4921
4922
4923 //              Gather the colids to store unpacked variables.
4924         for(p=0;p<pred_list.size();++p){
4925         gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4926         }
4927
4928 //              make the col_ids refer to the base tables, and
4929 //              grab the col_ids with at least one unpacking function.
4930         for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4931                 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4932                 col_id tmp_col_id;
4933                 tmp_col_id.field = (*csi).field;
4934                 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4935                 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4936                 cid_set.insert(tmp_col_id);
4937                 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4938                 if(fe->get_unpack_fcns().size()>0)
4939                         upref_cids.insert(tmp_col_id);
4940
4941
4942         }
4943
4944 //              Find the set of unpacking programs needed for the
4945 //              prefilter fields.
4946         map<col_id, string,lt_col_id>  ucol_fcn_map;
4947         find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4948         set<string> pref_ufcns;
4949         map<col_id, string,lt_col_id>::iterator mcis;
4950         for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4951                 pref_ufcns.insert((*mcis).second);
4952         }
4953
4954
4955
4956 //                      Variables for unpacking attributes.
4957     body += "/*\t\tVariables for unpacking attributes\t*/\n";
4958     for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4959       int schref = (*csi).schema_ref;
4960           int tblref = (*csi).tblvar_ref;
4961       string field = (*csi).field;
4962       data_type dt(Schema->get_type_name(schref,field));
4963       sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4964         field.c_str(), tblref);
4965       body += tmpstr;
4966       sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4967       body += tmpstr;
4968     }
4969 //                      Variables for unpacking temporal attributes.
4970     body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4971     for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4972           if (cid_set.count(*csi) == 0) {
4973         int schref = (*csi).schema_ref;
4974                 int tblref = (*csi).tblvar_ref;
4975         string field = (*csi).field;
4976         data_type dt(Schema->get_type_name(schref,field));
4977         sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4978           field.c_str(), tblref);
4979         body += tmpstr;
4980
4981           }
4982     }
4983     body += "\n\n";
4984
4985 //              Variables for combinable predicate evaluation
4986         body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4987         for(q=0;q<equiv_list.size();++q){
4988                 for(p=0;p<=(num_equiv[q]/32);++p){
4989                         body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4990                 }
4991         }
4992
4993
4994 //                      Variables that are always needed
4995     body += "/*\t\tVariables which are always needed\t*/\n";
4996         body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4997         body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4998
4999 //              Call the unpacking functions for the prefilter fields
5000         if(pref_ufcns.size() > 0)
5001                 body += "\n/*\t\tcall field unpacking functions\t*/\n";
5002         set<string>::iterator ssi;
5003         for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
5004                 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5005         }
5006
5007
5008 //              Unpack the accessed attributes
5009         body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
5010     for(csi=cid_set.begin();csi!=cid_set.end();++csi){
5011           int tblref = (*csi).tblvar_ref;
5012       int schref = (*csi).schema_ref;
5013           string field = (*csi).field;
5014           sprintf(tmpstr,"\tret_%s_%d =  (%s(p, &unpack_var_%s_%d) == 0);\n",
5015                 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5016           body += tmpstr;
5017     }
5018
5019 //              next unpack the temporal attributes and ignore the errors
5020 //              We are assuming here that failed unpack of temporal attributes
5021 //              is not going to overwrite the last stored value
5022 //              Failed upacks are ignored
5023     for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
5024           int tblref = (*csi).tblvar_ref;
5025       int schref = (*csi).schema_ref;
5026           string field = (*csi).field;
5027           sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
5028                  Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5029           body += tmpstr;
5030     }
5031
5032 //              Evaluate the combinable predicates
5033         if(equiv_list.size()>0)
5034                 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
5035         for(q=0;q<equiv_list.size();++q){
5036                 for(p=0;p<=(num_equiv[q]/32);++p){
5037
5038 //              Only call the common eval fcn if all ref'd fields present.
5039                         col_id_set pred_cids;
5040                         col_id_set::iterator cpi;
5041                         gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
5042                         if(pred_cids.size()>0){
5043                         body += "\tif(";
5044                                 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5045                                         if(cpi != pred_cids.begin())
5046                                                 body += " && ";
5047                                 string field = (*cpi).field;
5048                                         int tblref = (*cpi).tblvar_ref;
5049                                         body += "ret_"+field+"_"+int_to_string(tblref);
5050                                 }
5051                                 body+=")\n";
5052                         }
5053
5054                         body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
5055                         vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
5056                         vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
5057                         for(o=0;o<op_list.size();++o){
5058                                 if(cl_op[o]){
5059                                         body += ","+generate_se_code(op_list[o],Schema);
5060                                 }
5061                         }
5062                         body += ");\n";
5063                 }
5064         }
5065
5066
5067         for(p=0;p<pred_list.size();++p){
5068                 col_id_set pred_cids;
5069                 col_id_set::iterator cpi;
5070                 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
5071                 if(pred_cids.size()>0){
5072                         body += "\tif(";
5073                         for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5074                                 if(cpi != pred_cids.begin())
5075                                         body += " && ";
5076                         string field = (*cpi).field;
5077                                 int tblref = (*cpi).tblvar_ref;
5078                                 body += "ret_"+field+"_"+int_to_string(tblref);
5079                         }
5080                         body+=")\n";
5081                 }
5082         body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
5083                 body+="\tbitpos = bitpos << 1;\n";
5084         }
5085
5086 // ---------------------------------------------------------------
5087 //              Finished with the body of the prefilter
5088 // --------------------------------------------------------------
5089
5090         ret += body;
5091
5092 //                      Collect fields referenced by an lfta but not
5093 //                      already unpacked for the prefilter.
5094
5095 //printf("upref_cids is:\n");
5096 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
5097 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5098 //printf("pref_ufcns is:\n");
5099 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
5100 //printf("\t%s\n",(*ssi).c_str());
5101
5102         int l;
5103         for(l=0;l<lfta_cols.size();++l){
5104                 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
5105                         string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
5106                         col_id tmp_col_id;
5107                         tmp_col_id.field = (*csi).field;
5108                         tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
5109                         tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
5110                         field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
5111                         set<string> fld_ufcns = fe->get_unpack_fcns();
5112 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(),  upref_cids.count(tmp_col_id));
5113                         if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
5114 //              Ensure that this field not already unpacked.
5115                                 bool found = false;
5116                                 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
5117 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
5118                                         if(pref_ufcns.count((*ssi))){
5119 //printf("Field already unpacked.\n");
5120                                                 found = true;;
5121                                         }
5122                                 }
5123                                 if(! found){
5124 //printf("\tadding to unpack list\n");
5125                                         upall_cids.insert(tmp_col_id);
5126                                 }
5127                         }
5128                 }
5129         }
5130
5131 //printf("upall_cids is:\n");
5132 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
5133 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5134
5135 //              Get the set of unpacking programs for these.
5136         map<col_id, string,lt_col_id>  uall_fcn_map;
5137         find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
5138         set<string> pall_ufcns;
5139         for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
5140 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
5141                 pall_ufcns.insert((*mcis).second);
5142         }
5143
5144 //              Iterate through the remaining set of unpacking function
5145         if(pall_ufcns.size() > 0)
5146                 ret += "//\t\tcall all remaining field unpacking functions.\n";
5147         for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
5148 //              gather the set of columns unpacked by this ufcn
5149                 col_id_set fcol_set;
5150                 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
5151                         if(uall_fcn_map[(*csi)] == (*ssi))
5152                                 fcol_set.insert((*csi));
5153                 }
5154
5155 //              gather the set of lftas which access a field unpacked by the fcn
5156                 set<long long int> clfta;
5157                 for(l=0;l<lfta_cols.size();l++){
5158                         for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
5159                                 if(lfta_cols[l].count((*csi)) > 0)
5160                                         break;
5161                         }
5162                         if(csi != fcol_set.end())
5163                                 clfta.insert(lfta_sigs[l]);
5164                 }
5165
5166 //              generate the unpacking code
5167                 ret += "\tif(";
5168                 set<long long int>::iterator sii;
5169                 for(sii=clfta.begin();sii!=clfta.end();++sii){
5170                         if(sii!=clfta.begin())
5171                                 ret += " || ";
5172                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
5173                         ret += tmpstr;
5174                 }
5175                 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5176         }
5177
5178
5179     ret += "\treturn(retval);\n\n";
5180   ret += "}\n\n";
5181
5182
5183 // --------------------------------------------------------
5184 //              reuse prefilter body for snaplen calculator
5185 //
5186 //      This is dummy code, so I'm commenting it out.
5187
5188 /*
5189   ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
5190
5191         ret += body;
5192
5193         int i;
5194         vector<int> s_snaps = lfta_snap_lens;
5195         sort(s_snaps.begin(), s_snaps.end());
5196
5197         if(s_snaps[0] == -1){
5198                 set<unsigned long long int> sigset;
5199                 for(i=0;i<lfta_snap_lens.size();++i){
5200                         if(lfta_snap_lens[i] == -1){
5201                                 sigset.insert(lfta_sigs[i]);
5202                         }
5203                 }
5204                 ret += "\tif( ";
5205                 set<unsigned long long int>::iterator sulli;
5206                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5207                         if(sulli!=sigset.begin())
5208                                 ret += " || ";
5209                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5210                         ret += tmpstr;
5211                 }
5212                 ret += ") return -1;\n";
5213         }
5214
5215         int nextpos = lfta_snap_lens.size()-1;
5216         int nextval = lfta_snap_lens[nextpos];
5217         while(nextval >= 0){
5218                 set<unsigned long long int> sigset;
5219                 for(i=0;i<lfta_snap_lens.size();++i){
5220                         if(lfta_snap_lens[i] == nextval){
5221                                 sigset.insert(lfta_sigs[i]);
5222                         }
5223                 }
5224                 ret += "\tif( ";
5225                 set<unsigned long long int>::iterator sulli;
5226                 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5227                         if(sulli!=sigset.begin())
5228                                 ret += " || ";
5229                         sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5230                         ret += tmpstr;
5231                 }
5232                 ret += ") return "+int_to_string(nextval)+";\n";
5233
5234                 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
5235                 if(nextpos>0)
5236                         nextval = lfta_snap_lens[nextpos];
5237                 else
5238                         nextval = -1;
5239         }
5240         ret += "\treturn 0;\n";
5241         ret += "}\n\n";
5242 */
5243
5244
5245   return(ret);
5246 }
5247
5248
5249
5250
5251 //              Generate the struct which will store the the values of
5252 //              temporal attributesunpacked by prefilter
5253 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
5254
5255   col_id_set::iterator csi;
5256
5257 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
5258
5259   string ret="struct prefilter_unpacked_temp_vars {\n";
5260   ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
5261
5262   string init_code;
5263
5264   for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
5265     int schref = (*csi).schema_ref;
5266     int tblref = (*csi).tblvar_ref;
5267     string field = (*csi).field;
5268         data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
5269     sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
5270         field.c_str(), tblref);
5271     ret += tmpstr;
5272
5273         if (init_code != "")
5274                 init_code += ", ";
5275         if (dt.is_increasing())
5276                 init_code += dt.get_min_literal();
5277         else
5278                 init_code += dt.get_max_literal();
5279
5280   }
5281   ret += "};\n\n";
5282
5283   ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";
5284
5285   return(ret);
5286 }