1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
34 // default value for correlation between the interface card and
36 #define DEFAULT_TIME_CORR 16
41 extern string hash_nums[NRANDS];
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
98 // ----------------------------------------------
99 // Data extracted from the query plan node
100 // for use by code generation.
102 static cplx_lit_table *complex_literals; //Table of literals with constructors.
103 static vector<handle_param_tbl_entry *> param_handle_table;
104 static param_table *param_tbl; // Table of all referenced parameters.
// Select-list expressions; generate_fta_struct scans these for temporal attributes.
106 static vector<scalarexp_t *> sl_list;
107 static vector<cnf_elem *> where;
109 static gb_table *gb_tbl; // Table of all group-by attributes.
110 static aggregate_table *aggr_tbl; // Table of all referenced aggregates.
112 static bool packed_return; // unpack using structure, not fcns
113 static nic_property *nicprop; // nic properties for this interface.
114 static int global_id;
117 // The partial_fcns vector can now refer to
118 // partial functions, or expensive functions
119 // which can be cached (if there are multiple refs). The
120 // fcn_ref_cnt and is_partial_fcn vectors distinguish the cases.
121 static vector<scalarexp_t *> partial_fcns;
122 static vector<int> fcn_ref_cnt;
123 static vector<bool> is_partial_fcn;
// Start/end indices per clause (select, where, group-by, aggregate);
// presumably ranges into partial_fcns -- TODO confirm.
// Note these are not static, unlike the neighbouring state.
124 int sl_fcns_start = 0, sl_fcns_end = 0;
125 int wh_fcns_start = 0, wh_fcns_end = 0;
126 int gb_fcns_start = 0, gb_fcns_end = 0;
127 int ag_fcns_start = 0, ag_fcns_end = 0;
130 // These vectors are for combinable predicates.
131 static vector<int> pred_class; // identifies the group
132 static vector<int> pred_pos; // position in the group.
// Scratch buffer shared by every sprintf() call in this file.
// NOTE(review): the sprintf calls are unbounded; field and parameter names are
// formatted into this 1000-byte buffer, so snprintf would be safer.
136 static char tmpstr[1000];
138 //////////////////////////////////////////////////////////////////////
139 /// Various utilities
// Builds the identifier used for the generated FTA struct of a query node
// (see generate_fta_struct), starting from the normalized node name.
141 string generate_fta_name(string node_name){
142 string ret = normalize_name(node_name);
// Builds the identifier of the per-group aggregate struct for a query node:
// the normalized node name with "_aggr_struct" appended.
152 string generate_aggr_struct_name(string node_name){
153 string ret = normalize_name(node_name);
157 ret += "_aggr_struct";
// Builds the identifier of the filter-join hash-table entry struct
// (see generate_fj_struct), starting from the normalized node name.
162 string generate_fj_struct_name(string node_name){
163 string ret = normalize_name(node_name);
// Builds the identifier of the struct holding one watchlist record
// (see generate_watchlist_structs), starting from the normalized node name.
172 string generate_watchlist_element_name(string node_name){
173 string ret = normalize_name(node_name);
// Builds the identifier of the watchlist table struct type:
// the normalized node name with "__wl_struct" appended.
182 string generate_watchlist_struct_name(string node_name){
183 string ret = normalize_name(node_name);
187 ret += "__wl_struct";
// Builds the identifier of the global watchlist table variable
// (the instance declared by generate_watchlist_structs), starting from the
// normalized node name.
192 string generate_watchlist_name(string node_name){
193 string ret = normalize_name(node_name);
// Emits the C statements that load one field into the local variable
// unpack_var_<field>_<tblref>.
//   tblref    table-variable index, used as the suffix of the unpack variable
//   schref    schema index used for lookups in `schema`
//   field     field name
//   node_name prefix of the <node_name>_input_struct(_var) names used in the
//             struct-copy form
//   end_goto  label the generated code jumps to on failure (default "end")
// Two forms are visible: a call to the schema-provided accessor function, and
// a copy out of <node_name>_input_struct_var.
202 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
// Accessor form: retval = <fcn>(p, &unpack_var_<field>_<tblref>);
205 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
206 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
// The failure jump is emitted only when the field lacks the "required" modifier.
208 if(!schema->get_modifier_list(schref,field)->contains_key("required"))
209 ret += "\tif(retval) goto "+end_goto+";\n";
212 // TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
213 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
214 if(dt.is_buffer_type()){
215 if(dt.get_type() != v_str_t){
// Generated bounds check: struct size + buffer length + buffer offset must not exceed sz.
216 ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
217 ret += "\t\tgoto "+end_goto+";\n";
218 ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
// NOTE(review): the emitted statement below ends in ";+" while the non-buffer
// form further down ends in ";". The '+' looks like a typo that would leak
// into the generated C -- confirm.
219 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
220 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
222 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
// Non-buffer field: plain assignment from the input struct.
226 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
227 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
// Emits the C declaration of the per-group aggregate struct for a query node:
// one member gb_var<N> per group-by attribute, one member aggr_var<N> per
// aggregate, and a `next` pointer to the same struct type.
// The gb_tbl / aggr_tbl parameters shadow the file-level statics of the same name.
233 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
234 string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
// Group-by members, typed by the group-by table.
237 for(g=0;g<gb_tbl->size();g++){
238 sprintf(tmpstr,"gb_var%d",g);
239 ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
// Aggregate members: built-in aggregates use the aggregate's data type; the
// storage type is used otherwise (presumably for user-defined aggregates).
243 for(a=0;a<aggr_tbl->size();a++){
245 sprintf(tmpstr,"aggr_var%d",a);
246 if(aggr_tbl->is_builtin(a))
247 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
249 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
253 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
// Emits the C declaration of the filter-join hash-table entry struct.
// Only produced when the join does not use a bloom filter; it holds one
// key_var<N> member per hash equality predicate plus a `ts` member.
262 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
265 if(fs->use_bloom == false){ // uses hash table instead
266 ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
// Each key member takes the data type of the predicate's right-hand expression.
268 for(k=0;k<fs->hash_eq.size();++k){
269 sprintf(tmpstr,"key_var%d",k);
270 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
272 ret += "\tlong long int ts;\n";
// Emits the C declarations backing a watchlist table:
//   - the element struct (one member per field of `tbl`, plus hashval and next),
//   - a global char* <watchlist>__fstr holding `filename`,
//   - the table struct type and one initialized global instance of it.
// refresh_interval is written into the instance's initializer.
279 string generate_watchlist_structs(string node_name, table_def *tbl,
280 std::string filename, int refresh_interval){
283 ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
284 vector<field_entry *> fields = tbl->get_fields();
285 for(int f=0;f<fields.size();++f){
286 data_type dt(fields[f]->get_type());
287 ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
289 ret += "\tgs_uint64_t hashval;\n";
290 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
// NOTE(review): filename is spliced into a C string literal without escaping;
// a path containing '"' or '\' would produce invalid generated code.
293 ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
294 ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
295 ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
296 ret += "\tgs_uint32_t ht_size;\n";
297 ret += "\tgs_uint32_t n_elem;\n";
298 ret += "\tgs_uint32_t refresh_interval;\n";
299 ret += "\ttime_t next_refresh;\n";
300 ret += "\ttime_t last_mtime;\n";
301 ret += "\tchar *filename;\n";
// Initializer order matches the members above: ht, ht_size, n_elem,
// refresh_interval, next_refresh, last_mtime, filename. The filename member
// starts out NULL; presumably it is pointed at __fstr elsewhere -- TODO confirm.
302 ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
// Emits the C function reload_watchlist__<node_name>(), which reloads the
// watchlist hash table from its comma-separated file. Generated steps, in order:
//   1. stat() the file; return if stat fails or the file is not newer than last_mtime,
//   2. free every element of the current table,
//   3. (re)allocate the bucket array when needed and clear it,
//   4. read the file line by line, split each line into fields, build one
//      element per line and link it into its hash bucket,
//   5. log a summary if problem lines were counted.
//   tbl   supplies the field list (names and types) of a watchlist record
//   keys  names of the fields that contribute to the hash value
307 string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
309 string tgt = generate_watchlist_name(node_name);
310 vector<field_entry *> fields = tbl->get_fields();
312 ret += "void reload_watchlist__"+node_name+"(){\n";
313 ret += "\tgs_uint32_t i;\n";
314 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
315 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
316 ret += "\tFILE *fl;\n";
317 ret += "\tchar buf[10000];\n";
318 ret += "\tgs_uint32_t buflen = 10000;\n";
319 ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
320 ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
321 ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
322 ret += "\tgs_uint64_t hash, bucket;\n";
323 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
325 ret += "// make sure watchlist file has changed since the last time we loaded it\n";
326 ret += "\tstruct stat file_stat;\n";
327 ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
328 ret += "\tif (err) {\n";
329 ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
330 ret += "\t\treturn;\n";
332 ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime) // watchlist file hasn't changed since last time\n";
333 ret += "\t\treturn;\n";
// NOTE(review): last_mtime is updated before the file is opened. If the fopen()
// emitted further down fails, the old entries have already been freed and the
// table stays empty until the file's timestamps change again.
334 ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
336 ret += "// Delete old entries.\n";
337 ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
338 ret += "\t\tptr="+tgt+".ht[i];\n";
339 ret += "\t\twhile(ptr!=NULL){\n";
// Buffer-typed fields get their destroy function called before the element is freed.
340 for(int f=0;f<fields.size();++f){
341 data_type dt(fields[f]->get_type());
342 if(dt.is_buffer_type()){
343 ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
346 ret += "\t\t\tnext = ptr->next;\n";
347 ret += "\t\t\tfree(ptr);\n";
348 ret += "\t\t\tptr = next;\n";
351 ret += "\n// prepare new table. \n";
// The bucket array is reallocated when the previous load held more elements than
// buckets, or on first use; the first-use size is 100000 buckets.
352 ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
353 ret += "\t\tif("+tgt+".ht)\n";
354 ret += "\t\t\tfree("+tgt+".ht);\n";
355 ret += "\t\tif("+tgt+".ht_size == 0)\n";
356 ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
358 ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
// NOTE(review): the generated malloc() result is used without a NULL check in the visible lines.
359 ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
361 ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
362 ret += "\t\t"+tgt+".ht[i] = NULL;\n";
364 ret += "\n// load new table\n";
365 ret += "\t"+tgt+".n_elem = 0;\n";
366 ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
367 ret += "\tif(fl==NULL){\n";
368 ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
369 ret += "\t\treturn;\n";
// NOTE(review): the generated code declares n_malformed, increments it and tests
// it in the final summary, but the visible initialisations only reset `malformed`,
// short_lines and toolong_lines. n_malformed looks uninitialised -- confirm.
371 ret += "\tmalformed = 0;\n";
372 ret += "\tshort_lines = 0;\n";
373 ret += "\ttoolong_lines = 0;\n";
374 ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
375 ret += "\t\tlinelen = strlen(buf);\n";
// NOTE(review): the last character is dropped unconditionally, so a final line
// without a trailing newline (or a line longer than buf) loses a data character.
376 ret += "\t\tbuf[linelen-1]='\\0'; // strip off trailing newline\n";
377 ret += "\t\tlinelen--;\n";
379 ret += "\t\tpos=0;\n";
380 ret += "\t\tmalformed=0;\n";
381 ret += "\t\tok=1;\n";
// Split the line in place: each ',' becomes '\0' and flds[f] points at the start of field f.
382 ret += "\t\tflds[0] = buf;\n";
383 ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
384 ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
385 ret += "\t\t\tif(pos >= linelen){\n";
386 ret += "\t\t\t\tmalformed = 1;\n";
387 ret += "\t\t\t\tbreak;\n";
389 ret += "\t\t\tbuf[pos]='\\0';\n";
390 ret += "\t\t\tpos++;\n";
391 ret += "\t\t\tflds[f]=buf+pos;\n";
// Classify the line: malformed, too few fields, or trailing data after the last field.
393 ret += "\t\tif(malformed){\n";
394 ret += "\t\t\tok=0;\n";
395 ret += "\t\t\tn_malformed++;\n";
397 ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
398 ret += "\t\t\tok=0;\n";
399 ret += "\t\t\tshort_lines++;\n";
401 ret += "\t\tif(pos && (pos<linelen)){\n";
402 ret += "\t\t\tok=0;\n";
403 ret += "\t\t\ttoolong_lines++;\n";
// A record is built whenever all fields were found; `ok` is not consulted in the
// visible condition.
405 ret += "\t\tif(f>="+std::to_string(fields.size())+"){\n";
406 ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
// One type-specific extraction call per field, converting text into the record member.
408 for(int f=0;f<fields.size();++f){
409 data_type dt(fields[f]->get_type());
410 ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
412 // Compute the hash value
413 ret += "\t\t\thash=0;\n";
// For each key, locate the field with that name and XOR in
// hash_nums[index % NRANDS] * lfta_<type>_to_hash(field value).
414 for(int k=0;k<keys.size();++k){
415 string key_fld = keys[k];
417 for(f=0;f<fields.size();++f){
418 if(fields[f]->get_name() == key_fld)
421 data_type dt(fields[f]->get_type());
424 "\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
425 dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
// The bucket index is taken from the upper 32 bits of the hash.
// NOTE(review): "\%" is not a standard C++ escape sequence; compilers typically
// warn and emit '%'. A plain "%" would be cleaner.
427 ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
428 ret += "\t\t\trec->hashval = hash;\n";
429 ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
430 ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
431 ret += "\t\t\t"+tgt+".n_elem++;\n";
// NOTE(review): the summary passes `malformed` (the per-line flag of the last line
// read) rather than the n_malformed counter, and short lines alone do not trigger
// the message -- confirm intent.
435 ret += "\tif(n_malformed+toolong_lines > 0){\n";
436 ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n";
// Emits the C declaration of the per-query FTA struct. It starts with an
// embedded `struct FTA f` and is followed by, as applicable: aggregate hash-table
// state, temporal group-by flush markers, filter-join state (bloom filter or
// hash table), watchlist-join state, query parameters, complex literals,
// pass-by-handle parameters, last-seen values of temporal select-list attributes,
// a trace id and runtime statistics counters.
// The table parameters shadow the file-level statics of the same name; sl_list,
// used below, is the file-level static.
// NOTE(review): "¶m_handle_table" in the parameter list looks like a mis-encoded
// "&param_handle_table" -- confirm against the repository copy.
446 string generate_fta_struct(string node_name, gb_table *gb_tbl,
447 aggregate_table *aggr_tbl, param_table *param_tbl,
448 cplx_lit_table *complex_literals,
449 vector<handle_param_tbl_entry *> ¶m_handle_table,
450 bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
453 string ret = "struct " + generate_fta_name(node_name) + "{\n";
454 ret += "\tstruct FTA f;\n";
456 //-------------------------------------------------------------
457 // Aggregate-specific fields
461 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
463 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
464 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
465 // ret+="\tint bitmap_size;\n";
466 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
467 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
468 ret += "\tint max_windows; // max number of open windows.\n";
469 ret += "\tunsigned int generation; // initially zero, increment on\n";
470 ret += "\t // every hash table flush - whether regular or induced.\n";
471 ret += "\t // Old groups are identified by a generation mismatch.\n";
472 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
473 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
// For every temporal, non-buffer group-by attribute, formats three members
// (last_gb_<g>, flush_start_gb_<g>, last_flushed_gb_<g>) and records that a
// temporal flush is available. Progress is reported on stderr.
478 bool uses_temporal_flush = false;
479 for(g=0;g<gb_tbl->size();g++){
480 data_type *dt = gb_tbl->get_data_type(g);
481 if(dt->is_temporal()){
483 fprintf(stderr,"group by attribute %s is temporal, ",
484 gb_tbl->get_name(g).c_str());
485 if(dt->is_increasing()){
486 fprintf(stderr,"increasing.\n");
488 fprintf(stderr,"decreasing.\n");
491 data_type *gdt = gb_tbl->get_data_type(g);
492 if(gdt->is_buffer_type()){
493 fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
495 sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
497 sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
499 sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
501 uses_temporal_flush = true;
507 if(! uses_temporal_flush){
508 fprintf(stderr,"Warning: no temporal flush.\n");
512 // ---------------------------------------------------------
513 // Filter-join specific fields
518 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
519 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
520 "\tint first_exec;\n"
521 "\tlong long int last_bin;\n"
522 "\tint last_bloom_pos;\n"
525 }else{ // limited hash table
527 " struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
534 // --------------------------------------------
535 // watchlist-join specific
537 ret += "\ttime_t ux_time;\n";
540 //--------------------------------------------------------
543 // Create places to hold the parameters.
545 vector<string> param_vec = param_tbl->get_param_names();
546 for(p=0;p<param_vec.size();p++){
547 data_type *dt = param_tbl->get_data_type(param_vec[p]);
548 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
549 param_vec[p].c_str());
// Parameters accessed through a handle additionally get a search_handle pointer member.
551 if(param_tbl->handle_access(param_vec[p])){
552 ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
556 // Create places to hold complex literals.
558 for(cl=0;cl<complex_literals->size();cl++){
559 literal_t *l = complex_literals->get_literal(cl);
// NOTE(review): dtl is heap-allocated only to read its C type name and no
// delete is visible here; a stack data_type (as used for `dt` further down)
// would avoid the leak -- confirm.
560 data_type *dtl = new data_type( l->get_type() );
561 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
565 // Create places to hold the pass-by-handle parameters.
566 for(p=0;p<param_handle_table.size();++p){
567 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
571 // Create places to hold the last values of temporal
572 // attributes referenced in the select clause.
573 // We also need to store the values of the temporal attributes
574 // of the last flushed tuple in aggregate queries
575 // to make sure we generate the correct temporal tuple
576 // in the presence of slow flushes.
579 col_id_set temp_cids; // col ids of temp attributes in select clause
582 col_id_set::iterator csi;
// Collect the column ids referenced by every temporal select-list expression.
584 for(s=0;s<sl_list.size();s++){
585 data_type *sdt = sl_list[s]->get_data_type();
586 if (sdt->is_temporal()) {
587 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// One last_<field>_<tblref> member per collected column, typed from the schema.
591 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
592 int tblref = (*csi).tblvar_ref;
593 int schref = (*csi).schema_ref;
594 string field = (*csi).field;
595 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
596 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
600 ret += "\tgs_uint64_t trace_id;\n\n";
602 // Fields to store the runtime stats
604 ret += "\tgs_uint32_t in_tuple_cnt;\n";
605 ret += "\tgs_uint32_t out_tuple_cnt;\n";
606 ret += "\tgs_uint32_t out_tuple_sz;\n";
607 ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
608 ret += "\tgs_uint64_t cycle_cnt;\n";
609 ret += "\tgs_uint32_t collision_cnt;\n";
610 ret += "\tgs_uint32_t eviction_cnt;\n";
611 ret += "\tgs_float_t sampling_rate;\n";
620 //------------------------------------------------------------
621 // Set colref tblvars to 0.
622 // (special processing for join-like operators in an lfta).
// Recursively walks the scalar expression `se` and sets the table-variable
// reference of every column reference it reaches to 0. Group-by references are
// followed into their defining expression through `gtbl`.
624 void reset_se_col_ids_tblvars(scalarexp_t *se, gb_table *gtbl){
625 vector<scalarexp_t *> operands;
631 switch(se->get_operator_type()){
// Single-operand expression: recurse into the left operand.
637 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
// Two-operand expression: recurse into both sides.
640 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
641 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
645 se->get_colref()->set_tablevar_ref(0);
// NOTE(review): this message names gather_se_col_ids, not this function;
// it looks copied from there -- confirm.
648 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
651 reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
657 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
// Operand list: recurse into every operand.
660 operands = se->get_operands();
661 for(o=0;o<operands.size();o++){
662 reset_se_col_ids_tblvars(operands[o], gtbl);
666 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
667 se->get_lineno(), se->get_charno(),se->get_operator_type());
673 // Reset the table-variable references of all columns accessed in this predicate.
// Recurses through sub-predicates and hands every scalar operand to
// reset_se_col_ids_tblvars; an unknown predicate type is reported on stderr.
675 void reset_pr_col_ids_tblvars(predicate_t *pr, gb_table *gtbl){
676 vector<scalarexp_t *> op_list;
679 switch(pr->get_operator_type()){
// Predicate with a single scalar operand.
681 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
// Predicate with two scalar operands.
684 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
685 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
// Predicate with one sub-predicate.
688 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
// Predicate with two sub-predicates.
691 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
692 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
// Predicate with an operand list.
695 op_list = pr->get_op_list();
696 for(o=0;o<op_list.size();++o){
697 reset_se_col_ids_tblvars(op_list[o],gtbl) ;
701 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
702 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
709 // Generate code that makes reference
710 // to the tuple, and not to any aggregates.
// Returns the C expression text for scalar expression `se`. The names it emits
// include t->handle_param_<N>, t->complex_literal_<N>, unpack_var_<field>_<tblref>
// and partial_fcn_result_<N>. Group-by references are expanded to their defining
// expression using the file-level gb_tbl. Errors are reported on stderr and the
// text "ERROR in generate_se_code" is returned.
711 static string generate_se_code(scalarexp_t *se,table_list *schema){
713 data_type *ldt, *rdt;
715 vector<scalarexp_t *> operands;
718 switch(se->get_operator_type()){
// Literal: handle reference, complex literal stored in the FTA struct, or inline C literal.
720 if(se->is_handle_ref()){
721 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
725 if(se->get_literal()->is_cpx_lit()){
726 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
730 return(se->get_literal()->to_C_code("")); // not complex, no constructor
// Parameter: handle reference or the named parameter.
732 if(se->is_handle_ref()){
733 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
738 ret += se->get_param_name();
// Unary operator: types with a complex operator use the function form.
741 ldt = se->get_left_se()->get_data_type();
742 if(ldt->complex_operator(se->get_op()) ){
743 ret += ldt->get_complex_operator(se->get_op());
745 ret += generate_se_code(se->get_left_se(),schema);
750 ret += generate_se_code(se->get_left_se(),schema);
// Binary operator: same split, keyed on the pair of operand types.
755 ldt = se->get_left_se()->get_data_type();
756 rdt = se->get_right_se()->get_data_type();
758 if(ldt->complex_operator(rdt, se->get_op()) ){
759 ret += ldt->get_complex_operator(rdt, se->get_op());
761 ret += generate_se_code(se->get_left_se(),schema);
763 ret += generate_se_code(se->get_right_se(),schema);
767 ret += generate_se_code(se->get_left_se(),schema);
769 ret += generate_se_code(se->get_right_se(),schema);
774 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet unpacked ...
775 // so return the defining code.
776 ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
779 sprintf(tmpstr,"unpack_var_%s_%d",
780 se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
785 // Should not be ref'ing any aggr here.
786 if(se->get_aggr_ref() >= 0){
787 fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
788 return("ERROR in generate_se_code");
// Partial functions were evaluated earlier; refer to the stored result.
791 if(se->is_partial()){
792 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
// Function call: buffer-typed operands that are not handle references get
// special treatment (presumably passed by address) -- TODO confirm.
796 operands = se->get_operands();
797 for(o=0;o<operands.size();o++){
799 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
801 ret += generate_se_code(operands[o], schema);
807 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
808 se->get_lineno(), se->get_charno(),se->get_operator_type());
809 return("ERROR in generate_se_code");
813 // generate code that refers only to aggregate data and constants.
// Returns the C expression text for `se`, evaluated against an aggregate record:
// group-by references become <var>gb_var<N>, aggregate references become
// <var>aggr_var<N>, and UDAF results are read from udaf_ret<N>. `var` is the
// text prefixed to those member names. A column reference that is not a group-by
// attribute is reported as an error on stderr.
// NOTE(review): the error returns here reuse the text "ERROR in generate_se_code".
814 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
817 data_type *ldt, *rdt;
819 vector<scalarexp_t *> operands;
822 switch(se->get_operator_type()){
// Literal: handle reference, complex literal stored in the FTA struct, or inline C literal.
824 if(se->is_handle_ref()){
825 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
829 if(se->get_literal()->is_cpx_lit()){
830 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
834 return(se->get_literal()->to_C_code("")); // not complex no constructor
// Parameter: handle reference or the named parameter.
836 if(se->is_handle_ref()){
837 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
842 ret += se->get_param_name();
// Unary operator: types with a complex operator use the function form.
845 ldt = se->get_left_se()->get_data_type();
846 if(ldt->complex_operator(se->get_op()) ){
847 ret += ldt->get_complex_operator(se->get_op());
849 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
854 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
// Binary operator: same split, keyed on the pair of operand types.
859 ldt = se->get_left_se()->get_data_type();
860 rdt = se->get_right_se()->get_data_type();
862 if(ldt->complex_operator(rdt, se->get_op()) ){
863 ret += ldt->get_complex_operator(rdt, se->get_op());
865 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
867 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
871 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
873 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
878 if(se->is_gb()){ // group-by attrs are read from the aggregate
879 // record: emit <var>gb_var<N>.
880 sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
884 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
885 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
886 se->get_lineno(), se->get_charno());
// Aggregate reference: read the stored aggregate member.
892 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
// Function node that is really a UDAF: read its precomputed return value.
897 if(se->get_aggr_ref() >= 0){
898 sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
903 if(se->is_partial()){
904 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
908 operands = se->get_operands();
909 for(o=0;o<operands.size();o++){
911 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
913 ret += generate_se_code_fm_aggr(operands[o], var, schema);
919 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
920 se->get_lineno(), se->get_charno(),se->get_operator_type());
921 return("ERROR in generate_se_code");
// Emits the statement that evaluates a partial function against aggregate data:
//   retval = <fcn>( &partial_fcn_result_<pfn_id>, <operands...>
// Operands are rendered with generate_se_code_fm_aggr using `var` as the record
// prefix. `se` must be a function node that is not a UDAF reference; otherwise
// an error is reported on stderr and an error text is returned.
927 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
930 vector<scalarexp_t *> operands;
933 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
934 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
935 se->get_lineno(), se->get_charno());
936 return("ERROR in generate_se_code");
939 ret = "\tretval = " + se->get_op() + "( ";
// The first argument is the address of the slot that receives the result.
940 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
943 operands = se->get_operands();
944 for(o=0;o<operands.size();o++){
946 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
948 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// Emits the call expression "<fcn>( <operands...>" for a function whose result
// is to be cached; operands are rendered against the tuple with generate_se_code.
// `se` must be a function node that is not a UDAF reference; otherwise an error
// is reported on stderr and an error text is returned.
955 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
958 vector<scalarexp_t *> operands;
960 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
961 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
962 se->get_lineno(), se->get_charno());
963 return("ERROR in generate_se_code");
966 ret = se->get_op() + "( ";
968 operands = se->get_operands();
969 for(o=0;o<operands.size();o++){
971 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
973 ret += generate_se_code(operands[o], schema);
// Emits the statement that evaluates a partial function against the tuple:
//   retval = <fcn>( &partial_fcn_result_<pfn_id>, <operands...>
// Operands are rendered with generate_se_code. `se` must be a function node that
// is not a UDAF reference; otherwise an error is reported on stderr and an error
// text is returned.
982 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
985 vector<scalarexp_t *> operands;
988 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
989 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
990 se->get_lineno(), se->get_charno());
991 return("ERROR in generate_se_code");
// NOTE(review): the next line ends in ',' rather than ';'. The comma operator
// still evaluates the assignment and then the sprintf, so behaviour is unchanged,
// but it looks like a typo; compare the ';' in unpack_partial_fcn_fm_aggr.
994 ret = "\tretval = " + se->get_op() + "( ",
995 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
998 operands = se->get_operands();
999 for(o=0;o<operands.size();o++){
1001 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
1003 ret += generate_se_code(operands[o], schema);
// Translates a query-language comparison operator into its C spelling:
// "=" becomes "==" and "<>" becomes "!=".
1014 static string generate_C_comparison_op(string op){
1015 if(op == "=") return("==");
1016 if(op == "<>") return("!=");
// Translates a query-language boolean operator (AND / OR / NOT, accepted in
// upper, capitalized or lower case) into its C spelling. An unrecognized
// operator is reported on stderr and an error text naming it is returned.
1020 static string generate_C_boolean_op(string op){
1021 if( (op == "AND") || (op == "And") || (op == "and") ){
1024 if( (op == "OR") || (op == "Or") || (op == "or") ){
1027 if( (op == "NOT") || (op == "Not") || (op == "not") ){
1031 fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
1032 return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
// Returns the C expression text for predicate `pr`, evaluated against the tuple.
// Visible forms: membership in a literal list (a chain of "||" equality tests),
// comparison of two scalar expressions, unary/binary boolean operators, and
// function predicates. Scalar operands are rendered with generate_se_code.
// An unknown predicate type is reported on stderr and an error text is returned.
1036 static string generate_predicate_code(predicate_t *pr,table_list *schema){
1038 vector<literal_t *> litv;
1040 data_type *ldt, *rdt;
1041 vector<scalarexp_t *> op_list;
1043 unsigned int bitmask;
1045 switch(pr->get_operator_type()){
// Literal-list membership: one equality test per literal, joined with "||".
1047 ldt = pr->get_left_se()->get_data_type();
1050 litv = pr->get_lit_vec();
1051 for(i=0;i<litv.size();i++){
1052 if(i>0) ret += " || ";
// Types with a complex comparison use the type's equals function; buffer-typed
// operands are passed by address.
1055 if(ldt->complex_comparison(ldt) ){
1056 ret += ldt->get_equals_fcn(ldt) ;
1058 if(ldt->is_buffer_type() ) ret += "&";
1059 ret += generate_se_code(pr->get_left_se(), schema);
1061 if(ldt->is_buffer_type() ) ret += "&";
1062 if(litv[i]->is_cpx_lit()){
1063 sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
1066 ret += litv[i]->to_C_code("");
1070 ret += generate_se_code(pr->get_left_se(), schema);
1072 ret += litv[i]->to_C_code("");
// Comparison of two scalar expressions.
1081 ldt = pr->get_left_se()->get_data_type();
1082 rdt = pr->get_right_se()->get_data_type();
1085 if(ldt->complex_comparison(rdt) ){
1086 // TODO can use get_equals_fcn if op is "=" ?
1087 ret += ldt->get_comparison_fcn(rdt);
1089 if(ldt->is_buffer_type() ) ret += "&";
1090 ret += generate_se_code(pr->get_left_se(),schema);
1092 if(rdt->is_buffer_type() ) ret += "&";
1093 ret += generate_se_code(pr->get_right_se(),schema);
1095 ret += generate_C_comparison_op(pr->get_op());
1098 ret += generate_se_code(pr->get_left_se(),schema);
1099 ret += generate_C_comparison_op(pr->get_op());
1100 ret += generate_se_code(pr->get_right_se(),schema);
// Unary boolean operator applied to a sub-predicate.
1106 ret += generate_C_boolean_op(pr->get_op());
1107 ret += generate_predicate_code(pr->get_left_pr(),schema);
1110 case PRED_BINARY_OP:
1112 ret += generate_predicate_code(pr->get_left_pr(),schema);
1113 ret += generate_C_boolean_op(pr->get_op());
1114 ret += generate_predicate_code(pr->get_right_pr(),schema);
// Function predicate.
1118 op_list = pr->get_op_list();
1119 cref = pr->get_combinable_ref();
1120 if(cref >= 0){ // predicate is a combinable pred reference
1121 // Trust, but verify
// NOTE(review): "size() >= cref" still admits cref == size(), for which
// pred_class[cref] is out of range; "size() > cref" looks intended -- confirm.
1122 if(pred_class.size() >= cref && pred_class[cref] >= 0){
1123 ppos = pred_pos[cref];
// '%' binds tighter than '<<', so this is 1 << (ppos % 32): the bit for this
// predicate inside its 32-bit word; ppos/32 below selects the word.
1124 bitmask = 1 << ppos % 32;
1125 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
1131 ret = pr->get_op() + "(";
// Sampling functions receive the FTA's sampling rate as their first argument.
1132 if (pr->is_sampling_fcn) {
1133 ret += "t->sampling_rate";
1134 if (!op_list.empty())
1137 for(o=0;o<op_list.size();++o){
1138 if(o>0) ret += ", ";
1139 if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
1141 ret += generate_se_code(op_list[o],schema);
1146 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
1147 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
1148 return("ERROR in generate_predicate_code");
// Builds the C text of an equality test for two operand strings (lhs_op,
// rhs_op) that share the data type dt.
// Visible behavior: when dt reports that it needs a complex comparison, the
// name of the type's equals function is appended to the result; a "&" is
// appended twice when dt is a buffer type (presumably once in front of each
// operand so that buffers are passed by address -- confirm against the
// operand-appending lines).
1153 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
1156 if(dt->complex_comparison(dt) ){
// complex types are compared through the function named by the data type
1157 ret += dt->get_equals_fcn(dt);
1159 if(dt->is_buffer_type() ) ret += "&";
1162 if(dt->is_buffer_type() ) ret += "&";
1174 //static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
1177 // if(dt->complex_comparison(dt) ){
1178 // ret += dt->get_equals_fcn(dt);
1180 // if(dt->is_buffer_type() ) ret += "&";
1183 // if(dt->is_buffer_type() ) ret += "&";
1195 // Here I assume that only MIN and MAX aggregates can be computed
1196 // over BUFFER data types.
// Emits the C statement(s) that fold the current tuple into aggregate number
// aidx of atbl, whose running value lives in the C lvalue named by var.
//   var    - text of the aggregate's storage lvalue in the generated code
//   atbl   - aggregate table describing operator, operands and types
//   aidx   - index of the aggregate inside atbl
//   schema - passed through to generate_se_code for operand expressions
// Returns the generated text; for an unrecognized operator it logs to stderr
// and returns a string starting with "ERROR: aggregate not recognized: ".
1198 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
1199 string retval = "\t\t";
1200 string op = atbl->get_op(aidx);
// Non-builtin aggregates: emit a call to <op>_LFTA_AGGR_UPDATE_.
1203 if(! atbl->is_builtin(aidx)) {
1205 retval += op+"_LFTA_AGGR_UPDATE_(";
// the storage is passed by address unless its storage type is fstring_t
1206 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1207 retval+="("+var+")";
1208 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1209 for(o=0;o<opl.size();++o){
// buffer-typed operands that are not handle references get special
// treatment on the following line (presumably a "&" prefix -- TODO confirm)
1211 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1213 retval += generate_se_code(opl[o], schema);
1220 // Built-in aggregate processing.
1222 data_type *dt = atbl->get_data_type(aidx);
// increment form (presumably COUNT -- the operator test is not on a visible line)
1226 retval.append("++;\n");
// accumulate form (presumably SUM)
1231 retval.append(" += ");
1232 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1233 retval.append(";\n");
// Replace-if-smaller form (presumably MIN): evaluate the operand once into
// aggr_tmp_<aidx>, compare it with the stored value, then overwrite.
1237 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1238 retval.append(tmpstr);
1239 if(dt->complex_comparison(dt)){
1240 if(dt->is_buffer_type())
1241 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1243 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1245 sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1247 retval.append(tmpstr);
// buffer types are overwritten through the type's buffer-replace function
1248 if(dt->is_buffer_type()){
1249 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1251 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1253 retval.append(tmpstr);
// Replace-if-larger form (presumably MAX): same shape with the test reversed.
1258 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1259 retval.append(tmpstr);
1260 if(dt->complex_comparison(dt)){
1261 if(dt->is_buffer_type())
1262 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1264 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1266 sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1268 retval.append(tmpstr);
1269 if(dt->is_buffer_type()){
1270 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1272 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1274 retval.append(tmpstr);
// Bitwise aggregates map directly onto the C compound-assignment operators.
1279 if(op == "AND_AGGR"){
1281 retval.append(" &= ");
1282 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1283 retval.append(";\n");
1286 if(op == "OR_AGGR"){
1288 retval.append(" |= ");
1289 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1290 retval.append(";\n");
1293 if(op == "XOR_AGGR"){
1295 retval.append(" ^= ");
1296 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1297 retval.append(";\n");
// No known operator matched: report it and return an error marker string.
1300 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1301 return("ERROR: aggregate not recognized: "+op);
// Emits the C statement(s) that initialize aggregate number aidx of atbl,
// stored in the C lvalue named by var, from the first tuple of a group.
//   var    - text of the aggregate's storage lvalue in the generated code
//   atbl   - aggregate table describing operator, operands and types
//   aidx   - index of the aggregate inside atbl
//   schema - passed through to generate_se_code for operand expressions
// Returns the generated text; for an unrecognized operator it logs to stderr
// and returns a string starting with "ERROR: aggregate not recognized: ".
1307 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1309 string op = atbl->get_op(aidx);
// Non-builtin aggregates: emit <op>_LFTA_AGGR_INIT_ followed by a first
// <op>_LFTA_AGGR_UPDATE_ call carrying the operand expressions.
1312 if(! atbl->is_builtin(aidx)) {
1314 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
// the storage is passed by address unless its storage type is fstring_t
1315 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1316 retval+="("+var+"));\n";
1318 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1319 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1320 retval+="("+var+")";
1321 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1322 for(o=0;o<opl.size();++o){
1324 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1326 retval += generate_se_code(opl[o],schema);
1332 // Built-in aggregate processing.
1335 data_type *dt = atbl->get_data_type(aidx);
// start-at-one form (presumably COUNT -- the operator test is not on a visible line)
1338 retval = "\t\t"+var;
1339 retval.append(" = 1;\n");
// Every other built-in starts out as the value of its operand expression.
1343 if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1344 op == "OR_AGGR" || op == "XOR_AGGR"){
// buffer types are evaluated into aggr_tmp_<aidx> and then copied with the
// type's buffer-assign-copy function
1345 if(dt->is_buffer_type()){
1346 sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1347 retval.append(tmpstr);
1348 sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1349 retval.append(tmpstr);
// other types: assignment built from var and the operand expression
1351 retval = "\t\t"+var;
1353 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1354 retval.append(";\n");
// No known operator matched: report it and return an error marker string.
1359 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1360 return("ERROR: aggregate not recognized: "+op);
1364 ////////////////////////////////////////////////////////////
// Builds the text placed at the start of a generated LFTA source.
//   schema           - table schema (not used on the lines visible here)
//   node_name        - query node name; used to name the schema string variable
//   schema_embed_str - C expression text assigned to that variable
// The visible part opens an "#ifndef LFTA_IN_NIC" section, defines the
// embedded schema string as a "char *" variable and emits #include lines.
1367 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1368 std::string &node_name, std::string &schema_embed_str){
1369 // Include these only once, not once per lfta
1370 // string ret = "#include \"rts.h\"\n";
1371 // ret += "#include \"fta.h\"\n\n");
1373 string ret = "#ifndef LFTA_IN_NIC\n";
// schema_embed_str is pasted verbatim as the initializer
1374 ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1375 ret += "#include<stdio.h>\n";
1376 ret += "#include <limits.h>\n";
1377 ret += "#include <float.h>\n";
1378 ret += "#include <sys/stat.h>\n";
1379 ret += "#include \"rdtsc.h\"\n";
// Emits the C code that turns the aggregate-table slot t->aggr_table[idx]
// into an output tuple and then releases the slot.
//   node_name - query node name, used for the tuple struct name
//   schema    - passed through to the scalar-expression generators
//   idx       - C expression text indexing the aggregate table slot
// Stages visible in the generated text, in order:
//   1. sum the *_LFTA_AGGR_BAILOUT_ results of aggregates that have a bailout
//      into lfta_bailout and only build a tuple when it is zero;
//   2. fetch non-builtin aggregate outputs into udaf_ret<a>;
//   3. unpack partial functions used by the select list, tracking failures
//      in unpack_failed;
//   4. compute tuple_size, allocate the tuple, fill it, post it;
//   5. destroy buffer-typed group-by/aggregate storage and decrement
//      t->n_aggrs.
// Reads the file-level tables aggr_tbl, gb_tbl, sl_list, partial_fcns,
// is_partial_fcn, sl_fcns_start and sl_fcns_end.
1388 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1390 // need to create and output the tuple.
1391 string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1392 // Check for any UDAFs with LFTA_BAILOUT
1393 ret += "\tlfta_bailout = 0;\n";
1394 for(a=0;a<aggr_tbl->size();a++){
1395 if(aggr_tbl->has_bailout(a)){
1396 ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
// the storage is passed by address unless its storage type is fstring_t
1397 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1398 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1401 ret += "\tif(! lfta_bailout){\n";
1403 // First, compute the size of the tuple.
1405 // Unpack UDAF return values
1406 for(a=0;a<aggr_tbl->size();a++){
1407 if(! aggr_tbl->is_builtin(a)){
1408 ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1409 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1410 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1416 // Unpack partial fcns ref'd by the select clause.
1417 if(sl_fcns_start != sl_fcns_end){
1418 ret += "\t\tunpack_failed = 0;\n";
1419 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1420 if(is_partial_fcn[p]){
1421 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1422 "t->aggr_table["+idx+"].",schema);
// a nonzero retval from the unpack marks the whole tuple as failed
1423 ret += "\t\tif(retval) unpack_failed = 1;\n";
1426 // BEGIN don't allocate tuple if
1427 ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1430 // Unpack any BUFFER type selections into temporaries
1431 // so that I can compute their size and not have
1432 // to recompute their value during tuple packing.
1433 // I can use regular assignment here because
1434 // these temporaries are non-persistent.
1436 for(s=0;s<sl_list.size();s++){
1437 data_type *sdt = sl_list[s]->get_data_type();
1438 if(sdt->is_buffer_type()){
1439 sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1441 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1447 // The size of the tuple is the size of the tuple struct plus the
1448 // size of the buffers to be copied in.
1450 ret += "\t\t\ttuple_size = sizeof( struct ";
1451 ret += generate_tuple_name(node_name);
1453 for(s=0;s<sl_list.size();s++){
1454 data_type *sdt = sl_list[s]->get_data_type();
1455 if(sdt->is_buffer_type()){
1456 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1463 ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1464 ret += "\t\t\tif( tuple != NULL){\n";
1467 // Test passed, make assignments to the tuple.
1469 ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1470 ret += generate_tuple_name(node_name) ;
1473 // Mark tuple as REGULAR_TUPLE
1474 ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1476 for(s=0;s<sl_list.size();s++){
1477 data_type *sdt = sl_list[s]->get_data_type();
// buffer values are copied behind the fixed-size struct at offset
// tuple_pos, which then advances by the buffer's size
1478 if(sdt->is_buffer_type()){
1479 sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1481 sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1484 sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1486 // if(sdt->needs_hn_translation())
1487 // ret += sdt->hton_translation() +"( ";
1488 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1489 // if(sdt->needs_hn_translation())
1496 ret += "\t\t\t\tpost_tuple(tuple);\n";
// output counters are only maintained when LFTA_STATS is defined
1497 ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1498 ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1499 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1500 ret += "\t\t\t\t#endif\n\n";
1503 if(sl_fcns_start != sl_fcns_end) // END don't allocate tuple if
1504 ret += "\t\t}\n"; // unpack failed.
1507 // Need to release memory held by BUFFER types.
1510 for(g=0;g<gb_tbl->size();g++){
1511 data_type *gdt = gb_tbl->get_data_type(g);
1512 if(gdt->is_buffer_type()){
1513 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1517 for(a=0;a<aggr_tbl->size();a++){
1518 if(aggr_tbl->is_builtin(a)){
1519 data_type *adt = aggr_tbl->get_data_type(a);
1520 if(adt->is_buffer_type()){
1521 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
// <op>_LFTA_AGGR_DESTROY_ call (presumably the non-builtin branch -- the
// else line is not visible here)
1525 ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1526 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1527 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
// the slot no longer holds a live group
1531 ret += "\t\tt->n_aggrs--;\n";
// Emits the C "if" condition that tests whether aggregate-table slot idx
// holds the group of the current tuple: the slot must be filled and new
// (IS_FILLED / IS_NEW on t->aggr_table_bitmap) and, when there are group-by
// attributes, each gb_attr_<g> must equal t->aggr_table[idx].gb_var<g>.
//   idx - C expression text indexing the aggregate table slot
// Reads the file-level gb_tbl.
1537 string generate_gb_match_test(string idx){
1539 string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") && IS_NEW(t->aggr_table_bitmap,"+idx+")";
1540 if(gb_tbl->size()>0){
1541 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1544 // Next, scan list for a match on the group-by attributes.
1545 string rhs_op, lhs_op;
1546 for(g=0;g<gb_tbl->size();g++){
// lhs is the freshly computed attribute, rhs the value stored in the slot
1549 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1550 sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1551 ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
// Emits the C code run when the current tuple matches an existing group in
// slot idx: update every aggregate in place, destroy the copied buffer-typed
// group-by attributes, and test the *_LFTA_AGGR_FLUSHME_ results of the
// non-builtin aggregates to flush and emit the group.
//   node_name - query node name (names the flush function and tuple struct)
//   schema    - passed through to the scalar-expression generators
//   idx       - C expression text indexing the aggregate table slot
//   has_udaf  - by-value flag; set to true locally when any aggregate is
//               non-builtin, so the caller's variable is not changed
// Reads the file-level aggr_tbl and gb_tbl.
1561 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1565 ret += "/*\t\tMatch found : update in place.\t*/\n";
1568 for(a=0;a<aggr_tbl->size();a++){
1569 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1570 ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1571 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1574 // garbage collect copied buffer type gb attrs.
1575 for(g=0;g<gb_tbl->size();g++){
1576 data_type *gdt = gb_tbl->get_data_type(g);
1577 if(gdt->is_buffer_type()){
1578 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
// The FLUSHME calls of all non-builtin aggregates are joined with " || ".
1585 bool first_udaf = true;
1588 for(a=0;a<aggr_tbl->size();a++){
1589 if(! aggr_tbl->is_builtin(a)){
1590 if(! first_udaf)ret += " || ";
1591 else first_udaf = false;
1592 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
// the storage is passed by address unless its storage type is fstring_t
1593 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1594 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
// flush old groups, emit this group's tuple, then clear the slot's
// SLOT_FILLED bit
1598 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1599 ret += generate_tuple_from_aggr(node_name,schema,idx);
1600 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
// Emits the C code that starts a new group in aggregate-table slot idx:
// store the hash word (hash2 | SLOT_FILLED | gen_val), copy the group-by
// attributes into the slot, initialize every aggregate and increment
// t->n_aggrs.
//   schema - passed through to generate_aggr_init
//   idx    - C expression text indexing the aggregate table slot
// Reads the file-level gb_tbl and aggr_tbl.
1607 string generate_init_group( table_list *schema, string idx){
1609 string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1610 // Fill up the aggregate block.
1611 for(g=0;g<gb_tbl->size();g++){
// plain C assignment of gb_attr_<g> into the slot
1612 sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1615 for(a=0;a<aggr_tbl->size();a++){
1616 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1617 ret += generate_aggr_init(tmpstr, aggr_tbl,a, schema);
1619 ret+="\t\tt->n_aggrs++;\n";
// Emits the C function
//   static void fta_aggr_flush_old_<node_name>(struct FTA *f, unsigned int nflush)
// which scans the aggregate table from t->flush_pos and emits up to nflush
// groups that are filled and either carry a different generation value or
// have a temporal group-by value older than t->last_gb_<g>.
//   node_name - query node name, used in the generated identifiers
//   schema    - passed through to generate_tuple_from_aggr
//   Ext_fcns  - external function list; supplies the return types of
//               non-builtin aggregates for the udaf_ret<a> temporaries
// Reads the file-level sl_list, partial_fcns, sl_fcns_start, sl_fcns_end,
// aggr_tbl and gb_tbl.
1624 string generate_fta_flush(string node_name, table_list *schema,
1625 ext_fcn_list *Ext_fcns){
1628 string select_var_defs ;
1631 // Flush from previous epoch
1633 ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1635 ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1636 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1637 ret += "\tint i, lfta_bailout;\n";
1638 ret += "\tunsigned int gen_val;\n";
1640 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1641 ret += generate_fta_name(node_name)+" *) f;\n";
1646 // Variables needed to store selected attributes of BUFFER type
1647 // temporarily, in order to compute their size for storage
1648 // in an output tuple.
1650 select_var_defs = "";
1651 for(s=0;s<sl_list.size();s++){
1652 data_type *sdt = sl_list[s]->get_data_type();
1653 if(sdt->is_buffer_type()){
1654 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1655 select_var_defs.append(tmpstr);
// the heading comment is emitted only when at least one temporary exists
1658 if(select_var_defs != ""){
1659 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1660 ret += select_var_defs;
1664 // Variables to store results of partial functions.
1665 if(sl_fcns_start != sl_fcns_end){
1666 ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1667 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1668 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1669 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
// NOTE(review): the literal ends with "\n;" so the generated text gets a
// lone ";" with no trailing newline -- looks like a typo for ";\n"; confirm.
1672 ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1675 // Variables for udaf output temporaries
1676 bool no_udaf = true;
1678 for(a=0;a<aggr_tbl->size();a++){
1679 if(! aggr_tbl->is_builtin(a)){
1681 ret+="/*\t\tUDAF output vars.\t*/\n";
// declare udaf_ret<a> with the aggregate function's declared data type
1684 int afcn_id = aggr_tbl->get_fcn_id(a);
1685 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1686 sprintf(tmpstr,"udaf_ret%d", a);
1687 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1692 // ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1694 ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
// the scan stops at the table end, when no groups remain, or when the
// nflush budget is used up
1695 ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1696 ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
// the temporal group-by comparisons are joined with " || "
1699 for(g=0;g<gb_tbl->size();g++){
1700 data_type *gdt = gb_tbl->get_data_type(g);
1701 if(gdt->is_temporal()){
1702 if(first_g) first_g=false; else ret+=" || ";
1703 ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1707 ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1709 "#ifdef LFTA_STATS\n"
1710 "\t\t\tt->eviction_cnt++;\n"
1715 ret+=generate_tuple_from_aggr(node_name,schema,"i");
1717 // ret+="\t\t\tt->n_aggrs--;\n"; // done in generate_tuple_from_aggr
1718 ret+="\t\t\tnflush--;\n";
// remember where the scan stopped; an empty table counts as fully flushed
1721 ret+="\tt->flush_pos=i;\n";
1722 ret+="\tif(t->n_aggrs == 0) {\n";
1723 ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1726 ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1728 for(int g=0;g<gb_tbl->size();g++){
1729 data_type *dt = gb_tbl->get_data_type(g);
1730 if(dt->is_temporal()){
// NOTE(review): gdt is fetched with the same call as dt above, so the
// two pointers refer to the same data type.
1731 data_type *gdt = gb_tbl->get_data_type(g);
1732 if(!gdt->is_buffer_type()){
1733 sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1738 ret += "\t}\n}\n\n";
1743 // TODO Remove sprintf to perform string catenation
// Emits the C function
//   static int load_params_<node_name>(struct <fta> *t, int sz, void *value,
//                                      int initial_call)
// which unpacks the query parameters from the block at value (sz bytes) into
// t->param_<name> and (re)registers the pass-by-handle parameters.
// The generated function returns 1 when the block is too small for the
// parameters and 0 at its end.
//   node_name - query node name, used in the generated identifiers
// Reads the file-level param_tbl and param_handle_table.
1744 string generate_fta_load_params(string node_name){
1746 vector<string> param_names = param_tbl->get_param_names();
1748 string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1749 ret += " *t, int sz, void *value, int initial_call){\n";
1750 ret += "\tint pos=0;\n";
1751 ret += "\tint data_pos;\n";
// Buffer-typed parameters get two temporaries: tmp_var_<name> of the cvar
// type and access_var_<name> of the tuple cvar type.
1753 for(p=0;p<param_names.size();p++){
1754 data_type *dt = param_tbl->get_data_type(param_names[p]);
1755 if(dt->is_buffer_type()){
1756 sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1758 sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
// data_pos is built as a " + "-joined sum with one term per parameter, each
// term using the tuple cvar type (presumably inside sizeof(...) -- the
// wrapper text is not on a visible line).
1765 ret += "\n\tdata_pos = ";
1766 for(p=0;p<param_names.size();p++){
1767 if(p>0) ret += " + ";
1768 data_type *dt = param_tbl->get_data_type(param_names[p]);
1770 ret += dt->get_tuple_cvar_type();
1774 ret += "\tif(data_pos > sz) return 1;\n\n";
1777 for(p=0;p<param_names.size();p++){
1778 data_type *dt = param_tbl->get_data_type(param_names[p]);
1779 if(dt->is_buffer_type()){
1780 sprintf(tmpstr,"\taccess_var_%s = *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1782 switch( dt->get_type() ){
1784 // ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n"; // ntoh conversion
1785 // ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n"; // ntoh conversion
// reject a buffer whose offset+length reaches past the end of the block
1786 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
// data is turned from an offset inside the block into a pointer
1788 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1790 sprintf(tmpstr,"\ttmp_var_%s.length = access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1794 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1798 // First, destroy the old
1799 ret += "\tif(! initial_call)\n";
1800 sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1802 // Next, create the new.
1803 sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1806 // if(dt->needs_hn_translation()){
1807 // sprintf(tmpstr,"\tt->param_%s = %s( *( (%s *)( (char *)value+pos) ) );\n",
1808 // param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1810 sprintf(tmpstr,"\tt->param_%s = *( (%s *)( (char *)value+pos) );\n",
1811 param_names[p].c_str(), dt->get_cvar_type().c_str() );
// NOTE(review): pos advances by sizeof(cvar type) while data_pos above is
// built from the tuple cvar type -- confirm the two agree for buffer types.
1815 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1819 // Register the pass-by-handle parameters
1821 ret += "/* register and de-register the pass-by-handle parameters */\n";
1824 for(ph=0;ph<param_handle_table.size();++ph){
1825 data_type pdt(param_handle_table[ph]->type_name);
1826 switch(param_handle_table[ph]->val_type){
// an existing handle is de-registered first, except on the initial call
1832 ret += "\tif(! initial_call)\n";
1833 sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1834 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1836 sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
// buffer-typed parameters are passed to the registration function by address
1839 if(pdt.is_buffer_type()) ret += "&(";
1840 ret += "t->param_"+param_handle_table[ph]->param_name;
1841 if(pdt.is_buffer_type()) ret += ")";
1845 sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1846 fprintf(stderr,"%s\n",tmpstr);
1851 ret+="\treturn 0;\n";
// Emits the C function
//   static gs_retval_t free_fta_<node_name>(struct FTA *f, gs_uint32_t recursive)
// The generated function flushes the aggregate table, advances t->generation
// so all remaining groups count as old, flushes again from position 0,
// de-registers every pass-by-handle parameter and returns 0.
//   node_name     - query node name, used in the generated identifiers
//   is_aggr_query - presumably guards the flush section (the test is not on
//                   a visible line -- TODO confirm)
// Reads the file-level param_handle_table.
1860 string generate_fta_free(string node_name, bool is_aggr_query){
1862 string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1863 ret+= "\tstruct "+generate_fta_name(node_name)+
1864 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1865 ret += "\tint i;\n";
// two-pass flush: groups already old first, then everything after the
// generation bump
1868 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1869 ret+="\t/* \t\tmark all groups as old */\n";
1870 ret+="\tt->generation++;\n";
1871 ret+="\tt->flush_pos = 0;\n";
1872 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1875 // Deregister the pass-by-handle parameters
1876 ret += "/* de-register the pass-by-handle parameters */\n";
1878 for(ph=0;ph<param_handle_table.size();++ph){
1879 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1880 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1885 ret += "\treturn 0;\n}\n\n";
// Emits the C function
//   static gs_retval_t control_fta_<node_name>(struct FTA *f,
//       gs_int32_t command, gs_int32_t sz, void *value)
// Commands handled in the generated text:
//   FTA_COMMAND_FLUSH             - post an empty tuple when no groups are
//                                   held, otherwise flush all groups
//   FTA_COMMAND_LOAD_PARAMS       - only when the query has parameters; calls
//                                   load_params_<node_name>
//   FTA_COMMAND_SET_SAMPLING_RATE - copies a gs_float_t into t->sampling_rate
//   FTA_COMMAND_FILE_DONE         - flushes remaining groups and posts an
//                                   EOF_TUPLE
//   node_name     - query node name, used in the generated identifiers
//   schema        - not used on the lines visible here
//   is_aggr_query - presumably guards the aggregate flush sections (the
//                   tests are not on visible lines -- TODO confirm)
// Reads the file-level param_tbl.
1890 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1891 string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value){\n";
1892 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1893 ret += generate_fta_name(node_name)+" *) f;\n\n";
1897 ret += "\t/* temp status tuple */\n";
1898 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1899 ret += "\tgs_int32_t tuple_size;\n";
1903 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
// with no groups held, a zero-size tuple is allocated and posted
1905 ret+="\t\tif (!t->n_aggrs) {\n";
1906 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1907 ret+="\t\t\tif( tuple != NULL)\n";
1908 ret+="\t\t\t\tpost_tuple(tuple);\n";
1910 ret+="\t\t}else{\n";
// otherwise: flush, bump the generation so every group is old, flush again
1912 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1913 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1914 ret +="\t\tt->generation++;\n";
1915 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1916 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1917 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1918 ret+="\t\t\tt->flush_pos = 0;\n";
1919 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1924 if(param_tbl->size() > 0){
1926 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1927 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1928 "#ifndef LFTA_IN_NIC\n"
1929 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1936 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1937 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1941 ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1944 ret+="\t\tif (t->n_aggrs) {\n";
1945 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1946 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1947 ret +="\t\tt->generation++;\n";
1948 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1949 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1950 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1951 ret+="\t\t\tt->flush_pos = 0;\n";
1952 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
// the end-of-file tuple has only the fixed-size tuple struct; the generated
// function returns 1 when it cannot be allocated
1956 ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1957 ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1958 ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1960 /* mark tuple as EOF_TUPLE */
1961 ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1962 ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1963 ret += "\t\tpost_tuple(tuple);\n";
1966 ret += "\treturn 0;\n}\n\n";
// Emits the C function
//   static gs_retval_t clock_fta_<node_name>(struct FTA *f)
// which builds and posts a TEMPORAL_TUPLE carrying the current values of the
// temporal select-list attributes, followed by the trace id and a
// struct fta_stat with the runtime counters, and then resets the counters.
//   node_name      - query node name, used in the generated identifiers
//   schema         - used to look up field types and to generate expressions
//   time_corr      - number subtracted from the wall-clock time before it is
//                    compared with the last seen time/timestamp values
//   is_aggr_query  - when true, also emits the temporal group-by change test
//                    and the flush of the aggregate table
//   advance_uxtime - presumably guards the t->ux_time update (the test is
//                    not on a visible line -- TODO confirm)
// The generated function returns 1 when the tuple cannot be allocated and 0
// otherwise. Reads the file-level sl_list and gb_tbl.
1971 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
1972 string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1973 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1974 ret += generate_fta_name(node_name)+" *) f;\n\n";
1976 ret += "\t/* Create a temp status tuple */\n";
1977 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1978 ret += "\tgs_int32_t tuple_size;\n";
1979 ret += "\tunsigned int i;\n";
1980 ret += "\ttime_t cur_time;\n";
1981 ret += "\tint time_advanced;\n";
1982 ret += "\tstruct fta_stat stats;\n";
1986 /* copy the last seen values of temporal attributes */
1987 col_id_set temp_cids; // col ids of temp attributes in select clause
1990 /* HACK: in order to reuse the SE generation code, we need to copy
1991 * the last values of the temp attributes into new variables
1992 * which have names unpack_var_XXX_XXX
1996 col_id_set::iterator csi;
// collect the column ids referenced by temporal select-list expressions
1998 for(s=0;s<sl_list.size();s++){
1999 data_type *sdt = sl_list[s]->get_data_type();
2000 if (sdt->is_temporal()) {
2001 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// declare one unpack_var_<field>_<tblref> local per collected column
2005 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2006 int tblref = (*csi).tblvar_ref;
2007 int schref = (*csi).schema_ref;
2008 string field = (*csi).field;
2009 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2010 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
// aggregate queries also need locals for the temporal group-by attributes,
// plus a gb_attr_tmp<g> temporary for buffer types
2014 if (is_aggr_query) {
2015 for(g=0;g<gb_tbl->size();g++){
2016 data_type *gdt = gb_tbl->get_data_type(g);
2017 if(gdt->is_temporal()){
2018 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2020 data_type *gdt = gb_tbl->get_data_type(g);
2021 if(gdt->is_buffer_type()){
2022 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2030 ret += "\ttime_advanced = 0;\n";
2032 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2033 int tblref = (*csi).tblvar_ref;
2034 int schref = (*csi).schema_ref;
2035 string field = (*csi).field;
2036 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2038 // update last seen value with the value seen
2039 ret += "\t#ifdef PREFILTER_DEFINED\n";
2040 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
2041 field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
2043 ret += "\t\ttime_advanced = 1;\n\t}\n";
2044 ret += "\t#endif\n";
2046 // we need to pay special attention to time fields
2047 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
2048 ret += "\tcur_time = time(&cur_time);\n";
// "time" is compared directly against cur_time - time_corr
2050 if (field == "time") {
2051 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
2054 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
2055 field.c_str(), tblref, field.c_str(), tblref, time_corr);
// "timestamp_ms" is divided by 1000 for the comparison and multiplied by
// 1000 when it is set
2056 } else if (field == "timestamp_ms") {
2057 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
2060 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
2061 field.c_str(), tblref, field.c_str(), tblref, time_corr);
// remaining case ("timestamp"): the value is shifted right by 32 bits for
// the comparison and the new value is shifted left by 32 bits
2063 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
2064 field.c_str(), tblref, time_corr);
2066 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
2067 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2071 ret += "\t\ttime_advanced = 1;\n";
2074 sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
2075 field.c_str(), tblref, field.c_str(), tblref);
// non-time fields simply reuse the last seen value
2078 sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
2079 field.c_str(), tblref, field.c_str(), tblref);
2086 ret += "\tt->ux_time = time(&(t->ux_time));\n";
2089 // for aggregation lftas we need to check if the time was advanced beyond the current epoch
2090 if (is_aggr_query) {
2093 bool first_one = true;
2094 for(g=0;g<gb_tbl->size();g++){
2095 data_type *gdt = gb_tbl->get_data_type(g);
2096 if(gdt->is_temporal()){
2097 // To perform the test, first need to compute the value
2098 // of the temporal gb attrs.
2099 if(gdt->is_buffer_type()){
2100 // NOTE : if the SE defining the gb is anything
2101 // other than a ref to a variable, this will generate
2102 // illegal code. To be resolved with Spatch.
2103 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2104 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2106 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2107 gdt->get_buffer_assign_copy().c_str(), g, g);
2109 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
// change_test collects "last_gb == gb_attr" tests joined with ") && (";
// it is used negated below, so a flush happens when any attribute differs
2113 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2114 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2115 if(first_one){first_one = false;} else {change_test.append(") && (");}
2116 change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
2120 ret += "\n\tif( time_advanced && !( (";
2124 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2125 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
2126 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2128 ret += "\t\t/* \t\tmark all groups as old */\n";
2129 ret +="\t\tt->generation++;\n";
2130 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
2131 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
2132 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
2133 ret += "\t\tt->flush_pos = 0;\n";
// record the new epoch's temporal values as both flush start and last seen
2135 for(g=0;g<gb_tbl->size();g++){
2136 data_type *gdt = gb_tbl->get_data_type(g);
2137 if(gdt->is_temporal()){
2138 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
2139 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
// the tuple is the tuple struct followed by a 64-bit trace id and the stats
2146 ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
2147 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2148 ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
2151 for(s=0;s<sl_list.size();s++){
2152 data_type *sdt = sl_list[s]->get_data_type();
2153 if(sdt->is_temporal()){
// group-by attributes report last_flushed_gb while groups are still held,
// otherwise the freshly computed expression
2155 if (sl_list[s]->is_gb()) {
2156 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
2160 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2162 // if(sdt->needs_hn_translation())
2163 // ret += sdt->hton_translation() +"( ";
2164 if (sl_list[s]->is_gb()) {
2165 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
2168 ret += generate_se_code(sl_list[s],schema);
2170 // if(sdt->needs_hn_translation())
2176 /* mark tuple as temporal */
2177 ret += "\n\t/* Mark tuple as temporal */\n";
2178 ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
2180 ret += "\n\t/* Copy trace id */\n";
2181 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
2183 ret += "\n\t/* Populate runtime stats */\n";
2184 ret += "\tstats.ftaid = f->ftaid;\n";
2185 ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
2186 ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
2187 ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
2188 ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
2189 ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
2190 ret += "\tstats.collision_cnt = t->collision_cnt;\n";
2191 ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
2192 ret += "\tstats.sampling_rate = t->sampling_rate;\n";
2194 ret += "\n#ifdef LFTA_PROFILE\n";
2195 ret += "\n\t/* Print stats */\n";
2196 ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
2197 ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
2198 ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
2199 ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
2200 ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
2201 ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
2202 ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
// NOTE(review): collision_cnt already ends its output with a newline, so
// eviction_cnt is printed on a separate line from the rest of the stats.
2203 ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
2204 ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
2205 ret += "\n#endif\n";
2208 ret += "\n\t/* Copy stats */\n";
// NOTE(review): this emits sizeof(fta_stat) while the tuple_size line uses
// sizeof(struct fta_stat) -- confirm fta_stat is also a typedef name in the
// headers the generated code is compiled against.
2209 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2210 ret+="\tpost_tuple(tuple);\n";
2212 ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2213 ret += "\n\t/* Disable heartbeats for now to avoid overloading clearinghouse */\n";
2214 ret += "\t/* fta_heartbeat(f->ftaid, t->trace_id++, 1, &stats); */\n";
2216 ret += "\n\t/* Reset runtime stats */\n";
2217 ret += "\tt->in_tuple_cnt = 0;\n";
2218 ret += "\tt->out_tuple_cnt = 0;\n";
2219 ret += "\tt->out_tuple_sz = 0;\n";
2220 ret += "\tt->accepted_tuple_cnt = 0;\n";
2221 ret += "\tt->cycle_cnt = 0;\n";
2222 ret += "\tt->collision_cnt = 0;\n";
2223 ret += "\tt->eviction_cnt = 0;\n";
2225 ret += "\treturn 0;\n}\n\n";
// Builds the generated-C text that an aggregation query runs at the start of
// accept processing, ahead of the WHERE clause. The text is accumulated in
// `ret` and covers, in order: the "slow flush" step, unpacking of temporal
// attributes referenced by the select list, and (for real_time queries) the
// temporal flush test.
//   fs             - query plan node; its "slow_flush" and "real_time"
//                    defines are read through get_val_of_def().
//   node_name      - used to form the fta_aggr_flush_old_<node_name> call.
//   schema         - forwarded to the expression / unpack code generators.
//   unpacked_cids  - in/out: columns already unpacked; every column unpacked
//                    here is inserted so later code does not unpack it again.
//   temporal_flush - out: reset to "" and then filled with the code that
//                    tests whether the temporal group-by values changed.
2231 // accept processing before the where clause,
2232 // do flush processing.
2233 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema, col_id_set &unpacked_cids, string &temporal_flush){
2237 string ret="\n/*\tslow flush\t*/\n";
2238 string slow_flush_str = fs->get_val_of_def("slow_flush");
2239 int n_slow_flush = atoi(slow_flush_str.c_str());
// atoi() yields 0 for a missing/non-numeric define; any value <= 0 falls back to 2.
2240 if(n_slow_flush <= 0) n_slow_flush = 2;
// For values above 1 the generated code counts calls in t->flush_ctr and
// performs the one-step flush only when the counter reaches n_slow_flush.
2241 if(n_slow_flush > 1){
2242 ret += "\tt->flush_ctr++;\n";
2243 ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2244 ret += "\t\tt->flush_ctr = 0;\n";
2245 ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
// Same flush call without the counter guard; presumably the n_slow_flush == 1
// alternative -- confirm against the enclosing branch.
2248 ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
// first_one stays true until a temporal group-by attribute is seen; it also
// controls where ") && (" separators go in change_test.
2253 bool first_one = true;
2255 col_id_set flush_cids; // col ids accessed when computing flush variables.
2256 // unpack them at temporal flush test time.
2257 temporal_flush = "";
// Pass 1 over the group-by table: for each temporal attribute, emit code that
// computes gb_attr_<g> and add "t->last_gb_<g> equals gb_attr_<g>" to change_test.
2260 for(g=0;g<gb_tbl->size();g++){
2261 data_type *gdt = gb_tbl->get_data_type(g);
2262 if(gdt->is_temporal()){
2263 gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2265 // To perform the test, first need to compute the value
2266 // of the temporal gb attrs.
2267 if(gdt->is_buffer_type()){
2268 // NOTE : if the SE defining the gb is anything
2269 // other than a ref to a variable, this will generate
2270 // illegal code. To be resolved with Spatch.
2271 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2272 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2273 temporal_flush += tmpstr;
2274 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2275 gdt->get_buffer_assign_copy().c_str(), g, g);
2277 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2279 temporal_flush += tmpstr;
2280 // END computing the value of the temporal GB attr.
2283 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2284 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2285 if(first_one){first_one = false;} else {change_test.append(") && (");}
2286 change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2289 if(!first_one){ // will be false iff. there is a temporal GB attribute
2290 temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
// The generated guard is the negation of the conjunction of equality tests,
// i.e. it fires when any temporal group-by value differs from the saved one.
2291 temporal_flush += "\tif( !( (";
2292 temporal_flush += change_test;
2293 temporal_flush += ") ) ){\n";
2295 // temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2296 temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2297 temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2298 temporal_flush+="\t\t}\n";
2299 temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2300 temporal_flush+="\t\tt->generation++;\n";
2301 temporal_flush+="\t\tt->flush_pos = 0;\n";
2304 // Now set the saved temporal value of the gb to the
2305 // current value of the gb. Only for simple types,
2306 // not for buffer types -- but the strings are not
2307 // temporal in any case.
2309 for(g=0;g<gb_tbl->size();g++){
2310 data_type *gdt = gb_tbl->get_data_type(g);
2311 if(gdt->is_temporal()){
2312 if(gdt->is_buffer_type()){
2314 fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2316 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2317 temporal_flush += tmpstr;
2318 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2319 temporal_flush += tmpstr;
2323 temporal_flush += "\t}\n\n";
2326 // Unpack all the temporal attributes referenced in select clause
2327 // and update the last value of the attribute
2328 col_id_set temp_cids; // col ids of temp attributes in select clause
2329 col_id_set::iterator csi;
2331 for(s=0;s<sl_list.size();s++){
2332 data_type *sdt = sl_list[s]->get_data_type();
2333 if (sdt->is_temporal()) {
2334 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Emit unpack code only for columns not already recorded in unpacked_cids.
2338 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2339 if(unpacked_cids.count((*csi)) == 0){
2340 int tblref = (*csi).tblvar_ref;
2341 int schref = (*csi).schema_ref;
2342 string field = (*csi).field;
2343 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
// NOTE(review): `dt` does not appear to be read after construction -- confirm
// whether it is still needed.
2345 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2346 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2347 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2349 ret += "\tif(retval) return 1;\n";
// Remember the most recent value of the temporal attribute in the FTA state.
2351 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2354 unpacked_cids.insert( (*csi) );
2359 // Do the flush here if this is a real_time query
2360 string rt_level = fs->get_val_of_def("real_time");
// Only when "real_time" is defined and a temporal flush test exists: unpack
// the columns that the flush test reads, then append the test itself.
2361 if(rt_level != "" && temporal_flush != ""){
2362 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2363 if(unpacked_cids.count((*csi)) == 0){
2364 int tblref = (*csi).tblvar_ref;
2365 int schref = (*csi).schema_ref;
2366 string field = (*csi).field;
2367 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2369 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2370 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2372 ret += "\tif(retval) return 1;\n";
2374 unpacked_cids.insert( (*csi) );
2377 ret += temporal_flush;
// Generates the C source text that creates and posts the output tuple of a
// filter-only (selection) query once the tuple has passed its tests. The
// emitted code: evaluates the functions referenced by the select list, sizes
// and allocates the tuple, fills its fields, posts it, and updates the
// LFTA_STATS counters.
//   fs        - query plan node for the selection query.
//   node_name - used to form the tuple struct name via generate_tuple_name().
//   schema    - forwarded to the expression / function code generators.
2383 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2388 /////////////// Processing for filter-only query
2390 // test passed : create the tuple, then assign to it.
2391 ret += "/*\t\tCreate and post the tuple\t*/\n";
2393 // Unpack partial fcns ref'd by the select clause.
2394 // It's a kind of a WHERE clause ...
// Functions referenced more than once (fcn_ref_cnt[p] > 1) are wrapped in an
// "if(fcn_ref_cnt_<p>==0)" guard so the generated code evaluates them once;
// a failing partial function makes the generated code jump to `end`.
2395 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2396 if(fcn_ref_cnt[p] > 1){
2397 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2399 if(is_partial_fcn[p]){
2400 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2401 ret += "\tif(retval) goto end;\n";
2403 if(fcn_ref_cnt[p] > 1){
2404 if(!is_partial_fcn[p]){
2405 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2407 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2412 // increment the counter of accepted tuples
2413 ret += "\n\t#ifdef LFTA_STATS\n";
2414 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2415 ret += "\t#endif\n\n";
2417 // First, compute the size of the tuple.
2419 // Unpack any BUFFER type selections into temporaries
2420 // so that I can compute their size and not have
2421 // to recompute their value during tuple packing.
2422 // I can use regular assignment here because
2423 // these temporaries are non-persistent.
2425 for(s=0;s<sl_list.size();s++){
2426 data_type *sdt = sl_list[s]->get_data_type();
2427 if(sdt->is_buffer_type()){
2428 sprintf(tmpstr,"\tselvar_%d = ",s);
2430 ret += generate_se_code(sl_list[s],schema);
2436 // The size of the tuple is the size of the tuple struct plus the
2437 // size of the buffers to be copied in.
2439 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2440 for(s=0;s<sl_list.size();s++){
2441 data_type *sdt = sl_list[s]->get_data_type();
2442 if(sdt->is_buffer_type()){
2443 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// An allocation failure in the generated code silently drops the tuple (goto end).
2450 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2451 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2453 // Test passed, make assignments to the tuple.
// tuple_pos starts just past the fixed-size struct; buffer payloads are
// appended from there.
2455 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2457 // Mark tuple as REGULAR_TUPLE
2458 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
// Buffer-typed fields are copied with the type's tuple-copy function and
// advance tuple_pos by the buffer size; other fields are assigned directly.
2461 for(s=0;s<sl_list.size();s++){
2462 data_type *sdt = sl_list[s]->get_data_type();
2463 if(sdt->is_buffer_type()){
2464 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2466 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2469 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2471 // if(sdt->needs_hn_translation())
2472 // ret += sdt->hton_translation() +"( ";
2473 ret += generate_se_code(sl_list[s],schema);
2474 // if(sdt->needs_hn_translation())
2482 ret += "\tpost_tuple(tuple);\n";
2484 // Increment the counter of posted tuples
2485 ret += "\n\t#ifdef LFTA_STATS\n";
2486 ret += "\tt->out_tuple_cnt++;\n";
2487 ret+="\tt->out_tuple_sz+=tuple_size;\n";
2488 ret += "\t#endif\n\n";
// Generates the C source text for the accept body of a filter join, in which
// records of an S (filtering) stream decide which R (main stream) records
// are output. The emitted code, in order:
//   1. clears bloom filters when the time bin derived from curr_fj_ts changes;
//   2. tests the S predicates (fs->pred_t1) and, on success, records the S
//      join keys in a bloom filter and/or the two-bucket join hash table;
//   3. tests the cheap R predicates (fs->pred_t0, cost <= 20), probes for the
//      R join keys, then tests the remaining (expensive) R predicates;
//   4. sizes, allocates, fills and posts the output tuple.
// NOTE(review): the bloom-filter and hash-table paths look like alternatives;
// confirm which condition selects between them.
//   fs            - filter-join plan node; temporal_range, temporal_var,
//                   pred_t0, pred_t1, hash_eq and the defines "num_bloom",
//                   "bloom_size" and "aggregate_slots" are read.
//   node_name     - used for the tuple struct name and in unpack code.
//   unpacked_cids - in/out: columns already unpacked; extended as fields are
//                   unpacked here.
//   Ext_fcns      - passed to compute_cnf_cost() when costing predicates.
//   schema        - forwarded to the code generators.
2495 // TODO Ensure that postfilter predicates are being generated
2496 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
2502 unsigned int window_len = fs->temporal_range;
// Default of 11 filters; a "num_bloom" define replaces it with value+1.
2503 unsigned int n_bloom = 11;
2504 string n_bloom_str = fs->get_val_of_def("num_bloom");
2505 int tmp_n_bloom = atoi(n_bloom_str.c_str());
2507 n_bloom = tmp_n_bloom+1;
// Time width covered by one filter: the window spread over n_bloom-1 filters.
2508 float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2509 sprintf(tmpstr,"%f",bloom_width);
2510 string bloom_width_str = tmpstr;
// A window shorter than the filter count gets one filter per time unit.
2512 if(window_len < n_bloom){
2513 n_bloom = window_len+1;
2514 bloom_width_str = "1";
2518 // Grab the current window time
2519 scalarexp_t winvar(fs->temporal_var);
2520 ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2522 int bf_exp_size = 12; // base-2 log of number of bits
2523 string bloom_len_str = fs->get_val_of_def("bloom_size");
2524 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
// "bloom_size" is honoured only for exponents 4..31.
// NOTE(review): an exponent of 31 makes `1 << bf_exp_size` below overflow a
// 32-bit int -- consider tightening the upper bound.
2525 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2526 bf_exp_size = tmp_bf_exp_size;
2528 int bf_bit_size = 1 << bf_exp_size;
2529 int bf_byte_size = bf_bit_size / (8*sizeof(char));
// Hash table size defaults to 4096 slots; "aggregate_slots" is considered
// only when it exceeds 1024.
2531 unsigned int ht_size = 4096;
2532 string ht_size_s = fs->get_val_of_def("aggregate_slots");
2533 int tmp_ht_size = atoi(ht_size_s.c_str());
2534 if(tmp_ht_size > 1024){
2535 unsigned int hs = 1; // make it power of 2
2538 tmp_ht_size = tmp_ht_size >> 1;
// bf_mask is built as a run of low-order 1 bits: bf_exp_size bits in the
// first loop, log2(ht_size) bits in the second. Presumably only one of the
// two loops runs for a given query -- confirm.
2545 for(i=0;i<bf_exp_size;i++)
2546 bf_mask = (bf_mask << 1) | 1;
2548 for(i=ht_size;i>1;i=i>>1)
2549 bf_mask = (bf_mask << 1) | 1;
// NOTE(review): this diagnostic goes to stdout of the code generator itself,
// not into the generated code.
2553 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2556 bloom_width_str.c_str(),
2568 // If this is a bloom-filter fj, first test if the
2569 // bloom filter needs to be advanced.
2570 // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2571 // t->bf_size : number of bits in bloom filter
2573 // TODO: Don't iterate more than n_bloom times!
2574 // As written, its possible to wrap around many times.
2577 "// Clean out old bloom filters if needed.\n"
2578 "// TODO vectorize this ? \n"
2579 " if(t->first_exec){\n"
2580 " t->first_exec = 0;\n"
2581 " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2582 " t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2584 " curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2585 " if(curr_bin != t->last_bin){\n"
2586 " for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2587 " t->last_bloom_pos++;\n"
2588 " if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2589 " t->last_bloom_pos = 0;\n"
2590 " tmp_i = t->last_bloom_pos;\n"
2591 " for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2592 " SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2596 " t->last_bin = curr_bin;\n"
2602 //-----------------------------------------------------------------
2603 // First, determine whether to do S (filter stream) processing.
2606 "// S (filtering stream) predicate, should it be processed?\n"
2609 // Sort S preds based on cost.
2610 vector<cnf_elem *> s_filt = fs->pred_t1;
2611 col_id_set::iterator csi;
2612 if(s_filt.size() > 0){
2614 // Unpack fields ref'd in the S pred
// Unpack failures on the S side jump to the end_s label rather than `end`,
// so the R side is still evaluated.
2615 for(w=0;w<s_filt.size();++w){
2616 col_id_set this_pred_cids;
2617 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2618 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2619 if(unpacked_cids.count( (*csi) ) == 0){
2620 int tblref = (*csi).tblvar_ref;
2621 int schref = (*csi).schema_ref;
2622 string field = (*csi).field;
2623 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2624 unpacked_cids.insert( (*csi) );
2630 // Sort by evaluation cost.
2631 // First, estimate evaluation costs
2632 // Eliminate predicates covered by the prefilter (those in s_pids).
2633 // I need to do it before the sort because the indices refer
2634 // to the position in the unsorted list.
// NOTE(review): tmp_wh is filled here but s_filt is what gets sorted and
// used afterwards -- confirm whether tmp_wh is still needed.
2635 vector<cnf_elem *> tmp_wh;
2636 for(w=0;w<s_filt.size();++w){
2637 compute_cnf_cost(s_filt[w],Ext_fcns);
2638 tmp_wh.push_back(s_filt[w]);
2642 sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2644 // Now generate the predicates.
2645 for(w=0;w<s_filt.size();++w){
2646 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2649 // Find partial fcns ref'd in this cnf element
2651 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2652 // Since set<..> is a "Sorted Associative Container",
2653 // we can walk through it in sorted order by walking from
2654 // begin() to end(). (and the partial fcns must be
2655 // evaluated in this order).
2656 set<int>::iterator si;
2658 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2659 if(fcn_ref_cnt[(*si)] > 1){
2660 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2662 if(is_partial_fcn[(*si)]){
2663 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2664 ret += "\t\tif(retval) goto end_s;\n";
2666 if(fcn_ref_cnt[(*si)] > 1){
2667 if(!is_partial_fcn[(*si)]){
2668 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
// NOTE(review): the first sentence of this comment ("don't mark a cacheable
// partial function") and the !is_partial_fcn test above read inconsistently --
// confirm which is intended.
2669 // Testing for S is a side branch.
2670 // I don't want a cacheable partial function to be
2671 // marked as evaluated. Therefore I mark the function
2672 // as evaluated ONLY IF it is not partial.
2673 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2679 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2680 ") ) goto end_s;\n";
2683 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// The S-side join key of each equality predicate is its right-hand expression.
2686 for(p=0;p<fs->hash_eq.size();++p)
2687 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2690 // First, generate the S scalar expressions in the hash_eq
2692 // Iterate over the bloom filters
// Each key is hashed with a multiplier from hash_nums (indexed modulo NRANDS),
// the upper 32 bits are XOR-folded into `bucket`, and the result is masked.
2694 ret += "\t\tbucket=0;\n";
2695 for(p=0;p<fs->hash_eq.size();++p){
2697 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2698 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2699 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2701 // SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2703 " bucket &= "+int_to_string(bf_mask)+";\n"
2704 " SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2709 ret += "// Add the S record to the hash table, choose a position\n";
2710 ret += "\t\tbucket=0;\n";
2711 for(p=0;p<fs->hash_eq.size();++p){
2713 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2714 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2715 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
// Two candidate slots: `bucket` and its successor (wrapping via the mask).
2718 " bucket &= "+int_to_string(bf_mask)+";\n"
2719 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2721 // Try the first bucket
2723 for(p=0;p<fs->hash_eq.size();++p){
2724 if(p>0) ret += " && ";
2725 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2726 // " == s_equijoin_"+int_to_string(p);
2727 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2728 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2729 string rhs_op = "s_equijoin_"+int_to_string(p);
2730 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2732 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2733 ret += "\t\t}else{\n\t\t\tif(";
2734 for(p=0;p<fs->hash_eq.size();++p){
2735 if(p>0) ret += " && ";
2736 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2737 // " == s_equijoin_"+int_to_string(p);
2738 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2739 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2740 string rhs_op = "s_equijoin_"+int_to_string(p);
2741 ret += generate_equality_test(lhs_op,rhs_op,hdt);
// If neither slot already holds the key, the slot with the smaller (or equal)
// timestamp is overwritten.
2743 ret += "){\n\t\t\t\tthe_bucket = bucket1;\n";
2744 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2745 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2746 ret += "\t\t\t}\n\t\t}\n";
// Store the S keys (buffer types through their assign-copy function) and
// stamp the slot with the current window time.
2747 for(p=0;p<fs->hash_eq.size();++p){
2748 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2749 if(hdt->is_buffer_type()){
2750 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2753 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2754 " = s_equijoin_"+int_to_string(p)+";\n";
2757 ret+="\t\tt->join_table[the_bucket].ts = curr_fj_ts;\n";
2759 ret += "\tend_s:\n";
2761 // ------------------------------------------------------------
2762 // Next, determine if the R record should be processed.
2766 "// R (main stream) cheap predicate\n"
2770 // Unpack r_filt fields
2771 vector<cnf_elem *> r_filt = fs->pred_t0;
2772 for(w=0;w<r_filt.size();++w){
2773 col_id_set this_pred_cids;
2774 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2775 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2776 if(unpacked_cids.count( (*csi) ) == 0){
2777 int tblref = (*csi).tblvar_ref;
2778 int schref = (*csi).schema_ref;
2779 string field = (*csi).field;
2780 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2781 unpacked_cids.insert( (*csi) );
2786 // Sort R preds based on cost.
// NOTE(review): as on the S side, tmp_wh is filled but r_filt is what gets
// sorted and used -- confirm whether tmp_wh is still needed.
2788 vector<cnf_elem *> tmp_wh;
2789 for(w=0;w<r_filt.size();++w){
2790 compute_cnf_cost(r_filt[w],Ext_fcns);
2791 tmp_wh.push_back(r_filt[w]);
2795 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2797 // WARNING! the constant 20 below is a wild-ass guess.
// After the sort, cheap_rpos is the index of the first predicate whose cost
// exceeds 20; predicates before it are tested ahead of the join probe.
2799 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
2801 // Test the cheap filters on R.
2804 // Now generate the predicates.
2805 for(w=0;w<cheap_rpos;++w){
2806 sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
2809 // Find partial fcns ref'd in this cnf element
2811 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2812 // Since set<..> is a "Sorted Associative Container",
2813 // we can walk through it in sorted order by walking from
2814 // begin() to end(). (and the partial fcns must be
2815 // evaluated in this order).
2816 set<int>::iterator si;
2817 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2818 if(fcn_ref_cnt[(*si)] > 1){
2819 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2821 if(is_partial_fcn[(*si)]){
2822 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2823 ret += "\t\tif(retval) goto end;\n";
2825 if(fcn_ref_cnt[(*si)] > 1){
2826 if(!is_partial_fcn[(*si)]){
2827 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2829 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2834 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2838 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2841 ret += "\n// Do the join\n\n";
// The R-side join key of each equality predicate is its left-hand expression.
2842 for(p=0;p<fs->hash_eq.size();++p)
2843 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2846 // Passed the cheap pred, now test the join with S.
2849 ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2850 for(p=0;p<fs->hash_eq.size();++p){
2852 " bucket"+int_to_string(i)+
2853 " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2854 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2855 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2858 " bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2860 ret += "\tfound = 0;\n";
// NOTE(review): the generated loop iterates `b` over the filters, but the
// IS_BF_SET tests below index with t->last_bloom_pos rather than b -- confirm
// this is intended.
2861 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2863 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2864 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2865 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
2874 ret += "\tfound = 0;\n";
2875 ret += "\t\tbucket=0;\n";
2876 for(p=0;p<fs->hash_eq.size();++p){
2878 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2879 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2880 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2883 " bucket &= "+int_to_string(bf_mask)+";\n"
2884 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2886 // Try the first bucket
// NOTE(review): this is the R-side probe, yet the comparison operand is
// s_equijoin_<p> with the right-hand (S) data type, while the commented-out
// code names r_equijoin_<p>. Looks like a copy of the S insertion code --
// confirm whether r_equijoin_<p> / get_left_se() was intended.
2888 for(p=0;p<fs->hash_eq.size();++p){
2889 if(p>0) ret += " && ";
2890 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2891 // " == r_equijoin_"+int_to_string(p);
2892 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2893 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2894 string rhs_op = "s_equijoin_"+int_to_string(p);
2895 ret += generate_equality_test(lhs_op,rhs_op,hdt);
// Here p equals hash_eq.size(), so the separator is emitted whenever there is
// at least one equality predicate.
2897 if(p>0) ret += " && ";
2898 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2899 ret += "){\n\t\t\tfound = 1;\n";
2900 ret += "\t\t}else {if(";
// NOTE(review): same s_equijoin_ / get_right_se() concern for the second slot.
2901 for(p=0;p<fs->hash_eq.size();++p){
2902 if(p>0) ret += " && ";
2903 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2904 // " == r_equijoin_"+int_to_string(p);
2905 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2906 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2907 string rhs_op = "s_equijoin_"+int_to_string(p);
2908 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2910 if(p>0) ret += " && ";
2911 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2912 ret += ")\n\t\t\tfound=1;\n";
2921 // Test the expensive filters on R.
2922 if(cheap_rpos < r_filt.size()){
2924 // Now generate the predicates.
2925 for(w=cheap_rpos;w<r_filt.size();++w){
2926 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2929 // Find partial fcns ref'd in this cnf element
2931 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2932 // Since set<..> is a "Sorted Associative Container",
2933 // we can walk through it in sorted order by walking from
2934 // begin() to end(). (and the partial fcns must be
2935 // evaluated in this order).
2936 set<int>::iterator si;
2937 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2938 if(fcn_ref_cnt[(*si)] > 1){
2939 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2941 if(is_partial_fcn[(*si)]){
2942 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2943 ret += "\t\tif(retval) goto end;\n";
2945 if(fcn_ref_cnt[(*si)] > 1){
2946 if(!is_partial_fcn[(*si)]){
2947 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2949 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2954 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2958 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2963 /////////////// post the tuple
2965 // test passed : create the tuple, then assign to it.
2966 ret += "/*\t\tCreate and post the tuple\t*/\n";
2968 // Unpack the fields referenced by the select list
2969 for(s=0;s<sl_list.size();++s){
2970 col_id_set this_se_cids;
2971 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2972 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2973 if(unpacked_cids.count( (*csi) ) == 0){
2974 int tblref = (*csi).tblvar_ref;
2975 int schref = (*csi).schema_ref;
2976 string field = (*csi).field;
2977 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2978 unpacked_cids.insert( (*csi) );
2984 // Unpack partial fcns ref'd by the select clause.
2985 // It's a kind of a WHERE clause ...
2986 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2987 if(fcn_ref_cnt[p] > 1){
2988 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2990 if(is_partial_fcn[p]){
2991 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2992 ret += "\tif(retval) goto end;\n";
2994 if(fcn_ref_cnt[p] > 1){
2995 if(!is_partial_fcn[p]){
2996 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2998 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3003 // increment the counter of accepted tuples
3004 ret += "\n\t#ifdef LFTA_STATS\n";
3005 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3006 ret += "\t#endif\n\n";
3008 // First, compute the size of the tuple.
3010 // Unpack any BUFFER type selections into temporaries
3011 // so that I can compute their size and not have
3012 // to recompute their value during tuple packing.
3013 // I can use regular assignment here because
3014 // these temporaries are non-persistent.
3016 for(s=0;s<sl_list.size();s++){
3017 data_type *sdt = sl_list[s]->get_data_type();
3018 if(sdt->is_buffer_type()){
3019 sprintf(tmpstr,"\tselvar_%d = ",s);
3021 ret += generate_se_code(sl_list[s],schema);
3027 // The size of the tuple is the size of the tuple struct plus the
3028 // size of the buffers to be copied in.
3030 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3031 for(s=0;s<sl_list.size();s++){
3032 data_type *sdt = sl_list[s]->get_data_type();
3033 if(sdt->is_buffer_type()){
3034 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// An allocation failure in the generated code silently drops the tuple (goto end).
3041 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3042 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3044 // Test passed, make assignments to the tuple.
3046 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3048 // Mark tuple as REGULAR_TUPLE
3049 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
// Buffer-typed fields are copied with the type's tuple-copy function and
// advance tuple_pos by the buffer size; other fields are assigned directly.
3052 for(s=0;s<sl_list.size();s++){
3053 data_type *sdt = sl_list[s]->get_data_type();
3054 if(sdt->is_buffer_type()){
3055 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3057 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3060 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3062 // if(sdt->needs_hn_translation())
3063 // ret += sdt->hton_translation() +"( ";
3064 ret += generate_se_code(sl_list[s],schema);
3065 // if(sdt->needs_hn_translation())
3073 ret += "\tpost_tuple(tuple);\n";
3075 // Increment the counter of posted tuples
3076 ret += "\n\t#ifdef LFTA_STATS\n";
3077 ret += "\n\tt->out_tuple_cnt++;\n\n";
3078 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3079 ret += "\t#endif\n\n";
3086 string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
3092 string wl_schema = fs->from[1]->get_schema_name();
3093 string wl_elem_str = generate_watchlist_element_name(wl_schema);
3094 string wl_node_str = generate_watchlist_struct_name(wl_schema);
3095 string tgt = generate_watchlist_name(wl_schema);
3097 ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
3103 // ------------------------------------------------------------
3104 // Determine if the R record should be processed.
3108 "// R (main stream) cheap predicate\n"
3112 // Unpack r_filt fields
3113 vector<cnf_elem *> r_filt = fs->pred_t0;
3114 for(w=0;w<r_filt.size();++w){
3115 col_id_set this_pred_cids;
3116 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
3117 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3118 if(unpacked_cids.count( (*csi) ) == 0){
3119 int tblref = (*csi).tblvar_ref;
3120 int schref = (*csi).schema_ref;
3121 string field = (*csi).field;
3122 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3123 unpacked_cids.insert( (*csi) );
3128 // Sort R preds based on cost.
3130 vector<cnf_elem *> tmp_wh;
3131 for(w=0;w<r_filt.size();++w){
3132 compute_cnf_cost(r_filt[w],Ext_fcns);
3133 tmp_wh.push_back(r_filt[w]);
3137 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
3139 // WARNING! the constant 20 below is a wild-ass guess.
3141 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
3143 // Test the cheap filters on R.
3146 // Now generate the predicates.
3147 for(w=0;w<cheap_rpos;++w){
3148 sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3151 // Find partial fcns ref'd in this cnf element
3153 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3154 // Since set<..> is a "Sorted Associative Container",
3155 // we can walk through it in sorted order by walking from
3156 // begin() to end(). (and the partial fcns must be
3157 // evaluated in this order).
3158 set<int>::iterator si;
3159 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3160 if(fcn_ref_cnt[(*si)] > 1){
3161 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3163 if(is_partial_fcn[(*si)]){
3164 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3165 ret += "\t\tif(retval) goto end;\n";
3167 if(fcn_ref_cnt[(*si)] > 1){
3168 if(!is_partial_fcn[(*si)]){
3169 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3171 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3176 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3180 ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
3183 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3184 map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
3185 vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
3186 for(w=0;w<kflds.size();++w){
3187 string kfld = kflds[w];
3188 col_id_set this_pred_cids;
3189 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
3190 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3191 if(unpacked_cids.count( (*csi) ) == 0){
3192 int tblref = (*csi).tblvar_ref;
3193 int schref = (*csi).schema_ref;
3194 string field = (*csi).field;
3195 if(tblref==0) // LHS from packet, don't unpack the RHS
3196 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3197 unpacked_cids.insert( (*csi) );
3203 ret += "\n// Do the join\n\n";
3204 ret += "\n// (ensure that the watchtable is fresh)\n";
3205 ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
3206 ret += "\t\treload_watchlist__"+wl_schema+"();\n";
3207 ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
3211 for(p=0;p<fs->key_flds.size();++p){
3212 string kfld = fs->key_flds[p];
3213 ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
3217 // Passed the cheap pred, now test the join with S.
3218 ret += "\tbucket=0;\n";
3219 ret += "\thash=0;\n";
3220 for(p=0;p<fs->key_flds.size();++p){
3221 string kfld = fs->key_flds[p];
3223 " hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
3224 fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
3225 +"_to_hash(r_equijoin_"+kfld+")));\n";
3227 ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
3229 ret += "\t\trec = "+tgt+".ht[bucket];\n";
3230 ret += "\t\twhile(rec!=NULL){\n";
3231 ret += "\t\t\tif(hash==rec->hashval){\n";
3232 ret += "\t\t\t\tif(";
3233 for(p=0;p<fs->key_flds.size();++p){
3234 string kfld = fs->key_flds[p];
3235 if(p>0) ret += " && ";
3236 data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
3237 string lhs_op = "r_equijoin_"+kfld;
3238 string rhs_op = "rec->"+kfld;
3239 ret += generate_equality_test(lhs_op,rhs_op,hdt);
3242 ret += "\t\t\t\t\tbreak;\n";
3244 ret += "\t\t\trec=rec->next;\n";
3246 ret += "\t\tif(rec==NULL)\n";
3247 ret += "\t\t\tgoto end;\n";
3249 ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
3250 for(w=0;w<where.size();++w){
3251 col_id_set this_pred_cids;
3252 gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
3253 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3254 if(unpacked_cids.count( (*csi) ) == 0){
3255 int tblref = (*csi).tblvar_ref;
3256 int schref = (*csi).schema_ref;
3257 string field = (*csi).field;
3258 if(tblref==0) // LHS from packet
3259 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3260 else // RHS from hash bucket
3261 ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3262 unpacked_cids.insert( (*csi) );
3268 // Test the expensive filters on R.
3269 // TODO Should merge this with other predicates and eval in order
3270 // of cost - see the fj code.
3271 // TODO join and postfilter predicates haven't been costed yet.
3272 if(cheap_rpos < r_filt.size()){
3274 // Now generate the predicates.
3275 for(w=cheap_rpos;w<r_filt.size();++w){
3276 sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3279 // Find partial fcns ref'd in this cnf element
3281 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3282 // Since set<..> is a "Sorted Associative Container",
3283 // we can walk through it in sorted order by walking from
3284 // begin() to end(). (and the partial fcns must be
3285 // evaluated in this order).
3286 set<int>::iterator si;
3287 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3288 if(fcn_ref_cnt[(*si)] > 1){
3289 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3291 if(is_partial_fcn[(*si)]){
3292 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3293 ret += "\t\tif(retval) goto end;\n";
3295 if(fcn_ref_cnt[(*si)] > 1){
3296 if(!is_partial_fcn[(*si)]){
3297 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3299 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3304 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3308 ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
3311 // TODO sort the additional predicates by cost
3314 for(w=0;w<fs->pred_t1.size();++w){
3315 sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
3318 // Find partial fcns ref'd in this cnf element
3320 collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
3321 // Since set<..> is a "Sorted Associative Container",
3322 // we can walk through it in sorted order by walking from
3323 // begin() to end(). (and the partial fcns must be
3324 // evaluated in this order).
3325 set<int>::iterator si;
3326 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3327 if(fcn_ref_cnt[(*si)] > 1){
3328 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3330 if(is_partial_fcn[(*si)]){
3331 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3332 ret += "\t\tif(retval) goto end;\n";
3334 if(fcn_ref_cnt[(*si)] > 1){
3335 if(!is_partial_fcn[(*si)]){
3336 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3338 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3343 ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
3348 for(w=0;w<fs->join_filter.size();++w){
3349 sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
3352 // Find partial fcns ref'd in this cnf element
3354 collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
3355 // Since set<..> is a "Sorted Associative Container",
3356 // we can walk through it in sorted order by walking from
3357 // begin() to end(). (and the partial fcns must be
3358 // evaluated in this order).
3359 set<int>::iterator si;
3360 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3361 if(fcn_ref_cnt[(*si)] > 1){
3362 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3364 if(is_partial_fcn[(*si)]){
3365 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3366 ret += "\t\tif(retval) goto end;\n";
3368 if(fcn_ref_cnt[(*si)] > 1){
3369 if(!is_partial_fcn[(*si)]){
3370 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3372 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3377 ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
3382 for(w=0;w<fs->postfilter.size();++w){
3383 sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
3386 // Find partial fcns ref'd in this cnf element
3388 collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
3389 // Since set<..> is a "Sorted Associative Container",
3390 // we can walk through it in sorted order by walking from
3391 // begin() to end(). (and the partial fcns must be
3392 // evaluated in this order).
3393 set<int>::iterator si;
3394 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3395 if(fcn_ref_cnt[(*si)] > 1){
3396 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3398 if(is_partial_fcn[(*si)]){
3399 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3400 ret += "\t\tif(retval) goto end;\n";
3402 if(fcn_ref_cnt[(*si)] > 1){
3403 if(!is_partial_fcn[(*si)]){
3404 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3406 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3411 ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
3417 /////////////// post the tuple
3419 // test passed : create the tuple, then assign to it.
3420 ret += "/*\t\tCreate and post the tuple\t*/\n";
3423 for(s=0;s<sl_list.size();++s){
3424 col_id_set this_se_cids;
3425 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
3426 for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
3427 if(unpacked_cids.count( (*csi) ) == 0){
3428 int tblref = (*csi).tblvar_ref;
3429 int schref = (*csi).schema_ref;
3430 string field = (*csi).field;
3431 if(tblref==0) // LHS from packet
3432 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3433 else // RHS from hash bucket
3434 ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3435 unpacked_cids.insert( (*csi) );
3441 // Unpack partial fcns ref'd by the select clause.
3442 // Its a kind of a WHERE clause ...
3443 for(p=sl_fcns_start;p<sl_fcns_end;p++){
3444 if(fcn_ref_cnt[p] > 1){
3445 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3447 if(is_partial_fcn[p]){
3448 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3449 ret += "\tif(retval) goto end;\n";
3451 if(fcn_ref_cnt[p] > 1){
3452 if(!is_partial_fcn[p]){
3453 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3455 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3460 // increment the counter of accepted tuples
3461 ret += "\n\t#ifdef LFTA_STATS\n";
3462 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3463 ret += "\t#endif\n\n";
3465 // First, compute the size of the tuple.
3467 // Unpack any BUFFER type selections into temporaries
3468 // so that I can compute their size and not have
3469 // to recompute their value during tuple packing.
3470 // I can use regular assignment here because
3471 // these temporaries are non-persistent.
3473 for(s=0;s<sl_list.size();s++){
3474 data_type *sdt = sl_list[s]->get_data_type();
3475 if(sdt->is_buffer_type()){
3476 sprintf(tmpstr,"\tselvar_%d = ",s);
3478 ret += generate_se_code(sl_list[s],schema);
3484 // The size of the tuple is the size of the tuple struct plus the
3485 // size of the buffers to be copied in.
3487 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3488 for(s=0;s<sl_list.size();s++){
3489 data_type *sdt = sl_list[s]->get_data_type();
3490 if(sdt->is_buffer_type()){
3491 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3498 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3499 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3501 // Test passed, make assignments to the tuple.
3503 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3505 // Mark tuple as REGULAR_TUPLE
3506 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3509 for(s=0;s<sl_list.size();s++){
3510 data_type *sdt = sl_list[s]->get_data_type();
3511 if(sdt->is_buffer_type()){
3512 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3514 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3517 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3519 // if(sdt->needs_hn_translation())
3520 // ret += sdt->hton_translation() +"( ";
3521 ret += generate_se_code(sl_list[s],schema);
3522 // if(sdt->needs_hn_translation())
3530 ret += "\tpost_tuple(tuple);\n";
3532 // Increment the counter of posted tuples
3533 ret += "\n\t#ifdef LFTA_STATS\n";
3534 ret += "\n\tt->out_tuple_cnt++;\n\n";
3535 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3536 ret += "\t#endif\n\n";
// generate_aggr_accept_body
//
// Builds the aggregation-specific part of the generated packet-accept
// routine as C source text, accumulated in `ret`. The caller
// (generate_fta_accept) appends the returned string to its own output.
//
// Parameters:
//   fs             - query plan node. Queried for the "real_time" define and
//                    cast to sgah_qpn to read lfta_disorder.
//   node_name      - used to name generated symbols
//                    (fta_aggr_flush_old_<node_name>) and passed to
//                    generate_gb_update / generate_tuple_from_aggr.
//   schema         - passed through to the code-generation helpers.
//   temporal_flush - previously generated flush code. When non-empty,
//                    temporal group-by attributes are not recomputed here,
//                    and the text is emitted in place when the query has no
//                    "real_time" define.
//
// Reads gb_tbl, aggr_tbl, partial_fcns, is_partial_fcn, the
// gb_fcns_*/ag_fcns_* ranges, hash_nums and the tmpstr scratch buffer; none
// of these are declared in the lines shown here (presumably file-scope
// state -- confirm).
3542 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
3546 ////////////// Processing for aggregation query
3548 // First, search for a match. Start by unpacking the group-by attributes.
3550 // One complication : if a real-time aggregate flush occurs,
3551 // the GB attr has already been calculated. So don't compute
3552 // it again if 1) it is temporal and 2) it will be computed in the
3553 // aggregate flush code.
3555 // Unpack the partial fcns ref'd by the gb's and the aggr defs.
// After each partial function the generated code jumps to the `end`
// label when `retval` is non-zero.
3556 for(p=gb_fcns_start;p<gb_fcns_end;p++){
3557 if(is_partial_fcn[p]){
3558 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3559 ret += "\tif(retval) goto end;\n";
3562 for(p=ag_fcns_start;p<ag_fcns_end;p++){
3563 if(is_partial_fcn[p]){
3564 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3565 ret += "\tif(retval) goto end;\n";
3569 // increment the counter of accepted tuples
// (the emitted increment is compiled only when LFTA_STATS is defined)
3570 ret += "\n\t#ifdef LFTA_STATS\n";
3571 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3572 ret += "\t#endif\n\n";
3574 ret += "/*\t\tTest if the group is in the hash table \t*/\n";
3575 // Compute the values of the group-by variables.
// A group-by attribute is computed here when it is not temporal, or when
// no temporal flush code was supplied.
3576 for(g=0;g<gb_tbl->size();g++){
3577 data_type *gdt = gb_tbl->get_data_type(g);
3578 if((! gdt->is_temporal()) || temporal_flush == ""){
// Buffer-typed attributes are first assigned to gb_attr_tmp<g> and then
// copied into gb_attr_<g> with the type's buffer-assign-copy routine.
3580 if(gdt->is_buffer_type()){
3581 // NOTE : if the SE defining the gb is anything
3582 // other than a ref to a variable, this will generate
3583 // illegal code. To be resolved with Spatch.
3584 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
3585 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
3587 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
3588 gdt->get_buffer_assign_copy().c_str(), g, g);
3590 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
3597 // A quick aside : if any of the GB attrs are temporal,
3598 // test for change and flush if any change occurred.
3599 // We've already computed the flush code,
3600 // Put it here if this is not a real time query.
3601 // We've already unpacked all column refs, so no need to
3602 // do it again here.
3604 string rt_level = fs->get_val_of_def("real_time");
3605 if(rt_level == "" && temporal_flush != ""){
3606 ret += temporal_flush;
3609 // Compute the hash bucket
// hashval is emitted as the XOR of one term per group-by attribute; each
// term multiplies lfta_<type>_to_hash(gb_attr_<g>) by hash_nums[g%NRANDS].
3610 if(gb_tbl->size() > 0){
// NOTE(review): the next line ends in a backslash, which splices it with
// the following line. This looks accidental but harmless -- confirm.
3611 ret += "\thashval = ";\
3612 for(g=0;g<gb_tbl->size();g++){
3613 if(g>0) ret += " ^ ";
3614 data_type *gdt = gb_tbl->get_data_type(g);
// NOTE(review): both sprintf calls below use the same format string and
// arguments, so the is_buffer_type() test does not change the output.
3615 if(gdt->is_buffer_type()){
3616 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3617 gdt->get_type_str().c_str(), g);
3619 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3620 gdt->get_type_str().c_str(), g);
// hash2 reuses g as left by the loop above (gb_tbl->size() if the loop ran
// to completion) to pick its multiplier.
// The probe mask (t->max_aggrs-1) presumably relies on max_aggrs being a
// power of two -- TODO confirm.
3625 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
3626 ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
// Emit zero for both probe and hash2.
3628 ret+="\tprobe = 0;\n";
3629 ret+="\thash2 = 0;\n\n";
3632 // Does the lfta reference a udaf?
// (any aggregate that is not builtin sets has_udaf)
3633 bool has_udaf = false;
3634 for(a=0;a<aggr_tbl->size();a++){
3635 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
3638 // Scan for a match, or alternatively the best slot.
3639 // Currently, hardcode 5 tests.
3641 " gen_val = t->generation & SLOT_GEN_BITS;\n"
3642 " match_found = 0;\n"
3643 " best_slot = probe;\n"
3644 " for(i=0;i<5 && match_found == 0;i++){\n"
3645 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
3647 if(gb_tbl->size()>0){
3648 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
// One equality test per group-by attribute, joined with &&, comparing
// gb_attr_<g> against t->aggr_table[probe].gb_var<g>.
3650 string rhs_op, lhs_op;
3651 for(g=0;g<gb_tbl->size();g++){
3652 if(g>0) ret += " && ";
3654 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
3655 sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
3656 ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
3661 " match_found = 1;\n"
3662 " best_slot = probe;\n"
3665 "// Rate slots in case no match found: prefer empty, then full but old slots\n"
3666 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3667 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3668 " best_slot = probe;\n"
3670 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3671 " best_slot = probe;\n"
3675 " if(probe >= t->max_aggrs)\n"
3678 " if(match_found){\n"
// Inside the emitted match_found branch: append the text produced by
// generate_gb_update for best_slot.
3680 ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3683 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
// NOTE(review): this printf writes to stdout while the code generator
// runs; it is not part of the generated text. Looks like diagnostic
// output -- confirm it is intended.
3685 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3686 if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3688 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
// Emit the sum, over temporal group-by attributes, of
// (gb_attr_<g> - t->aggr_table[best_slot].gb_var<g>), compared against 0.
3690 bool first_g = true;
3691 for(int g=0;g<gb_tbl->size();g++){
3692 data_type *gdt = gb_tbl->get_data_type(g);
3693 if(gdt->is_temporal()){
3694 if(first_g) first_g = false; else ret+=" + ";
3695 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3698 ret += ") == 0 ){\n";
3701 " fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
// Append the text produced by generate_tuple_from_aggr for best_slot.
3707 ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3709 "\t\t\t#ifdef LFTA_STATS\n"
3710 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3711 "\t\t\t\tt->collision_cnt++;\n\n"
// Append the text produced by generate_init_group for best_slot.
3715 ret += generate_init_group(schema,"best_slot");
// generate_fta_accept
//
// Generates, as a string of C source, the function
//   static gs_retval_t accept_packet_<node_name>(struct FTA *f,
//           FTAID * ftaid, void *pkt, gs_int32_t sz)
// The emitted text is built in this order: variable declarations, optional
// LFTA_STATS bookkeeping, attribute unpacking interleaved with the filter
// predicates, a query-type specific body, and finally the `end:` label
// followed by `return 1;`.
//
// Parameters:
//   fs            - query plan node. Its node_type() is compared with
//                   "filter_join" and "watch_join", and get_filter_clause()
//                   supplies the predicates.
//   node_name     - used to name the generated function and structs.
//   schema        - table list used for attribute types and unpack code.
//   Ext_fcns      - external function list, used for UDAF return types and
//                   for predicate cost estimation.
//   is_aggr_query - take the aggregation path (prelim + aggregation body).
//   is_fj         - filter-join query.
//   is_wj         - watch-join query.
//   s_pids        - predicate indices to leave out; the comments below
//                   describe them as covered by the prefilter.
//
// Reads where, sl_list, gb_tbl, aggr_tbl, partial_fcns, is_partial_fcn,
// fcn_ref_cnt, global_id and the tmpstr scratch buffer; none of these are
// declared in the lines shown here (presumably file-scope state -- confirm).
3725 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
3727 string ret="static gs_retval_t accept_packet_"+node_name+
3728 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3729 ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3733 // Define all of the variables needed by this
3737 // Gather all column references, need to define unpacking variables.
3740 col_id_set::iterator csi;
3742 // If its a filter join, rebind all colrefs
3743 // to the first range var, to avoid double unpacking.
3746 for(w=0;w<where.size();++w)
3747 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3748 for(s=0;s<sl_list.size();s++)
3749 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
// Collect column ids from the where predicates (skipping indices listed in
// s_pids unless this is a watch join or filter join), from the select list
// and from the group-by definitions.
3752 for(w=0;w<where.size();++w){
3753 if(is_wj || is_fj || s_pids.count(w) == 0)
3754 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3756 for(s=0;s<sl_list.size();s++){
3757 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3762 for(g=0;g<gb_tbl->size();g++)
3763 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3766 // Variables for unpacking attributes.
// One local per collected column id, named unpack_var_<field>_<tblref>.
3767 ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3768 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3769 int schref = (*csi).schema_ref;
3770 int tblref = (*csi).tblvar_ref;
3771 string field = (*csi).field;
3772 data_type dt(schema->get_type_name(schref,field));
3773 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3774 field.c_str(), tblref);
3780 // Variables that are always needed
3781 ret += "/*\t\tVariables which are always needed\t*/\n";
3782 ret += "\tgs_retval_t retval;\n";
3783 ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3784 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3786 ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3789 // Variables needed for aggregation queries.
3791 ret += "\n/*\t\tVariables for aggregation\t*/\n";
3792 ret+="\tunsigned int i, probe;\n";
3793 ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3794 ret+="\tgs_uint64_t hashval, hash2;\n";
3795 // Variables for storing group-by attribute values.
// Each group-by attribute gets gb_attr_<g>; buffer-typed ones also get
// gb_attr_tmp<g>.
3796 if(gb_tbl->size() > 0)
3797 ret += "/*\t\tGroup-by attributes\t*/\n";
3798 for(g=0;g<gb_tbl->size();g++){
3799 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3801 data_type *gdt = gb_tbl->get_data_type(g);
3802 if(gdt->is_buffer_type()){
3803 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3808 // Temporaries for min/max
// The declarations are gathered into aggr_tmp_str first, so the heading
// comment is emitted only when at least one MIN or MAX aggregate exists.
3809 string aggr_tmp_str = "";
3810 for(a=0;a<aggr_tbl->size();a++){
3811 string aggr_op = aggr_tbl->get_op(a);
3812 if(aggr_op == "MIN" || aggr_op == "MAX"){
3813 sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3814 aggr_tmp_str.append(tmpstr);
3817 if(aggr_tmp_str != ""){
3818 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3819 ret += aggr_tmp_str;
3822 // Variables for udaf output temporaries
// One udaf_ret<a> variable per non-builtin aggregate, typed from the
// external function's declared data type.
3823 bool no_udaf = true;
3824 for(a=0;a<aggr_tbl->size();a++){
3825 if(! aggr_tbl->is_builtin(a)){
3827 ret+="/*\t\tUDAF output vars.\t*/\n";
3830 int afcn_id = aggr_tbl->get_fcn_id(a);
3831 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3832 sprintf(tmpstr,"udaf_ret%d", a);
3833 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3838 // Variables needed for a filter join query
// For each hash equality predicate, s_equijoin_<g> takes the type of the
// right-hand expression and r_equijoin_<g> the type of the left-hand one.
3839 if(fs->node_type() == "filter_join"){
3840 filter_join_qpn *fjq = (filter_join_qpn *)fs;
3841 bool uses_bloom = fjq->use_bloom;
3842 ret += "/*\t\tJoin fields\t*/\n";
3843 for(g=0;g<fjq->hash_eq.size();g++){
3844 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3846 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3851 " /* Variables for fj bloom filter */ \n"
3852 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3853 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3854 "\tlong long int curr_fj_ts;\n"
3855 "\tlong long int curr_bin, the_bin;\n"
3860 " /* Variables for fj join table */ \n"
3861 "\tunsigned int i, bucket, found; \n"
3862 "\tunsigned int bucket1, the_bucket;\n"
3863 " long long int curr_fj_ts;\n"
// Variables needed for a watch join query: per-key join fields named after
// the key field, hash-lookup locals, and a `rec` pointer to a watchlist
// element of the second FROM entry's schema.
3870 if(fs->node_type() == "watch_join"){
3871 watch_join_qpn *wlq = (watch_join_qpn *)fs;
3872 ret += "/*\t\tJoin fields\t*/\n";
3873 for(int k=0;k<wlq->key_flds.size(); ++k){
3874 string kfld = wlq->key_flds[k];
3875 ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
3876 ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
3879 " /* Variables for wl join table */ \n"
3880 "\tunsigned int i, bucket;\n"
3881 "\tunsigned long long int hash; \n";
3882 string wl_schema = wlq->from[1]->get_schema_name();
3883 string wl_elem_str = generate_watchlist_element_name(wl_schema);
3884 ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n";
3890 // Variables needed to store selected attributes of BUFFER type
3891 // temporarily, in order to compute their size for storage
3892 // in an output tuple.
3894 string select_var_defs = "";
3895 for(int s=0;s<sl_list.size();s++){
3896 data_type *sdt = sl_list[s]->get_data_type();
3897 if(sdt->is_buffer_type()){
3898 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3899 select_var_defs.append(tmpstr);
3902 if(select_var_defs != ""){
3903 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3904 ret += select_var_defs;
3907 // Variables to store results of partial functions.
// A result variable is declared for every partial function, and, in
// non-aggregation queries, for every function referenced more than once.
// The latter also get a fcn_ref_cnt_<p> flag initialised to 0.
3909 if(partial_fcns.size()>0){
3910 ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3911 for(p=0;p<partial_fcns.size();++p){
3912 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3913 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3914 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3916 if(!is_aggr_query && fcn_ref_cnt[p] >1){
3917 ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3922 if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3926 // variable to hold packet struct //
3928 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3932 ret += "\t#ifdef LFTA_STATS\n";
3933 // variable to store counter of cpu cycles spent in accept_tuple
3934 ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3935 // increment counter of received tuples
3936 ret += "\tt->in_tuple_cnt++;\n";
3937 ret += "\t#endif\n";
3940 // -------------------------------------------------
3941 // If the packet is "packet", test if its for this lfta,
3942 // and if so load it into its struct
// The emitted test compares the input struct's __lfta_id_fm_nic__ field
// with global_id and jumps to `end` on a mismatch.
3945 ret+="\n/* packed tuple : test and load. \t*/\n";
3946 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3947 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3948 ret+="\t\tgoto end;\n\n";
3953 col_id_set unpacked_cids; // Keep track of the cols that have been unpacked.
3955 string temporal_flush;
3957 ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3958 else { // non-aggregation operators
3960 // Unpack all the temporal attributes referenced in select clause
3961 // and update the last value of the attribute
3962 col_id_set temp_cids; // col ids of temp attributes in select clause
3964 for(s=0;s<sl_list.size();s++){
3965 data_type *sdt = sl_list[s]->get_data_type();
3966 if (sdt->is_temporal()) {
3967 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3970 // If this is a filter join,
3971 // ensure that the temporal range field is unpacked.
3973 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3974 if(temp_cids.count(window_var_cid)==0)
3975 temp_cids.insert(window_var_cid);
// Emit unpack code for each temporal column not yet unpacked, and record
// its value in t->last_<field>_<tblref>.
3978 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3979 if(unpacked_cids.count((*csi)) == 0){
3980 int tblref = (*csi).tblvar_ref;
3981 int schref = (*csi).schema_ref;
3982 string field = (*csi).field;
3983 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3984 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3987 unpacked_cids.insert( (*csi) );
3993 vector<cnf_elem *> filter = fs->get_filter_clause();
3994 // Test the filter predicate (some query types have additional preds).
3995 if(filter.size() > 0 && !is_wj){ // watchlist join does specialized processing
3997 // Sort by evaluation cost.
3998 // First, estimate evaluation costs
3999 // Eliminate predicates covered by the prefilter (those in s_pids).
4000 // I need to do it before the sort because the indices refer
4001 // to the position in the unsorted list.
// NOTE(review): tmp_wh receives the predicates that are not in s_pids,
// while the sort and the loop below operate on `filter` -- confirm that
// tmp_wh is assigned back to filter before the sort.
4002 vector<cnf_elem *> tmp_wh;
4003 for(w=0;w<filter.size();++w){
4004 if(s_pids.count(w) == 0){
4005 compute_cnf_cost(filter[w],Ext_fcns);
4006 tmp_wh.push_back(filter[w]);
4011 sort(filter.begin(), filter.end(), compare_cnf_cost());
4013 // Now generate the predicates.
4014 for(w=0;w<filter.size();++w){
4015 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
4017 // Find the set of variables accessed in this CNF elem,
4018 // but in no previous element.
4019 col_id_set this_pred_cids;
4020 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
4021 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4022 if(unpacked_cids.count( (*csi) ) == 0){
4023 int tblref = (*csi).tblvar_ref;
4024 int schref = (*csi).schema_ref;
4025 string field = (*csi).field;
4026 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4027 unpacked_cids.insert( (*csi) );
4030 // Find partial fcns ref'd in this cnf element
4032 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
4033 // Since set<..> is a "Sorted Associative Container",
4034 // we can walk through it in sorted order by walking from
4035 // begin() to end(). (and the partial fcns must be
4036 // evaluated in this order).
// Functions referenced more than once are guarded by fcn_ref_cnt_<n>==0 in
// the emitted code, and the flag is set to 1 once the result is stored.
4037 set<int>::iterator si;
4038 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
4039 if(fcn_ref_cnt[(*si)] > 1){
4040 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
4042 if(is_partial_fcn[(*si)]){
4043 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
4044 ret += "\t\tif(retval) goto end;\n";
4046 if(fcn_ref_cnt[(*si)] > 1){
4047 if(!is_partial_fcn[(*si)]){
4048 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
4050 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
4055 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
4059 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
4063 // We've passed the WHERE clause,
4064 // unpack the remainder of the accessed fields.
// Filter join: unpack every column used by the hash equality predicates.
4066 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
4067 vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
4068 for(w=0;w<h_eq.size();++w){
4069 col_id_set this_pred_cids;
4070 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
4071 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4072 if(unpacked_cids.count( (*csi) ) == 0){
4073 int tblref = (*csi).tblvar_ref;
4074 int schref = (*csi).schema_ref;
4075 string field = (*csi).field;
4076 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4077 unpacked_cids.insert( (*csi) );
4081 }else if(is_wj){ // STOPPED HERE move this to wj main body
// Watch join: hash_eq is a map keyed by key-field name, so the predicates
// are visited in key_flds order.
4083 ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
4084 map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
4085 vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
4086 for(w=0;w<kflds.size();++w){
4087 string kfld = kflds[w];
4088 col_id_set this_pred_cids;
4089 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
4090 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4091 if(unpacked_cids.count( (*csi) ) == 0){
4092 int tblref = (*csi).tblvar_ref;
4093 int schref = (*csi).schema_ref;
4094 string field = (*csi).field;
4095 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4096 unpacked_cids.insert( (*csi) );
// Remaining case: unpack every collected column that is still missing.
4102 ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
4104 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4105 if(unpacked_cids.count( (*csi) ) == 0){
4106 int schref = (*csi).schema_ref;
4107 int tblref = (*csi).tblvar_ref;
4108 string field = (*csi).field;
4109 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4110 unpacked_cids.insert( (*csi) );
4117 ////////////////// After this, the query types
4118 ////////////////// are processed differently.
// NOTE(review): `!is_fj & !is_wj` uses bitwise AND on two bools. It yields
// the same truth value as `&&` here, but `&&` was probably intended.
4120 if(!is_aggr_query && !is_fj & !is_wj)
4121 ret += generate_sel_accept_body(fs, node_name, schema);
4122 else if(is_aggr_query)
4123 ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
4126 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4128 ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
// Common exit of the emitted function: the `end` label, the optional
// cycle-count update under LFTA_STATS, then `return 1`.
4135 ret += "\n\tend:\n";
4136 ret += "\t#ifdef LFTA_STATS\n";
4137 ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
4138 ret += "\t#endif\n";
4139 ret += "\n\treturn 1;\n}\n\n";
4145 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
4148 string ret = "struct FTA * "+generate_alloc_name(node_name) +
4149 "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void *value){\n";
4151 ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
4154 ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
4156 // assign a streamid to fta instance
4157 ret+="\t/* assign a streamid */\n";
4158 ret+="\tf->f.ftaid = ftaid;\n";
4159 ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
4160 ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
4163 ret += "\tf->n_aggrs = 0;\n";
4165 ret += "\tf->max_aggrs = ";
4167 // Computing the number of aggregate blocks is a little
4168 // tricky. If there are no GB attrs, or if all GB attrs
4169 // are temporal, then use a single aggregate block, else
4170 // use a default value (10). A user specification overrides
4172 bool single_group = true;
4173 for(g=0;g<gb_tbl->size();g++){
4174 data_type *gdt = gb_tbl->get_data_type(g);
4175 if(! gdt->is_temporal() ){
4176 single_group = false;
4179 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
4180 int max_aggr_i = atoi(max_aggr_str.c_str());
4181 if(max_aggr_i <= 0){
4185 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
4187 unsigned int naggrs = 1; // make it power of 2
4188 unsigned int nones = 0;
4192 naggrs = naggrs << 1;
4193 max_aggr_i = max_aggr_i >> 1;
4195 if(nones==1) // in case it was already a power of 2.
4197 ret += int_to_string(naggrs);
4201 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
4202 ret+="\t\treturn(0);\n";
4204 // ret+="/* compute how many integers we need to store the hashmap */\n";
4205 // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
4206 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
4207 ret+="\t\treturn(0);\n";
4209 ret+="/*\t\tfill bitmap with zero \t*/\n";
4210 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
4211 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
4212 ret+="\tf->generation=0;\n";
4213 ret+="\tf->flush_pos = f->max_aggrs;\n";
4215 ret += "\tf->flush_ctr = 0;\n";
4221 ret+="\tf->first_exec = 1;\n";
4222 unsigned int n_bloom = 11;
4223 string n_bloom_str = fs->get_val_of_def("num_bloom");
4224 int tmp_n_bloom = atoi(n_bloom_str.c_str());
4226 n_bloom = tmp_n_bloom+1;
4228 unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
4229 if(window_len < n_bloom){
4230 n_bloom = window_len+1;
4233 int bf_exp_size = 12; // base-2 log of number of bits
4234 string bloom_len_str = fs->get_val_of_def("bloom_size");
4235 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
4236 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
4237 bf_exp_size = tmp_bf_exp_size;
4239 int bf_bit_size = 1 << 12;
4240 int bf_byte_size = bf_bit_size / (8*sizeof(char));
4242 int bf_tot = n_bloom*bf_byte_size;
4243 ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
4244 ret+="\t\treturn(0);\n";
4247 " for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
4248 " f->bf_table[i] = 0;\n"
4251 unsigned int ht_size = 4096;
4252 string ht_size_s = fs->get_val_of_def("aggregate_slots");
4253 int tmp_ht_size = atoi(ht_size_s.c_str());
4254 if(tmp_ht_size > 1024){
4255 unsigned int hs = 1; // make it power of 2
4258 tmp_ht_size = tmp_ht_size >> 1;
4262 ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
4263 ret+="\t\treturn(0);\n";
4266 " for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
4267 " f->join_table[i].ts = 0;\n"
4272 // Initialize the complex literals (which might be handles).
4274 for(cl=0;cl<complex_literals->size();cl++){
4275 literal_t *l = complex_literals->get_literal(cl);
4276 // sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
4277 // ret += tmpstr + l->to_C_code() + ";\n";
4278 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
4279 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4284 // Initialize the last seen values of temporal attributes to min(max) value of
4285 // their respective type
4286 // Create places to hold the last values of temporal attributes referenced in select clause
4289 col_id_set temp_cids; // col ids of temp attributes in select clause
4292 col_id_set::iterator csi;
4294 for(s=0;s<sl_list.size();s++){
4295 data_type *sdt = sl_list[s]->get_data_type();
4296 if (sdt->is_temporal()) {
4297 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
4301 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
4302 int tblref = (*csi).tblvar_ref;
4303 int schref = (*csi).schema_ref;
4304 string field = (*csi).field;
4305 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
4306 if (dt.is_increasing()) {
4307 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
4309 } else if (dt.is_decreasing()) {
4310 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
4315 // initialize last seen values of temporal groubpy variables
4317 for(g=0;g<gb_tbl->size();g++){
4318 data_type *dt = gb_tbl->get_data_type(g);
4319 if(dt->is_temporal()){
4321 fprintf(stderr,"group by attribute %s is temporal, ",
4322 gb_tbl->get_name(g).c_str());
4324 if(dt->is_increasing()){
4325 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
4327 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
4334 ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
4335 ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
4336 ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
4337 ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
4338 ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
4340 // Initialize runtime stats
4341 ret+="\tf->in_tuple_cnt = 0;\n";
4342 ret+="\tf->out_tuple_cnt = 0;\n";
4343 ret+="\tf->out_tuple_sz = 0;\n";
4344 ret+="\tf->accepted_tuple_cnt = 0;\n";
4345 ret+="\tf->cycle_cnt = 0;\n";
4346 ret+="\tf->collision_cnt = 0;\n";
4347 ret+="\tf->eviction_cnt = 0;\n";
4348 ret+="\tf->sampling_rate = 1.0;\n";
4350 ret+="\tf->trace_id = 0;\n\n";
4351 if(param_tbl->size() > 0){
4353 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
4354 "#ifndef LFTA_IN_NIC\n"
4355 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
4363 // Register the pass-by-handle parameters
4365 for(ph=0;ph<param_handle_table.size();++ph){
4366 data_type pdt(param_handle_table[ph]->type_name);
4367 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
4368 switch(param_handle_table[ph]->val_type){
4371 if(pdt.is_buffer_type()) ret += "&(";
4372 sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
4374 if(pdt.is_buffer_type()) ret += ")";
4378 // not complex, no constructor
4380 ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
4383 // query parameter handles are regstered/deregistered in the
4384 // load_params function.
4385 // ret += "t->param_"+param_handle_table[ph]->param_name;
4388 fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
4393 ret += "\treturn (struct FTA *) f;\n";
4402 //////////////////////////////////////////////////////////////////
// Generate the code text for a single LFTA query plan node.
//
// The statements below reset the shared working state, pull the select list,
// where clause, group-by table and aggregate table out of `fs` according to
// fs->node_type(), index the partial functions clause by clause, and then
// append the output of the generate_* helpers to `retval`.
//
// Parameters:
//   fs               - query plan node; the node types tested for here are
//                      "spx_qpn", "sgah_qpn", "filter_join", "watch_join"
//                      and "watch_tbl_qpn".
//   schema           - table schema, forwarded to the generate_* helpers.
//   gid              - not referenced by any statement shown in this block.
//   Ext_fcns         - external function list, used when collecting complex
//                      literals, handle parameters and partial functions.
//   schema_embed_str - forwarded to generate_preamble.
//   ifdb             - interface database, queried for "Time_Correlation".
//   nicp             - NIC properties; null-checked before the "Return"
//                      option is examined.
//   s_pids           - forwarded to generate_fta_accept.
//
// The declared return type is string; the generated text is accumulated
// in `retval`.
4404 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
4405 // map<string,string> &int_fcn_defs,
4406 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
4411 /////////////////////////////////////////////////////////////
4412 /// Do operator-generic processing, such as
4413 /// gathering the set of referenced columns,
4414 /// generating structures, etc.
4416 // Initialize globals to empty.
4417 gb_tbl = NULL; aggr_tbl = NULL;
4418 global_id = -1; nicprop = NULL;
4419 param_tbl = fs->get_param_tbl();
4420 sl_list.clear(); where.clear();
4421 partial_fcns.clear();
4422 fcn_ref_cnt.clear(); is_partial_fcn.clear();
4423 pred_class.clear(); pred_pos.clear();
4424 sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
4425 gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
4428 // Does the lfta read packed results from the NIC?
4429 nicprop = nicp; // load into global
// packed_return is set only when the NIC option "Return" has the value
// "Packed"; any other value produces a warning on stderr.
4431 packed_return = false;
4432 if(nicp && nicp->option_exists("Return")){
4433 if(nicp->option_value("Return") == "Packed"){
4434 packed_return = true;
4436 fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
4441 // Extract data which defines the query.
4442 // complex literals gathered now.
4443 complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
4444 param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
4445 string node_name = fs->get_node_name();
// Flags describing the operator type; they are passed on to the
// generate_* helpers further down.
4446 bool is_fj = false, uses_bloom = false;
4448 bool is_watch_tbl = false;
// Select / project query: only a select list and a where clause.
4451 if(fs->node_type() == "spx_qpn"){
4452 is_aggr_query = false;
4453 spx_qpn *spx_node = (spx_qpn *)fs;
4454 sl_list = spx_node->get_select_se_list();
4455 where = spx_node->get_where_clause();
// Aggregation query: additionally loads the group-by and aggregate tables.
4459 if(fs->node_type() == "sgah_qpn"){
4460 is_aggr_query = true;
4461 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4462 sl_list = sgah_node->get_select_se_list();
4463 where = sgah_node->get_where_clause();
4464 gb_tbl = sgah_node->get_gb_tbl();
4465 aggr_tbl = sgah_node->get_aggr_tbl();
// A non-empty HAVING clause is only reported, it is not used here.
4467 if((sgah_node->get_having_clause()).size() > 0){
4468 fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
// Filter join: also records whether the node asks for bloom filters.
4471 if(fs->node_type() == "filter_join"){
4472 is_aggr_query = false;
4474 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4475 sl_list = fj_node->get_select_se_list();
4476 where = fj_node->get_where_clause();
4477 uses_bloom = fj_node->use_bloom;
// Watchlist join.
4481 if(fs->node_type() == "watch_join"){
4482 is_aggr_query = false;
4484 watch_join_qpn *wl_node = (watch_join_qpn *)fs;
4485 sl_list = wl_node->get_select_se_list();
4486 where = wl_node->get_where_clause();
// Watch table: select list and where clause are set to empty vectors.
4490 if(fs->node_type() == "watch_tbl_qpn"){
4491 is_aggr_query = false;
4492 is_watch_tbl = true;
4493 vector<scalarexp_t *> empty_sl_list;
4494 vector<cnf_elem *> empty_where;
4495 sl_list = empty_sl_list;
4496 where = empty_where;
4500 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
4504 // Build list of "partial functions", by clause.
4505 // NOTE : partial fcns are not handled well.
4506 // The act of searching for them associates the fcn call
4507 // in the SE with an index to an array. Refs to the
4508 // fcn value are replaced with refs to the variable they are
4509 // unpacked into. A more general tagging mechanism would be better.
// For non-aggregate queries the reference counts and partial flags are
// collected into fcn_ref_cnt / is_partial_fcn; otherwise NULL is passed
// to the search routines.
4512 vector<bool> *pfunc_ptr = NULL;
4513 vector<int> *ref_cnt_ptr = NULL;
4514 if(!is_aggr_query){ // don't collect cacheable fcns on aggr query.
4515 ref_cnt_ptr = &fcn_ref_cnt;
4516 pfunc_ptr = &is_partial_fcn;
// The *_fcns_start / *_fcns_end pairs record, per clause, the range of
// indices in partial_fcns that the clause contributed.
4520 for(i=0;i<sl_list.size();i++){
4521 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4523 wh_fcns_start = sl_fcns_end = partial_fcns.size();
4524 for(i=0;i<where.size();i++){
4525 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4527 gb_fcns_start = wh_fcns_end = partial_fcns.size();
4529 for(i=0;i<gb_tbl->size();i++){
4530 find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
4533 ag_fcns_start = gb_fcns_end = partial_fcns.size();
4534 if(aggr_tbl != NULL){
4535 for(i=0;i<aggr_tbl->size();i++){
// NOTE(review): unlike the calls above, this one passes NULL as the
// function list and &is_partial_fcn as the fourth argument - confirm
// that this is intended.
4536 find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
4539 ag_fcns_end = partial_fcns.size();
4541 // Fill up the is_partial_fcn and fcn_ref_cnt arrays.
4543 for(i=0; i<partial_fcns.size();i++){
4544 fcn_ref_cnt.push_back(1);
4545 is_partial_fcn.push_back(true);
4549 // Unmark non-partial expensive functions referenced only once.
4550 for(i=0; i<partial_fcns.size();i++){
4551 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
4552 partial_fcns[i]->set_partial_ref(-1);
// From here on node_name is the normalized name; it is used in every
// generated identifier.
4556 node_name = normalize_name(node_name);
4558 retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
// With packed return, emit "struct <node_name>_input_struct" holding an
// integer id followed by one unpack_var_<col> member per column gathered
// by gather_nicsafe_cols, in sorted column-name order.
4560 if(packed_return){ // generate unpack struct
4561 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
4562 int schref = input_tbls[0]->get_schema_ref();
4563 vector<string> refd_cols;
4564 for(s=0;s<sl_list.size();++s){
4565 gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
4567 for(p=0;p<where.size();++p){
4568 // I'm not disabling these preds ...
4569 gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
4572 for(g=0;g<gb_tbl->size();++g){
4573 gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
4576 sort(refd_cols.begin(), refd_cols.end());
4577 retval += "struct "+node_name+"_input_struct{\n";
4578 retval += "\tint __lfta_id_fm_nic__;\n";
4580 for(vsi=0;vsi<refd_cols.size();++vsi){
4581 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
4582 retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
4588 /////////////////////////////////////////////////////
4589 // Common stuff unpacked, do some generation
// Operator-specific structures: aggregate struct, filter-join struct,
// or the watchlist structs and loader.
4593 retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
4595 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
4597 retval += "\n\n// watchtable code here \n\n";
4598 watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
4599 retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
4600 retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
// The FTA struct, the output tuple struct and the per-FTA functions.
4604 retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
4605 retval += generate_tuple_struct(node_name, sl_list) ;
4608 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
// The parameter loader is only generated when the query has parameters.
4609 if(param_tbl->size() > 0)
4610 retval += generate_fta_load_params(node_name) ;
4611 retval += generate_fta_free(node_name, is_aggr_query) ;
4612 retval += generate_fta_control(node_name, schema, is_aggr_query) ;
4613 retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;
4615 /* extract the value of Time_Correlation from interface definition */
// The lookup uses the machine and interface of the first input table;
// DEFAULT_TIME_CORR is used when the interface defines no value.
4619 vector<tablevar_t *> tvec = fs->get_input_tbls();
4620 vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
4621 if (time_corr_vec.empty())
4622 time_corr = DEFAULT_TIME_CORR;
4624 time_corr = atoi(time_corr_vec[0].c_str());
4626 retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
4627 retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
// Compute the snap length required by a query plan node.
//
// The select list, where clause and (for "sgah_qpn") the group-by table
// are loaded from `fs`, every column they reference is gathered into
// cid_set, and snap_len is raised to the largest value found over those
// columns.
//
// Parameters:
//   fs        - query plan node; "watch_tbl_qpn", "spx_qpn", "sgah_qpn",
//               "filter_join" and "watch_join" are recognized, any other
//               type is reported as an internal error.
//   schema    - table schema used to look up field positions and modifiers.
//   snap_type - "index" selects the field position (get_field_idx) as the
//               measure; the "snap_len" field modifier is consulted as well
//               (NOTE(review): presumably only for other snap types - confirm).
4635 int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
4637 // Initialize global vars
4639 sl_list.clear(); where.clear();
// Watch tables are tested for separately, ahead of the other node types.
4642 if(fs->node_type() == "watch_tbl_qpn"){
4646 if(fs->node_type() == "spx_qpn"){
4647 spx_qpn *spx_node = (spx_qpn *)fs;
4648 sl_list = spx_node->get_select_se_list();
4649 where = spx_node->get_where_clause();
4651 else if(fs->node_type() == "sgah_qpn"){
4652 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4653 sl_list = sgah_node->get_select_se_list();
4654 where = sgah_node->get_where_clause();
4655 gb_tbl = sgah_node->get_gb_tbl();
4657 else if(fs->node_type() == "filter_join"){
4658 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4659 sl_list = fj_node->get_select_se_list();
4660 where = fj_node->get_where_clause();
4662 else if(fs->node_type() == "watch_join"){
4663 watch_join_qpn *fj_node = (watch_join_qpn *)fs;
4664 sl_list = fj_node->get_select_se_list();
4665 where = fj_node->get_where_clause();
4667 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
4671 // Gather all column references, need to define unpacking variables.
4674 col_id_set::iterator csi;
4676 for(w=0;w<where.size();++w)
4677 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
4678 for(s=0;s<sl_list.size();s++){
4679 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
4684 for(g=0;g<gb_tbl->size();g++)
4685 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
4688 // compute snap length
4691 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4692 int schref = (*csi).schema_ref;
4693 int tblref = (*csi).tblvar_ref;
4694 string field = (*csi).field;
// Index mode: keep the largest field position seen so far.
4696 if(snap_type == "index"){
4697 int pos = schema->get_field_idx(schref, field);
4698 if(pos>snap_len) snap_len = pos;
// Modifier mode: keep the largest numeric "snap_len" modifier; a value
// that sscanf cannot parse as an integer is reported and ignored.
4701 param_list *field_params = schema->get_modifier_list(schref, field);
4702 if(field_params->contains_key("snap_len")){
4703 string fld_snap_str = field_params->val_of("snap_len");
4705 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
4706 if(fld_snap > snap_len) snap_len = fld_snap;
4709 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
// n_snap is compared against the number of referenced columns.
// NOTE(review): presumably n_snap counts the columns that supplied a snap
// length - confirm where it is incremented.
4715 if(n_snap == cid_set.size()){
4724 // Function which computes an optimal
4725 // set of unpacking functions.
// Choose an unpacking function for every column in upref_cids.
//
// Each round counts, for the columns that have no entry in ucol_fcn_map
// yet, how many of them list each unpack function; picks the function
// with the lowest (cost / count) ratio; and assigns it to every unassigned
// column that lists it. Rounds repeat until ucol_fcn_map holds as many
// entries as upref_cids.
//
// Parameters:
//   upref_cids   - columns needing an unpacking function.
//   Schema       - supplies each field's unpack function set (get_field)
//                  and each function's cost (get_ufcn_cost).
//   ucol_fcn_map - output: column id -> name of the chosen unpack function.
4727 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
4728 map<string, int> pfcn_count;
4729 map<string, int>::iterator msii;
4730 col_id_set::iterator cisi;
4731 set<string>::iterator ssi;
4734 while(ucol_fcn_map.size() < upref_cids.size()){
4736 // Gather unpack functions referenced by unaccounted-for
4737 // columns, and increment their reference count.
4739 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4740 if(ucol_fcn_map.count((*cisi)) == 0){
4741 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4742 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
4743 pfcn_count[(*ssi)]++;
4747 // Get the lowest cost per field function.
4748 float min_cost = 0.0;
4749 string best_fcn = "";
4750 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
4751 int fcost = Schema->get_ufcn_cost((*msii).first);
4753 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
// Cost per column served; the first candidate always seeds min_cost,
// so the 0.0 initial value is never compared against.
4756 float this_cost = (1.0*fcost)/(*msii).second;
4757 if(msii == pfcn_count.begin() || this_cost < min_cost){
4758 min_cost = this_cost;
4759 best_fcn = (*msii).first;
4763 fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4767 // Assign this function to the unassigned fcns which use it.
4768 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4769 if(ucol_fcn_map.count((*cisi)) == 0){
4770 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4771 if(ufcns.count(best_fcn)>0)
4772 ucol_fcn_map[(*cisi)] = best_fcn;
4780 // Generate an initial test for the lfta
4781 // Assume that the predicate references no external functions,
4782 // and especially no partial functions,
4783 // aggregates, or internal functions.
// Generate the prefilter code for one interface.
//
// The generated text (accumulated in `ret` and `body`) contains:
//   - global handle variables pref_common_pred_hdl_<q>_<p>_<iface> for the
//     combinable ("common") predicate classes,
//   - struct prefilter_complex_lit_struct_<iface> and its instance,
//   - init_lfta_prefilter_<iface>(), which initializes the complex literals
//     and registers the common predicate handles,
//   - lfta_prefilter_<iface>(void *pkt), which unpacks the referenced
//     fields and sets one bit of `retval` per entry of pred_list whose
//     predicate holds, followed by conditional calls to the remaining
//     field unpacking functions,
//   - a lfta_pkt_snaplen(void *pkt) section built from lfta_snap_lens.
//
// Parameters:
//   pred_list      - prefilter predicates; predicate p corresponds to the
//                    p-th bit tested through `bitpos`.
//   temp_cids      - temporal column ids, unpacked into prefilter_temp_vars.
//   Schema         - table schema (base table names, unpack functions, types).
//   Ext_fcns       - external function list.
//   lfta_cols      - per-lfta set of referenced column ids.
//   lfta_sigs      - per-lfta signature, emitted as a mask tested against
//                    retval.
//   lfta_snap_lens - per-lfta snap length; -1 is handled as a separate case.
//   iface          - interface name, used as a suffix of generated identifiers.
4784 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4785 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4786 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4787 vector<int> &lfta_snap_lens, string iface){
4788 col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4789 col_id_set::iterator csi;
4793 // Gather complex literals in the prefilter.
// NOTE(review): this table is allocated with new and no delete appears in
// the statements of this function - confirm ownership.
4794 cplx_lit_table *complex_literals = new cplx_lit_table();
4795 for(p=0;p<pred_list.size();++p){
4796 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4800 // Find the combinable predicates
4801 vector<predicate_t *> pr_list;
4802 for(p=0;p<pred_list.size();++p){
4803 find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4806 // Analyze the combinable predicates to find the predicate classes.
4807 pred_class.clear(); // idx to equiv pred in equiv_list
4808 pred_pos.clear(); // idx to returned bitmask.
// equiv_list holds one representative per class; num_equiv[q] is the
// member count of class q.
4809 vector<predicate_t *> equiv_list;
4810 vector<int> num_equiv;
4813 for(p=0;p<pr_list.size();++p){
4814 for(q=0;q<equiv_list.size();++q){
4815 if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4818 if(q == equiv_list.size()){ // no equiv : create new
4819 pred_class.push_back(equiv_list.size());
4820 equiv_list.push_back(pr_list[p]);
4821 pred_pos.push_back(0);
4822 num_equiv.push_back(1);
4824 }else{ // pr_list[p] is equivalent to pred q
4825 pred_class.push_back(q);
4826 pred_pos.push_back(num_equiv[q]);
4831 // Generate the variables which hold the common pred handles
// One void* handle is declared for p = 0 .. num_equiv[q]/32 in each class;
// positions are later split as pred_pos/32 (handle) and pred_pos%32 (bit).
4832 ret += "/*\t\tprefilter global vars.\t*/\n";
4833 for(q=0;q<equiv_list.size();++q){
4834 for(p=0;p<=(num_equiv[q]/32);++p){
4835 ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4839 // Struct to hold prefilter complex literals
// A placeholder member is emitted when there are no complex literals.
4840 ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4841 if(complex_literals->size() == 0)
4842 ret += "\tint no_variable;\n";
4844 for(cl=0;cl<complex_literals->size();cl++){
4845 literal_t *l = complex_literals->get_literal(cl);
// NOTE(review): allocated with new, no delete visible here - confirm.
4846 data_type *dtl = new data_type( l->get_type() );
4847 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4850 ret += "} prefilter_complex_lits_"+iface+";\n\n";
4853 // Generate the prefilter initialization code
4854 ret += "void init_lfta_prefilter_"+iface+"(){\n";
4856 // First initialize complex literals, if any.
4857 ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4858 for(cl=0;cl<complex_literals->size();cl++){
4859 literal_t *l = complex_literals->get_literal(cl);
4860 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4861 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
// Register the common predicate handles. When class q has already been
// seen, the registration call is passed the existing handle; otherwise
// the handle is assigned from a registration call that is passed NULL.
4865 set<int> epred_seen;
4866 for(p=0;p<pr_list.size();++p){
4867 int q = pred_class[p];
4868 //printf("\tq=%d\n",q);
4869 if(epred_seen.count(q)>0){
4870 ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4871 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4872 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4873 for(o=0;o<op_list.size();++o){
4875 ret += generate_se_code(op_list[o],Schema)+", ";
4878 ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4879 epred_seen.insert(q);
4881 ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4882 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4883 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4884 for(o=0;o<op_list.size();++o){
4886 ret += generate_se_code(op_list[o],Schema)+", ";
4889 ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4890 epred_seen.insert(q);
4897 // Start on main body code generation
4898 ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4901 ///--------------------------------------------------------------
4902 /// Generate and store the prefilter body,
4903 /// reuse it for the snap length calculator
4904 ///-------------------------------------------------------------
4907 body += "\tstruct packet *p = (struct packet *)pkt;\n";
4911 // Gather the colids to store unpacked variables.
4912 for(p=0;p<pred_list.size();++p){
4913 gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4916 // make the col_ids refer to the base tables, and
4917 // grab the col_ids with at least one unpacking function.
4918 for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4919 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4921 tmp_col_id.field = (*csi).field;
4922 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4923 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4924 cid_set.insert(tmp_col_id);
4925 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4926 if(fe->get_unpack_fcns().size()>0)
4927 upref_cids.insert(tmp_col_id);
4932 // Find the set of unpacking programs needed for the
4933 // prefilter fields.
// pref_ufcns ends up holding the distinct function names chosen.
4934 map<col_id, string,lt_col_id> ucol_fcn_map;
4935 find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4936 set<string> pref_ufcns;
4937 map<col_id, string,lt_col_id>::iterator mcis;
4938 for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4939 pref_ufcns.insert((*mcis).second);
4944 // Variables for unpacking attributes.
// Each column gets a value variable unpack_var_<field>_<tblref> and a
// status variable ret_<field>_<tblref>.
4945 body += "/*\t\tVariables for unpacking attributes\t*/\n";
4946 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4947 int schref = (*csi).schema_ref;
4948 int tblref = (*csi).tblvar_ref;
4949 string field = (*csi).field;
4950 data_type dt(Schema->get_type_name(schref,field));
4951 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4952 field.c_str(), tblref);
4954 sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4957 // Variables for unpacking temporal attributes.
// Only temporal columns that are not already in cid_set are considered.
4958 body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4959 for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4960 if (cid_set.count(*csi) == 0) {
4961 int schref = (*csi).schema_ref;
4962 int tblref = (*csi).tblvar_ref;
4963 string field = (*csi).field;
4964 data_type dt(Schema->get_type_name(schref,field));
4965 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4966 field.c_str(), tblref);
4973 // Variables for combinable predicate evaluation
4974 body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4975 for(q=0;q<equiv_list.size();++q){
4976 for(p=0;p<=(num_equiv[q]/32);++p){
4977 body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4982 // Variables that are always needed
4983 body += "/*\t\tVariables which are always needed\t*/\n";
4984 body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4985 body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4987 // Call the unpacking functions for the prefilter fields
4988 if(pref_ufcns.size() > 0)
4989 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4990 set<string>::iterator ssi;
4991 for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4992 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4996 // Unpack the accessed attributes
// In the generated code ret_<field>_<tblref> is true when the field
// access function returned 0.
4997 body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4998 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4999 int tblref = (*csi).tblvar_ref;
5000 int schref = (*csi).schema_ref;
5001 string field = (*csi).field;
5002 sprintf(tmpstr,"\tret_%s_%d = (%s(p, &unpack_var_%s_%d) == 0);\n",
5003 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5007 // next unpack the temporal attributes and ignore the errors
5008 // We are assuming here that failed unpack of temporal attributes
5009 // is not going to overwrite the last stored value
5010 // Failed unpacks are ignored
5011 for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
5012 int tblref = (*csi).tblvar_ref;
5013 int schref = (*csi).schema_ref;
5014 string field = (*csi).field;
5015 sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
5016 Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5020 // Evaluate the combinable predicates
5021 if(equiv_list.size()>0)
5022 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
5023 for(q=0;q<equiv_list.size();++q){
5024 for(p=0;p<=(num_equiv[q]/32);++p){
5026 // Only call the common eval fcn if all ref'd fields present.
5027 col_id_set pred_cids;
5028 col_id_set::iterator cpi;
5029 gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
5030 if(pred_cids.size()>0){
5032 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5033 if(cpi != pred_cids.begin())
5035 string field = (*cpi).field;
5036 int tblref = (*cpi).tblvar_ref;
5037 body += "ret_"+field+"_"+int_to_string(tblref);
5042 body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
5043 vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
5044 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
5045 for(o=0;o<op_list.size();++o){
5047 body += ","+generate_se_code(op_list[o],Schema);
// Per-predicate tests: predicate p sets the current bitpos bit in retval
// when it holds, and bitpos is shifted left after each predicate.
5055 for(p=0;p<pred_list.size();++p){
5056 col_id_set pred_cids;
5057 col_id_set::iterator cpi;
5058 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
5059 if(pred_cids.size()>0){
5061 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5062 if(cpi != pred_cids.begin())
5064 string field = (*cpi).field;
5065 int tblref = (*cpi).tblvar_ref;
5066 body += "ret_"+field+"_"+int_to_string(tblref);
5070 body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
5071 body+="\tbitpos = bitpos << 1;\n";
5074 // ---------------------------------------------------------------
5075 // Finished with the body of the prefilter
5076 // --------------------------------------------------------------
5080 // Collect fields referenced by an lfta but not
5081 // already unpacked for the prefilter.
5083 //printf("upref_cids is:\n");
5084 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
5085 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5086 //printf("pref_ufcns is:\n");
5087 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
5088 //printf("\t%s\n",(*ssi).c_str());
5091 for(l=0;l<lfta_cols.size();++l){
5092 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
5093 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
5095 tmp_col_id.field = (*csi).field;
5096 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
5097 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
5098 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
5099 set<string> fld_ufcns = fe->get_unpack_fcns();
5100 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(), upref_cids.count(tmp_col_id));
5101 if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
5102 // Ensure that this field not already unpacked.
5104 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
5105 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
5106 if(pref_ufcns.count((*ssi))){
5107 //printf("Field already unpacked.\n");
5112 //printf("\tadding to unpack list\n");
5113 upall_cids.insert(tmp_col_id);
5119 //printf("upall_cids is:\n");
5120 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
5121 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5123 // Get the set of unpacking programs for these.
5124 map<col_id, string,lt_col_id> uall_fcn_map;
5125 find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
5126 set<string> pall_ufcns;
5127 for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
5128 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
5129 pall_ufcns.insert((*mcis).second);
5132 // Iterate through the remaining set of unpacking function
5133 if(pall_ufcns.size() > 0)
5134 ret += "//\t\tcall all remaining field unpacking functions.\n";
5135 for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
5136 // gather the set of columns unpacked by this ufcn
5137 col_id_set fcol_set;
5138 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
5139 if(uall_fcn_map[(*csi)] == (*ssi))
5140 fcol_set.insert((*csi));
5143 // gather the set of lftas which access a field unpacked by the fcn
// The signature of lfta l is recorded when the scan over fcol_set ended
// before reaching fcol_set.end().
5144 set<long long int> clfta;
5145 for(l=0;l<lfta_cols.size();l++){
5146 for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
5147 if(lfta_cols[l].count((*csi)) > 0)
5150 if(csi != fcol_set.end())
5151 clfta.insert(lfta_sigs[l]);
5154 // generate the unpacking code
// The unpack call is guarded by one ((retval & SIG) == SIG) test per
// recorded lfta signature.
// NOTE(review): the set elements are signed long long but are formatted
// with %llu - confirm signatures are never negative.
5156 set<long long int>::iterator sii;
5157 for(sii=clfta.begin();sii!=clfta.end();++sii){
5158 if(sii!=clfta.begin())
5160 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
5163 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5167 ret += "\treturn(retval);\n\n";
5171 // --------------------------------------------------------
5172 // reuse prefilter body for snaplen calculator
5174 // This is dummy code, so I'm commenting it out.
5177 ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
// NOTE(review): s_snaps[0] is read with no emptiness check visible here,
// and the scans below index lfta_snap_lens rather than the sorted copy
// s_snaps - confirm whether this section is live code.
5182 vector<int> s_snaps = lfta_snap_lens;
5183 sort(s_snaps.begin(), s_snaps.end());
// Snap length -1: collect the signatures of all such lftas and emit a
// test that returns -1 for them.
5185 if(s_snaps[0] == -1){
5186 set<unsigned long long int> sigset;
5187 for(i=0;i<lfta_snap_lens.size();++i){
5188 if(lfta_snap_lens[i] == -1){
5189 sigset.insert(lfta_sigs[i]);
5193 set<unsigned long long int>::iterator sulli;
5194 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5195 if(sulli!=sigset.begin())
5197 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5200 ret += ") return -1;\n";
// Remaining values: starting from the last entry, emit one test per
// distinct non-negative snap length that returns that length.
5203 int nextpos = lfta_snap_lens.size()-1;
5204 int nextval = lfta_snap_lens[nextpos];
5205 while(nextval >= 0){
5206 set<unsigned long long int> sigset;
5207 for(i=0;i<lfta_snap_lens.size();++i){
5208 if(lfta_snap_lens[i] == nextval){
5209 sigset.insert(lfta_sigs[i]);
5213 set<unsigned long long int>::iterator sulli;
5214 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5215 if(sulli!=sigset.begin())
5217 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5220 ret += ") return "+int_to_string(nextval)+";\n";
// Skip backwards over entries equal to the value just handled.
5222 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
5224 nextval = lfta_snap_lens[nextpos];
5228 ret += "\treturn 0;\n";
5239 // Generate the struct which will store the values of
5240 // temporal attributes unpacked by the prefilter
// Build the text of "struct prefilter_unpacked_temp_vars" together with
// the definition of its global instance prefilter_temp_vars.
//
// Parameters:
//   cid_set - column ids; each contributes one member named
//             unpack_var_<field>_<tblref>.
//   Schema  - supplies each field's type name and modifier list.
//
// init_code collects one initializer per member: the type's min literal
// when dt.is_increasing(), the max literal in the other case.
5241 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
5243 col_id_set::iterator csi;
5245 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
5247 string ret="struct prefilter_unpacked_temp_vars {\n";
5248 ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
5252 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
5253 int schref = (*csi).schema_ref;
5254 int tblref = (*csi).tblvar_ref;
5255 string field = (*csi).field;
5256 data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
5257 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
5258 field.c_str(), tblref);
// A non-empty init_code means an earlier member already contributed an
// initializer.
5261 if (init_code != "")
5263 if (dt.is_increasing())
5264 init_code += dt.get_min_literal();
5266 init_code += dt.get_max_literal();
// The instance definition uses the collected initializers as a brace list.
5271 ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";