1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
34 // default value for correlation between the interface card and
36 #define DEFAULT_TIME_CORR 16
41 extern string hash_nums[NRANDS];
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
98 // ----------------------------------------------
99 // Data extracted from the query plan node
100 // for use by code generation.
// Everything below is file-scope state read by the generator functions in
// this file (for example generate_se_code reads gb_tbl, and
// generate_predicate_code reads pred_class / pred_pos).
102 static cplx_lit_table *complex_literals; //Table of literals with constructors.
103 static vector<handle_param_tbl_entry *> param_handle_table;
104 static param_table *param_tbl; // Table of all referenced parameters.
// sl_list is scanned by generate_fta_struct for temporal expressions;
// presumably it is the select list and `where` the WHERE-clause CNF -- confirm.
106 static vector<scalarexp_t *> sl_list;
107 static vector<cnf_elem *> where;
109 static gb_table *gb_tbl; // Table of all group-by attributes.
110 static aggregate_table *aggr_tbl; // Table of all referenced aggregates.
112 static bool packed_return; // unpack using structure, not fcns
113 static nic_property *nicprop; // nic properties for this interface.
114 static int global_id;
117 // The partial_fcns vector can now refer to
118 // partial functions, or expensive functions
119 // which can be cached (if there are multiple refs). A couple
120 // of int vectors distinguish the cases.
121 static vector<scalarexp_t *> partial_fcns;
122 static vector<int> fcn_ref_cnt;
123 static vector<bool> is_partial_fcn;
// Start/end markers per clause (sl = select list, wh = where, gb = group-by,
// ag = aggregate); presumably index ranges into partial_fcns -- confirm.
// NOTE(review): unlike their neighbours these four are not declared static,
// so they have external linkage.
124 int sl_fcns_start = 0, sl_fcns_end = 0;
125 int wh_fcns_start = 0, wh_fcns_end = 0;
126 int gb_fcns_start = 0, gb_fcns_end = 0;
127 int ag_fcns_start = 0, ag_fcns_end = 0;
130 // These vectors are for combinable predicates.
131 static vector<int> pred_class; // identifies the group
132 static vector<int> pred_pos; // position in the group.
// Shared scratch buffer for sprintf-formatted fragments.
// NOTE(review): the visible call sites use plain sprintf with no length
// check, so every formatted fragment must stay below 1000 characters.
136 static char tmpstr[1000];
138 //////////////////////////////////////////////////////////////////////
139 /// Various utilities
// Builds an identifier from normalize_name(node_name); generate_fta_struct
// uses the result as the tag of the generated FTA struct.
// NOTE(review): the embedded numbering jumps from 142 to 152, so the rest of
// the body (any suffix appended and the return) is not visible here.
141 string generate_fta_name(string node_name){
142 string ret = normalize_name(node_name);
// Builds the tag of the generated aggregate struct: the normalized node name
// with "_aggr_struct" appended.
// NOTE(review): lines 154-156 and the return are not visible in this listing.
152 string generate_aggr_struct_name(string node_name){
153 string ret = normalize_name(node_name);
157 ret += "_aggr_struct";
// Builds an identifier from normalize_name(node_name); generate_fj_struct uses
// the result as the tag of the generated filter-join struct.
// NOTE(review): the rest of the body (suffix and return) is not visible here.
162 string generate_fj_struct_name(string node_name){
163 string ret = normalize_name(node_name);
// Builds an identifier from normalize_name(node_name); the watchlist
// generators use the result as the tag of the per-entry struct.
// NOTE(review): the rest of the body (suffix and return) is not visible here.
172 string generate_watchlist_element_name(string node_name){
173 string ret = normalize_name(node_name);
// Builds the tag of the generated watchlist table struct: the normalized node
// name with "__wl_struct" appended.
// NOTE(review): lines 184-186 and the return are not visible in this listing.
182 string generate_watchlist_struct_name(string node_name){
183 string ret = normalize_name(node_name);
187 ret += "__wl_struct";
// Builds an identifier from normalize_name(node_name); the watchlist
// generators use the result as the name of the generated watchlist variable.
// NOTE(review): the rest of the body (suffix and return) is not visible here.
192 string generate_watchlist_name(string node_name){
193 string ret = normalize_name(node_name);
// Emits the C statements that load one field into unpack_var_<field>_<tblref>.
//   tblref    - table-variable index, used as the suffix of the unpack variable.
//   schref    - schema index used for the schema lookups (unpack function,
//               modifier list, type name).
//   field     - field name.
//   schema    - table list queried for the field's properties.
//   node_name - prefix of the <node_name>_input_struct names in generated code.
//   end_goto  - label the generated code jumps to when unpacking fails.
// Two shapes of code are visible: a call to the schema's unpack function
// (with a "goto end_goto" on failure unless the field has the "required"
// modifier), and a copy out of <node_name>_input_struct_var.
// NOTE(review): the embedded numbering skips lines (203-204, 207, 210-211,
// 221, 223-225, 228-231), so the branch structure joining these statements,
// the `ret += tmpstr;` after the sprintf and the return are not visible here.
202 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
205 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
206 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
208 if(!schema->get_modifier_list(schref,field)->contains_key("required"))
209 ret += "\tif(retval) goto "+end_goto+";\n";
212 // TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
213 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
214 if(dt.is_buffer_type()){
215 if(dt.get_type() != v_str_t){
// Generated bounds check: the variable-length payload must fit inside sz.
216 ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
217 ret += "\t\tgoto "+end_goto+";\n";
218 ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
// Fixed: the emitted statement used to end in ";+", which placed a stray
// unary plus in front of whatever statement was generated next.
219 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
220 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
222 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
226 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
227 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
// Emits the C struct that holds one aggregation group:
//   - one member gb_var<g> per group-by attribute, typed from gb_tbl;
//   - one member aggr_var<a> per aggregate, typed from the aggregate's data
//     type when it is built in and from its storage type otherwise;
//   - a `next` pointer to the same struct type.
// The gb_tbl / aggr_tbl parameters shadow the file-scope variables of the
// same names.
// NOTE(review): the embedded numbering skips lines, so the declarations of g
// and a, the closing of the struct text and the return are not visible here.
233 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
234 string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
237 for(g=0;g<gb_tbl->size();g++){
238 sprintf(tmpstr,"gb_var%d",g);
239 ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
243 for(a=0;a<aggr_tbl->size();a++){
245 sprintf(tmpstr,"aggr_var%d",a);
246 if(aggr_tbl->is_builtin(a))
247 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
249 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
253 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
// Emits the C struct for one filter-join hash-table entry. The struct text is
// only built when fs->use_bloom is false: one member key_var<k> per entry of
// fs->hash_eq (typed from the right-hand expression of that predicate) plus a
// `long long int ts` member.
// NOTE(review): the embedded numbering skips lines, so the declaration of ret
// and k, the closing of the struct text and the return are not visible here.
262 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
265 if(fs->use_bloom == false){ // uses hash table instead
266 ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
268 for(k=0;k<fs->hash_eq.size();++k){
269 sprintf(tmpstr,"key_var%d",k);
270 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
272 ret += "\tlong long int ts;\n";
// Emits the C declarations backing a watchlist:
//   - the per-entry struct: one member per field of tbl, a 64-bit hashval and
//     a `next` pointer for bucket chaining;
//   - a `char *<watchlist>__fstr` variable initialised to `filename`;
//   - the table struct (bucket array, sizes, refresh bookkeeping, filename)
//     together with its instance, whose refresh_interval slot is initialised
//     from the refresh_interval argument and all other slots to 0 / NULL.
// NOTE(review): `filename` is spliced into a C string literal without
// escaping, so a name containing '"' or '\' would break the generated source.
// NOTE(review): the embedded numbering skips lines, so the declaration of ret,
// some closing text and the return are not visible here.
279 string generate_watchlist_structs(string node_name, table_def *tbl,
280 std::string filename, int refresh_interval){
283 ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
284 vector<field_entry *> fields = tbl->get_fields();
285 for(int f=0;f<fields.size();++f){
286 data_type dt(fields[f]->get_type());
287 ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
289 ret += "\tgs_uint64_t hashval;\n";
290 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
293 ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
294 ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
295 ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
296 ret += "\tgs_uint32_t ht_size;\n";
297 ret += "\tgs_uint32_t n_elem;\n";
298 ret += "\tgs_uint32_t refresh_interval;\n";
299 ret += "\ttime_t next_refresh;\n";
300 ret += "\ttime_t last_mtime;\n";
301 ret += "\tchar *filename;\n";
302 ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
// Emits the C function reload_watchlist__<node_name>(), which rebuilds the
// watchlist hash table from its backing file.
//   node_name - query node name; selects the generated struct/variable names.
//   tbl       - table definition; its fields define the record layout and the
//               number of comma-separated columns expected per line.
//   keys      - names of the fields that are hashed to pick a bucket.
// The generated function:
//   1. stat()s the file and returns early if stat fails or if neither mtime
//      nor ctime is newer than the recorded last_mtime;
//   2. frees every old entry (destroying buffer-typed fields first);
//   3. (re)allocates the bucket array when it is empty or smaller than the
//      previous element count, then clears all buckets;
//   4. reads the file line by line, splits on ',', converts each column with
//      the type's extract function, hashes the key fields and pushes the
//      record onto its bucket chain;
//   5. logs a summary when malformed or over-long lines were seen.
// Fixed in the generated code:
//   - the counter n_malformed is now zeroed before the read loop; previously
//     the per-line flag `malformed` was zeroed there instead, so n_malformed
//     was incremented and tested while uninitialised;
//   - the summary log now prints n_malformed; it used to print the per-line
//     flag `malformed`;
//   - the bucket computation is emitted with a plain '%' instead of the
//     non-standard escape '\%'.
// NOTE(review): the embedded numbering skips lines, so the declaration of
// ret, the emitted closing braces, the `ret +=` heading the hash statement
// (line 423) and the return are not visible here. No fclose(fl) is emitted in
// the visible lines -- confirm it exists in the omitted ones.
307 string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
309 string tgt = generate_watchlist_name(node_name);
310 vector<field_entry *> fields = tbl->get_fields();
312 ret += "void reload_watchlist__"+node_name+"(){\n";
313 ret += "\tgs_uint32_t i;\n";
314 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
315 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
316 ret += "\tFILE *fl;\n";
// Generated line buffer is fixed at 10000 bytes; fgets splits longer lines.
317 ret += "\tchar buf[10000];\n";
318 ret += "\tgs_uint32_t buflen = 10000;\n";
319 ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
320 ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
321 ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
322 ret += "\tgs_uint64_t hash, bucket;\n";
323 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
325 ret += "// make sure watchlist file has changed since the last time we loaded it\n";
326 ret += "\tstruct stat file_stat;\n";
327 ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
328 ret += "\tif (err) {\n";
329 ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
330 ret += "\t\treturn;\n";
332 ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime) // watchlist file hasn't changed since last time\n";
333 ret += "\t\treturn;\n";
334 ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
336 ret += "// Delete old entries.\n";
337 ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
338 ret += "\t\tptr="+tgt+".ht[i];\n";
339 ret += "\t\twhile(ptr!=NULL){\n";
// Buffer-typed fields are released with their type's destroy function before
// the entry itself is freed.
340 for(int f=0;f<fields.size();++f){
341 data_type dt(fields[f]->get_type());
342 if(dt.is_buffer_type()){
343 ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
346 ret += "\t\t\tnext = ptr->next;\n";
347 ret += "\t\t\tfree(ptr);\n";
348 ret += "\t\t\tptr = next;\n";
351 ret += "\n// prepare new table. \n";
352 ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
353 ret += "\t\tif("+tgt+".ht)\n";
354 ret += "\t\t\tfree("+tgt+".ht);\n";
355 ret += "\t\tif("+tgt+".ht_size == 0)\n";
356 ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
358 ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
359 ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
361 ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
362 ret += "\t\t"+tgt+".ht[i] = NULL;\n";
364 ret += "\n// load new table\n";
365 ret += "\t"+tgt+".n_elem = 0;\n";
366 ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
367 ret += "\tif(fl==NULL){\n";
368 ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
369 ret += "\t\treturn;\n";
// Zero the three error counters reported in the summary log.
371 ret += "\tn_malformed = 0;\n";
372 ret += "\tshort_lines = 0;\n";
373 ret += "\ttoolong_lines = 0;\n";
374 ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
375 ret += "\t\tlinelen = strlen(buf);\n";
// NOTE(review): the last character is dropped unconditionally, so a final
// line without a trailing newline loses its last byte.
376 ret += "\t\tbuf[linelen-1]='\\0'; // strip off trailing newline\n";
377 ret += "\t\tlinelen--;\n";
379 ret += "\t\tpos=0;\n";
380 ret += "\t\tmalformed=0;\n";
381 ret += "\t\tok=1;\n";
// Split the line in place: each ',' becomes a terminator and flds[f] points
// at the start of column f.
382 ret += "\t\tflds[0] = buf;\n";
383 ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
384 ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
385 ret += "\t\t\tif(pos >= linelen){\n";
386 ret += "\t\t\t\tmalformed = 1;\n";
387 ret += "\t\t\t\tbreak;\n";
389 ret += "\t\t\tbuf[pos]='\\0';\n";
390 ret += "\t\t\tpos++;\n";
391 ret += "\t\t\tflds[f]=buf+pos;\n";
393 ret += "\t\tif(malformed){\n";
394 ret += "\t\t\tok=0;\n";
395 ret += "\t\t\tn_malformed++;\n";
397 ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
398 ret += "\t\t\tok=0;\n";
399 ret += "\t\t\tshort_lines++;\n";
401 ret += "\t\tif(pos && (pos<linelen)){\n";
402 ret += "\t\t\tok=0;\n";
403 ret += "\t\t\ttoolong_lines++;\n";
405 ret += "\t\tif(f>="+std::to_string(fields.size())+"){\n";
406 ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
// One extract call per column, writing into the matching record member.
408 for(int f=0;f<fields.size();++f){
409 data_type dt(fields[f]->get_type());
410 ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
412 // Compute the hash value
413 ret += "\t\t\thash=0;\n";
// For each key, find the field with that name and XOR in its hash scaled by
// the multiplier hash_nums[f%NRANDS].
414 for(int k=0;k<keys.size();++k){
415 string key_fld = keys[k];
417 for(f=0;f<fields.size();++f){
418 if(fields[f]->get_name() == key_fld)
421 data_type dt(fields[f]->get_type());
424 "\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
425 dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
// The bucket comes from the upper 32 bits of the 64-bit hash.
427 ret += "\t\t\tbucket = (hash>>32) % "+tgt+".ht_size;\n";
428 ret += "\t\t\trec->hashval = hash;\n";
429 ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
430 ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
431 ret += "\t\t\t"+tgt+".n_elem++;\n";
435 ret += "\tif(n_malformed+toolong_lines > 0){\n";
436 ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, n_malformed, short_lines, toolong_lines);\n";
// Emits the C struct that holds the runtime state of one LFTA.
// The struct starts with `struct FTA f;` and then, as far as the visible
// lines show, carries:
//   - aggregate bookkeeping (group table, hash map, counters, flush state) and
//     last_gb_<g> / flush_start_gb_<g> / last_flushed_gb_<g> for each temporal,
//     non-buffer group-by attribute;
//   - filter-join state (bloom-filter table or a join_table pointer);
//   - a `time_t ux_time` member for watchlist joins;
//   - one param_<name> per query parameter (plus a search handle when the
//     parameter is accessed by handle), one complex_literal_<n> per complex
//     literal and one handle_param_<n> per pass-by-handle parameter;
//   - last_<field>_<tblref> for columns referenced by temporal select-list
//     expressions;
//   - trace_id and the runtime statistics counters.
// Diagnostics about temporal group-by attributes are written to stderr.
// Fixed: the reference parameter read `¶m_handle_table`, a mis-encoding of
// `&param_handle_table`; the declarator is restored.
// NOTE(review): the embedded numbering skips lines, so the tail of the
// parameter list (lines 451-452, which must declare `schema`), the conditions
// selecting each section, the `ret += tmpstr;` lines after the sprintf calls
// and the return are not visible here.
446 string generate_fta_struct(string node_name, gb_table *gb_tbl,
447 aggregate_table *aggr_tbl, param_table *param_tbl,
448 cplx_lit_table *complex_literals,
449 vector<handle_param_tbl_entry *> &param_handle_table,
450 bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
453 string ret = "struct " + generate_fta_name(node_name) + "{\n";
454 ret += "\tstruct FTA f;\n";
456 //-------------------------------------------------------------
457 // Aggregate-specific fields
461 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
463 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
464 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
465 // ret+="\tint bitmap_size;\n";
466 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
467 ret += "\tgs_int32_t n_ticks; // for limiting slow flush\n";
468 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
469 ret += "\tint max_windows; // max number of open windows.\n";
470 ret += "\tunsigned int generation; // initially zero, increment on\n";
471 ret += "\t // every hash table flush - whether regular or induced.\n";
472 ret += "\t // Old groups are identified by a generation mismatch.\n";
473 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
474 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
// Temporal group-by attributes get three tracking members each; buffer-typed
// ones are reported and skipped.
479 bool uses_temporal_flush = false;
480 for(g=0;g<gb_tbl->size();g++){
481 data_type *dt = gb_tbl->get_data_type(g);
482 if(dt->is_temporal()){
484 fprintf(stderr,"group by attribute %s is temporal, ",
485 gb_tbl->get_name(g).c_str());
486 if(dt->is_increasing()){
487 fprintf(stderr,"increasing.\n");
489 fprintf(stderr,"decreasing.\n");
492 data_type *gdt = gb_tbl->get_data_type(g);
493 if(gdt->is_buffer_type()){
494 fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
496 sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
498 sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
500 sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
502 uses_temporal_flush = true;
508 if(! uses_temporal_flush){
509 fprintf(stderr,"Warning: no temporal flush.\n");
513 // ---------------------------------------------------------
514 // Filter-join specific fields
519 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
520 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
521 "\tint first_exec;\n"
522 "\tlong long int last_bin;\n"
523 "\tint last_bloom_pos;\n"
526 }else{ // limited hash table
528 " struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
535 // --------------------------------------------
536 // watchlist-join specific
538 ret += "\ttime_t ux_time;\n";
541 //--------------------------------------------------------
544 // Create places to hold the parameters.
546 vector<string> param_vec = param_tbl->get_param_names();
547 for(p=0;p<param_vec.size();p++){
548 data_type *dt = param_tbl->get_data_type(param_vec[p]);
549 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
550 param_vec[p].c_str());
552 if(param_tbl->handle_access(param_vec[p])){
553 ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
557 // Create places to hold complex literals.
559 for(cl=0;cl<complex_literals->size();cl++){
560 literal_t *l = complex_literals->get_literal(cl);
// NOTE(review): dtl is heap-allocated and no matching delete appears in the
// visible lines -- confirm against the omitted lines 563-565.
561 data_type *dtl = new data_type( l->get_type() );
562 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
566 // Create places to hold the pass-by-handle parameters.
567 for(p=0;p<param_handle_table.size();++p){
568 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
572 // Create places to hold the last values of temporal
573 // attributes referenced in select clause
574 // we also need to store values of the temporal attributes
575 // of last flushed tuple in aggr queries
576 // to make sure we generate the correct temporal tuple
577 // in the presence of slow flushes
580 col_id_set temp_cids; // col ids of temp attributes in select clause
583 col_id_set::iterator csi;
// Collect the columns referenced by temporal expressions of the file-scope
// sl_list, then declare one last_<field>_<tblref> member per column.
585 for(s=0;s<sl_list.size();s++){
586 data_type *sdt = sl_list[s]->get_data_type();
587 if (sdt->is_temporal()) {
588 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
592 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
593 int tblref = (*csi).tblvar_ref;
594 int schref = (*csi).schema_ref;
595 string field = (*csi).field;
596 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
597 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
601 ret += "\tgs_uint64_t trace_id;\n\n";
603 // Fields to store the runtime stats
605 ret += "\tgs_uint32_t in_tuple_cnt;\n";
606 ret += "\tgs_uint32_t out_tuple_cnt;\n";
607 ret += "\tgs_uint32_t out_tuple_sz;\n";
608 ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
609 ret += "\tgs_uint64_t cycle_cnt;\n";
610 ret += "\tgs_uint32_t collision_cnt;\n";
611 ret += "\tgs_uint32_t eviction_cnt;\n";
612 ret += "\tgs_float_t sampling_rate;\n";
621 //------------------------------------------------------------
622 // Set colref tblvars to 0..
623 // (special processing for join-like operators in an lfta).
// Walks the scalar expression `se` recursively and sets the table-variable
// reference of every column reference it reaches to 0. Group-by references
// are followed into their defining expression through gtbl.
//   se   - expression to rewrite in place.
//   gtbl - group-by table used to resolve group-by references.
// NOTE(review): the embedded numbering skips lines, so the case labels, the
// declaration of o and the closing braces are not visible here.
625 void reset_se_col_ids_tblvars(scalarexp_t *se, gb_table *gtbl){
626 vector<scalarexp_t *> operands;
632 switch(se->get_operator_type()){
638 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
641 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
642 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
646 se->get_colref()->set_tablevar_ref(0);
// NOTE(review): this message names gather_se_col_ids although it is emitted
// from reset_se_col_ids_tblvars.
649 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
652 reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
658 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
// Function-like nodes: recurse into every operand.
661 operands = se->get_operands();
662 for(o=0;o<operands.size();o++){
663 reset_se_col_ids_tblvars(operands[o], gtbl);
667 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
668 se->get_lineno(), se->get_charno(),se->get_operator_type());
674 // reset column tblvars accessed in this pr.
// Walks the predicate `pr` recursively and applies reset_se_col_ids_tblvars to
// every scalar expression it contains (left/right sides and operand lists),
// descending through nested predicates.
//   pr   - predicate to rewrite in place.
//   gtbl - group-by table handed through to reset_se_col_ids_tblvars.
// NOTE(review): the embedded numbering skips lines, so the case labels, the
// declaration of o and the closing braces are not visible here.
676 void reset_pr_col_ids_tblvars(predicate_t *pr, gb_table *gtbl){
677 vector<scalarexp_t *> op_list;
680 switch(pr->get_operator_type()){
682 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
685 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
686 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
689 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
692 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
693 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
696 op_list = pr->get_op_list();
697 for(o=0;o<op_list.size();++o){
698 reset_se_col_ids_tblvars(op_list[o],gtbl) ;
702 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
703 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
710 // Generate code that makes reference
711 // to the tuple, and not to any aggregates.
// Returns the C expression text for the scalar expression `se`.
// Forms visible in this listing:
//   - handle references          -> t->handle_param_<n>
//   - complex literals           -> t->complex_literal_<n>
//   - other literals             -> literal_t::to_C_code("")
//   - column references          -> unpack_var_<field>_<tblvar>
//   - group-by references        -> the code of the defining expression,
//                                   looked up in the file-scope gb_tbl
//   - partial function results   -> partial_fcn_result_<n>
//   - unary / binary operators   -> either the type's complex-operator
//                                   function or the operands' own code
// A function node with get_aggr_ref() >= 0 and an unknown operator type are
// reported on stderr and yield the string "ERROR in generate_se_code".
// Uses the shared tmpstr buffer as scratch space.
// NOTE(review): the embedded numbering skips lines, so the case labels, the
// declaration of ret and o, the `ret += tmpstr;` lines, the punctuation
// emitted between operands and the final return are not visible here.
712 static string generate_se_code(scalarexp_t *se,table_list *schema){
714 data_type *ldt, *rdt;
716 vector<scalarexp_t *> operands;
719 switch(se->get_operator_type()){
721 if(se->is_handle_ref()){
722 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
726 if(se->get_literal()->is_cpx_lit()){
727 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
731 return(se->get_literal()->to_C_code("")); // not complex, no constructor
733 if(se->is_handle_ref()){
734 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
739 ret += se->get_param_name();
// One-operand operator: a type-specific function when the operand type
// defines one for this operator, otherwise the operand's code.
742 ldt = se->get_left_se()->get_data_type();
743 if(ldt->complex_operator(se->get_op()) ){
744 ret += ldt->get_complex_operator(se->get_op());
746 ret += generate_se_code(se->get_left_se(),schema);
751 ret += generate_se_code(se->get_left_se(),schema);
// Two-operand operator: same choice, keyed on both operand types.
756 ldt = se->get_left_se()->get_data_type();
757 rdt = se->get_right_se()->get_data_type();
759 if(ldt->complex_operator(rdt, se->get_op()) ){
760 ret += ldt->get_complex_operator(rdt, se->get_op());
762 ret += generate_se_code(se->get_left_se(),schema);
764 ret += generate_se_code(se->get_right_se(),schema);
768 ret += generate_se_code(se->get_left_se(),schema);
770 ret += generate_se_code(se->get_right_se(),schema);
775 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet unpacked ...
776 // so return the defining code.
777 ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
780 sprintf(tmpstr,"unpack_var_%s_%d",
781 se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
786 // Should not be ref'ing any aggr here.
787 if(se->get_aggr_ref() >= 0){
788 fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
789 return("ERROR in generate_se_code");
792 if(se->is_partial()){
793 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
// Function call: the check below singles out buffer-typed operands that are
// not handle references; the statement it guards (line 801) is not shown.
797 operands = se->get_operands();
798 for(o=0;o<operands.size();o++){
800 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
802 ret += generate_se_code(operands[o], schema);
808 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
809 se->get_lineno(), se->get_charno(),se->get_operator_type());
810 return("ERROR in generate_se_code");
814 // generate code that refers only to aggregate data and constants.
// Returns the C expression text for `se` when the values come from an
// aggregate record rather than from the tuple.
//   se     - expression to render.
//   var    - text prefixed to gb_var<N> / aggr_var<N>; it names the record.
//   schema - passed through to the recursive calls.
// Forms visible in this listing:
//   - handle references / complex literals / literals: as in generate_se_code
//   - group-by references      -> <var>gb_var<N>
//   - built-in aggregates      -> <var>aggr_var<N>
//   - UDAF references          -> udaf_ret<N>
//   - partial function results -> partial_fcn_result_<N>
// A column reference that is not a group-by attribute is reported on stderr.
// NOTE(review): the unknown-operator path returns the string
// "ERROR in generate_se_code", which names a different function.
// NOTE(review): the embedded numbering skips lines, so the case labels, the
// declaration of ret and o, the `ret += tmpstr;` lines and the final return
// are not visible here.
815 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
818 data_type *ldt, *rdt;
820 vector<scalarexp_t *> operands;
823 switch(se->get_operator_type()){
825 if(se->is_handle_ref()){
826 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
830 if(se->get_literal()->is_cpx_lit()){
831 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
835 return(se->get_literal()->to_C_code("")); // not complex no constructor
837 if(se->is_handle_ref()){
838 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
843 ret += se->get_param_name();
846 ldt = se->get_left_se()->get_data_type();
847 if(ldt->complex_operator(se->get_op()) ){
848 ret += ldt->get_complex_operator(se->get_op());
850 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
855 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
860 ldt = se->get_left_se()->get_data_type();
861 rdt = se->get_right_se()->get_data_type();
863 if(ldt->complex_operator(rdt, se->get_op()) ){
864 ret += ldt->get_complex_operator(rdt, se->get_op());
866 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
868 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
872 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
874 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
879 if(se->is_gb()){ // group-by attrs are read from the aggregate
880 // record: emit <var>gb_var<N>.
881 sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
885 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
886 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
887 se->get_lineno(), se->get_charno());
893 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
898 if(se->get_aggr_ref() >= 0){
899 sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
904 if(se->is_partial()){
905 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
909 operands = se->get_operands();
910 for(o=0;o<operands.size();o++){
912 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
914 ret += generate_se_code_fm_aggr(operands[o], var, schema);
920 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
921 se->get_lineno(), se->get_charno(),se->get_operator_type());
922 return("ERROR in generate_se_code");
// Emits the statement "retval = <fcn>( &partial_fcn_result_<pfn_id>, ...)" for
// a partial function whose operands are rendered from an aggregate record
// through generate_se_code_fm_aggr.
//   se     - must be an SE_FUNC node with get_aggr_ref() < 0; anything else is
//            reported on stderr and yields an error string.
//   pfn_id - index used in the partial_fcn_result_<pfn_id> name.
//   var    - record prefix handed to generate_se_code_fm_aggr.
// NOTE(review): the error string returned is "ERROR in generate_se_code",
// which names a different function.
// NOTE(review): the embedded numbering skips lines, so the declaration of ret
// and o, the `ret += tmpstr;`, the separators between operands, the closing
// text and the return are not visible here.
928 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
931 vector<scalarexp_t *> operands;
934 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
935 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
936 se->get_lineno(), se->get_charno());
937 return("ERROR in generate_se_code");
940 ret = "\tretval = " + se->get_op() + "( ";
941 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
944 operands = se->get_operands();
945 for(o=0;o<operands.size();o++){
947 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
949 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// Emits the call expression "<fcn>( ...operands... )" for a function node,
// with each operand rendered from the tuple through generate_se_code.
//   se - must be an SE_FUNC node with get_aggr_ref() < 0; anything else is
//        reported on stderr and yields an error string.
// NOTE(review): the error string returned is "ERROR in generate_se_code",
// which names a different function.
// NOTE(review): the embedded numbering skips lines, so the declaration of ret
// and o, the separators between operands, the closing text and the return are
// not visible here.
956 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
959 vector<scalarexp_t *> operands;
961 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
962 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
963 se->get_lineno(), se->get_charno());
964 return("ERROR in generate_se_code");
967 ret = se->get_op() + "( ";
969 operands = se->get_operands();
970 for(o=0;o<operands.size();o++){
972 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
974 ret += generate_se_code(operands[o], schema);
// Emits the statement "retval = <fcn>( &partial_fcn_result_<pfn_id>, ...)" for
// a partial function whose operands are rendered from the tuple through
// generate_se_code.
//   se     - must be an SE_FUNC node with get_aggr_ref() < 0; anything else is
//            reported on stderr and yields an error string.
//   pfn_id - index used in the partial_fcn_result_<pfn_id> name.
// Fixed: the assignment to ret ended with ',' instead of ';', which chained
// the following sprintf onto it through the comma operator. The result was the
// same, but only by accident.
// NOTE(review): the error string returned is "ERROR in generate_se_code",
// which names a different function.
// NOTE(review): the embedded numbering skips lines, so the declaration of ret
// and o, the `ret += tmpstr;`, the separators between operands, the closing
// text and the return are not visible here.
983 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
986 vector<scalarexp_t *> operands;
989 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
990 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
991 se->get_lineno(), se->get_charno());
992 return("ERROR in generate_se_code");
995 ret = "\tretval = " + se->get_op() + "( ";
996 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
999 operands = se->get_operands();
1000 for(o=0;o<operands.size();o++){
1002 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
1004 ret += generate_se_code(operands[o], schema);
// Translates a query-language comparison operator into its C spelling:
// "=" becomes "==" and "<>" becomes "!=".
// NOTE(review): the handling of every other operator (lines 1018-1019) is not
// visible in this listing.
1015 static string generate_C_comparison_op(string op){
1016 if(op == "=") return("==");
1017 if(op == "<>") return("!=");
// Translates a query-language boolean operator into C. AND, OR and NOT are
// each recognised in three spellings (upper case, capitalised, lower case).
// An unrecognised operator is reported on stderr and returned embedded in an
// "ERROR UNKNOWN BOOLEAN OPERATOR :" string.
// NOTE(review): the values returned for AND / OR / NOT (lines 1023, 1026,
// 1029) are not visible in this listing.
1021 static string generate_C_boolean_op(string op){
1022 if( (op == "AND") || (op == "And") || (op == "and") ){
1025 if( (op == "OR") || (op == "Or") || (op == "or") ){
1028 if( (op == "NOT") || (op == "Not") || (op == "not") ){
1032 fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
1033 return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
// Returns the C boolean expression text for the predicate `pr`, evaluated
// against the tuple (scalar operands are rendered with generate_se_code).
// Forms visible in this listing:
//   - IN-list: one equality test per literal, joined with " || "; types with a
//     complex comparison use their equals function, and buffer-typed operands
//     are passed by address;
//   - comparison: the type's comparison function when the operand types need
//     one, otherwise left <op> right with the operator translated to C;
//   - unary / binary boolean operators over nested predicates;
//   - function predicates: a combinable-predicate reference becomes a bit test
//     on pref_common_pred_val_<class>_<word>; otherwise a call
//     "<fcn>(...)" that receives t->sampling_rate first when the predicate is
//     a sampling function.
// An unknown predicate type is reported on stderr and yields
// "ERROR in generate_predicate_code".
// Fixed:
//   - the combinable-reference guard accepted cref == pred_class.size() and
//     then indexed pred_class[cref] one past the end; it now requires
//     size() > cref;
//   - the bitmask is built with an unsigned shift and explicit parentheses,
//     so bit 31 no longer goes through a signed int.
// NOTE(review): pred_pos[cref] is trusted to be as long as pred_class.
// NOTE(review): the embedded numbering skips lines, so most case labels, the
// declarations of ret, i, o, cref and ppos, the `ret += tmpstr;` lines, the
// emitted punctuation and the final return are not visible here.
1037 static string generate_predicate_code(predicate_t *pr,table_list *schema){
1039 vector<literal_t *> litv;
1041 data_type *ldt, *rdt;
1042 vector<scalarexp_t *> op_list;
1044 unsigned int bitmask;
1046 switch(pr->get_operator_type()){
1048 ldt = pr->get_left_se()->get_data_type();
1051 litv = pr->get_lit_vec();
1052 for(i=0;i<litv.size();i++){
1053 if(i>0) ret += " || ";
1056 if(ldt->complex_comparison(ldt) ){
1057 ret += ldt->get_equals_fcn(ldt) ;
1059 if(ldt->is_buffer_type() ) ret += "&";
1060 ret += generate_se_code(pr->get_left_se(), schema);
1062 if(ldt->is_buffer_type() ) ret += "&";
1063 if(litv[i]->is_cpx_lit()){
1064 sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
1067 ret += litv[i]->to_C_code("");
1071 ret += generate_se_code(pr->get_left_se(), schema);
1073 ret += litv[i]->to_C_code("");
1082 ldt = pr->get_left_se()->get_data_type();
1083 rdt = pr->get_right_se()->get_data_type();
1086 if(ldt->complex_comparison(rdt) ){
1087 // TODO can use get_equals_fcn if op is "=" ?
1088 ret += ldt->get_comparison_fcn(rdt);
1090 if(ldt->is_buffer_type() ) ret += "&";
1091 ret += generate_se_code(pr->get_left_se(),schema);
1093 if(rdt->is_buffer_type() ) ret += "&";
1094 ret += generate_se_code(pr->get_right_se(),schema);
1096 ret += generate_C_comparison_op(pr->get_op());
1099 ret += generate_se_code(pr->get_left_se(),schema);
1100 ret += generate_C_comparison_op(pr->get_op());
1101 ret += generate_se_code(pr->get_right_se(),schema);
1107 ret += generate_C_boolean_op(pr->get_op());
1108 ret += generate_predicate_code(pr->get_left_pr(),schema);
1111 case PRED_BINARY_OP:
1113 ret += generate_predicate_code(pr->get_left_pr(),schema);
1114 ret += generate_C_boolean_op(pr->get_op());
1115 ret += generate_predicate_code(pr->get_right_pr(),schema);
1119 op_list = pr->get_op_list();
1120 cref = pr->get_combinable_ref();
1121 if(cref >= 0){ // predicate is a combinable pred reference
1122 // Trust, but verify
1123 if(pred_class.size() > cref && pred_class[cref] >= 0){
1124 ppos = pred_pos[cref];
// Position ppos selects bit (ppos % 32) of word (ppos / 32).
1125 bitmask = 1u << (ppos % 32);
1126 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
1132 ret = pr->get_op() + "(";
1133 if (pr->is_sampling_fcn) {
1134 ret += "t->sampling_rate";
1135 if (!op_list.empty())
1138 for(o=0;o<op_list.size();++o){
1139 if(o>0) ret += ", ";
1140 if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
1142 ret += generate_se_code(op_list[o],schema);
1147 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
1148 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
1149 return("ERROR in generate_predicate_code");
// Appends to ret the C text of an equality test for operands of type dt.
// Only part of the body is visible in this listing; presumably lhs_op and
// rhs_op are the two operand expressions being compared -- TODO confirm.
// When dt->complex_comparison(dt) is true the type's equals function
// (dt->get_equals_fcn) is emitted instead of a plain operator.
1154 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
1157 if(dt->complex_comparison(dt) ){
1158 ret += dt->get_equals_fcn(dt);
// Buffer-typed operands are prefixed with an address-of operator.
1160 if(dt->is_buffer_type() ) ret += "&";
1163 if(dt->is_buffer_type() ) ret += "&";
1175 //static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
1178 // if(dt->complex_comparison(dt) ){
1179 // ret += dt->get_equals_fcn(dt);
1181 // if(dt->is_buffer_type() ) ret += "&";
1184 // if(dt->is_buffer_type() ) ret += "&";
1196 // Here I assume that only MIN and MAX aggregates can be computed
1197 // over BUFFER data types.
// Returns generated C text that updates aggregate number aidx of atbl,
// stored in the C lvalue named by var, with the value of the current tuple.
//   var    : C expression naming the aggregate storage
//   atbl   : aggregate table describing the aggregate (op, types, operands)
//   aidx   : index of the aggregate within atbl
//   schema : passed through to generate_se_code
// An unrecognized built-in op is reported on stderr and an ERROR string is
// returned instead of code.
1199 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
1200 string retval = "\t\t";
1201 string op = atbl->get_op(aidx);
// Non-built-in aggregate (UDAF): emit a call to <op>_LFTA_AGGR_UPDATE_ on
// the storage followed by the operand expressions.
1204 if(! atbl->is_builtin(aidx)) {
1206 retval += op+"_LFTA_AGGR_UPDATE_(";
// Storage is passed by address unless its storage type is fstring_t.
1207 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1208 retval+="("+var+")";
1209 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1210 for(o=0;o<opl.size();++o){
// Special case: buffer-typed operands that are not handle references.
1212 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1214 retval += generate_se_code(opl[o], schema);
1221 // Built-in aggregate processing.
1223 data_type *dt = atbl->get_data_type(aidx);
// Increment form (presumably COUNT; the op test is not visible here).
1227 retval.append("++;\n");
// Accumulate form (presumably SUM; the op test is not visible here).
1232 retval.append(" += ");
1233 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1234 retval.append(";\n");
// Keep-smaller form: evaluate the aggregated expression into
// aggr_tmp_<aidx>, then replace var when the new value compares lower.
1238 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1239 retval.append(tmpstr);
1240 if(dt->complex_comparison(dt)){
1241 if(dt->is_buffer_type())
1242 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1244 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1246 sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1248 retval.append(tmpstr);
// Buffer types are replaced through the type's buffer-replace function;
// other types by plain assignment.
1249 if(dt->is_buffer_type()){
1250 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1252 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1254 retval.append(tmpstr);
// Keep-larger form: same shape as above with the comparison reversed.
1259 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1260 retval.append(tmpstr);
1261 if(dt->complex_comparison(dt)){
1262 if(dt->is_buffer_type())
1263 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1265 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1267 sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1269 retval.append(tmpstr);
1270 if(dt->is_buffer_type()){
1271 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1273 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1275 retval.append(tmpstr);
// Bitwise aggregates map directly onto the C compound assignments.
1280 if(op == "AND_AGGR"){
1282 retval.append(" &= ");
1283 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1284 retval.append(";\n");
1287 if(op == "OR_AGGR"){
1289 retval.append(" |= ");
1290 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1291 retval.append(";\n");
1294 if(op == "XOR_AGGR"){
1296 retval.append(" ^= ");
1297 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1298 retval.append(";\n");
// No branch matched: report the unknown op and return an error marker.
1301 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1302 return("ERROR: aggregate not recognized: "+op);
// Returns generated C text that initializes aggregate number aidx of atbl,
// stored in the C lvalue named by var, from the first tuple of a group.
//   var    : C expression naming the aggregate storage
//   atbl   : aggregate table describing the aggregate
//   aidx   : index of the aggregate within atbl
//   schema : passed through to generate_se_code
// An unrecognized built-in op is reported on stderr and an ERROR string is
// returned instead of code.
1308 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1310 string op = atbl->get_op(aidx);
1313 if(! atbl->is_builtin(aidx)) {
// UDAF: emit <op>_LFTA_AGGR_INIT_ on the storage, then an
// <op>_LFTA_AGGR_UPDATE_ call with the operand expressions.
1315 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
// Storage is passed by address unless its storage type is fstring_t.
1316 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1317 retval+="("+var+"));\n";
1319 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1320 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1321 retval+="("+var+")";
1322 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1323 for(o=0;o<opl.size();++o){
// Special case: buffer-typed operands that are not handle references.
1325 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1327 retval += generate_se_code(opl[o],schema);
1333 // Built-in aggregate processing.
1336 data_type *dt = atbl->get_data_type(aidx);
// Initialize-to-one form (presumably COUNT; the op test is not visible here).
1339 retval = "\t\t"+var;
1340 retval.append(" = 1;\n");
// All remaining built-ins start from the value of the aggregated expression.
1344 if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1345 op == "OR_AGGR" || op == "XOR_AGGR"){
// Buffer types go through aggr_tmp_<aidx> and the type's assign-copy
// function; other types are assigned directly.
1346 if(dt->is_buffer_type()){
1347 sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1348 retval.append(tmpstr);
1349 sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1350 retval.append(tmpstr);
1352 retval = "\t\t"+var;
1354 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1355 retval.append(";\n");
// No branch matched: report the unknown op and return an error marker.
1360 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1361 return("ERROR: aggregate not recognized: "+op);
1365 ////////////////////////////////////////////////////////////
// Builds the text placed at the top of a generated LFTA source file.
//   schema           : not referenced in the lines visible here
//   node_name        : used to derive the name of the schema string variable
//   schema_embed_str : C string literal text assigned to that variable
// The visible part opens an LFTA_IN_NIC guard, defines the schema string and
// emits a set of include directives.
1368 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1369 std::string &node_name, std::string &schema_embed_str){
1370 // Include these only once, not once per lfta
1371 // string ret = "#include \"rts.h\"\n";
1372 // ret += "#include \"fta.h\"\n\n");
1374 string ret = "#ifndef LFTA_IN_NIC\n";
1375 ret += "const char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1376 ret += "#include<stdio.h>\n";
1377 ret += "#include <limits.h>\n";
1378 ret += "#include <float.h>\n";
1379 ret += "#include <sys/stat.h>\n";
1380 ret += "#include \"rdtsc.h\"\n";
// Returns generated C text that turns the aggregate slot t->aggr_table[idx]
// into an output tuple and then releases the slot.
//   node_name : used to derive the output tuple struct name
//   schema    : passed through to the scalar-expression generators
//   idx       : C expression text indexing t->aggr_table
// The emitted code: checks UDAF bailout, unpacks UDAF outputs and partial
// functions, sizes and allocates the tuple, fills and posts it, destroys
// buffer-typed group-by and aggregate storage, and decrements t->n_aggrs.
// Reads aggr_tbl, gb_tbl, sl_list, partial_fcns, is_partial_fcn and the
// sl_fcns_start/sl_fcns_end range, none of which are declared in the lines
// visible here.
1389 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1391 // need to create and output the tuple.
1392 string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1393 // Check for any UDAFs with LFTA_BAILOUT
1394 ret += "\tlfta_bailout = 0;\n";
1395 for(a=0;a<aggr_tbl->size();a++){
1396 if(aggr_tbl->has_bailout(a)){
1397 ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
// Storage is passed by address unless its storage type is fstring_t.
1398 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1399 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1402 ret += "\tif(! lfta_bailout){\n";
1404 // First, compute the size of the tuple.
1406 // Unpack UDAF return values
1407 for(a=0;a<aggr_tbl->size();a++){
1408 if(! aggr_tbl->is_builtin(a)){
1409 ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1410 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1411 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1417 // Unpack partial fcns ref'd by the select clause.
1418 if(sl_fcns_start != sl_fcns_end){
1419 ret += "\t\tunpack_failed = 0;\n";
1420 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1421 if(is_partial_fcn[p]){
1422 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1423 "t->aggr_table["+idx+"].",schema);
1424 ret += "\t\tif(retval) unpack_failed = 1;\n";
1427 // BEGIN don't allocate tuple if
1428 ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1431 // Unpack any BUFFER type selections into temporaries
1432 // so that I can compute their size and not have
1433 // to recompute their value during tuple packing.
1434 // I can use regular assignment here because
1435 // these temporaries are non-persistent.
1437 for(s=0;s<sl_list.size();s++){
1438 data_type *sdt = sl_list[s]->get_data_type();
1439 if(sdt->is_buffer_type()){
1440 sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1442 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1448 // The size of the tuple is the size of the tuple struct plus the
1449 // size of the buffers to be copied in.
1451 ret += "\t\t\ttuple_size = sizeof( struct ";
1452 ret += generate_tuple_name(node_name);
1454 for(s=0;s<sl_list.size();s++){
1455 data_type *sdt = sl_list[s]->get_data_type();
1456 if(sdt->is_buffer_type()){
1457 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1464 ret += "\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
1465 ret += "\t\t\tif( tuple != NULL){\n";
1468 // Test passed, make assignments to the tuple.
1470 ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1471 ret += generate_tuple_name(node_name) ;
1474 // Mark tuple as REGULAR_TUPLE
1475 ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
// Buffer-typed fields are copied into the tuple tail and tuple_pos is
// advanced by their size; other fields are assigned directly.
1477 for(s=0;s<sl_list.size();s++){
1478 data_type *sdt = sl_list[s]->get_data_type();
1479 if(sdt->is_buffer_type()){
1480 sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1482 sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1485 sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1487 // if(sdt->needs_hn_translation())
1488 // ret += sdt->hton_translation() +"( ";
1489 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1490 // if(sdt->needs_hn_translation())
1497 ret += "\t\t\t\tpost_tuple(tuple);\n";
1498 ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1499 ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1500 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1501 ret += "\t\t\t\t#endif\n\n";
1504 if(sl_fcns_start != sl_fcns_end) // END don't allocate tuple if
1505 ret += "\t\t}\n"; // unpack failed.
1508 // Need to release memory held by BUFFER types.
1511 for(g=0;g<gb_tbl->size();g++){
1512 data_type *gdt = gb_tbl->get_data_type(g);
1513 if(gdt->is_buffer_type()){
1514 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
// Built-in aggregates of buffer type use the type's destroy function;
// the DESTROY call below is emitted for UDAFs (branch structure is only
// partly visible here -- TODO confirm).
1518 for(a=0;a<aggr_tbl->size();a++){
1519 if(aggr_tbl->is_builtin(a)){
1520 data_type *adt = aggr_tbl->get_data_type(a);
1521 if(adt->is_buffer_type()){
1522 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
1526 ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1527 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1528 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1532 ret += "\t\tt->n_aggrs--;\n";
// Returns generated C text for the condition that tests whether aggregate
// slot idx holds the group of the current tuple.
//   idx : C expression text indexing the aggregate table
// The condition starts with IS_FILLED and IS_NEW on t->aggr_table_bitmap and,
// when there are group-by variables, adds one equality test per variable
// between gb_attr_<g> and t->aggr_table[idx].gb_var<g>.
1538 string generate_gb_match_test(string idx){
1540 string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") && IS_NEW(t->aggr_table_bitmap,"+idx+")";
1541 if(gb_tbl->size()>0){
1542 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1545 // Next, scan list for a match on the group-by attributes.
1546 string rhs_op, lhs_op;
1547 for(g=0;g<gb_tbl->size();g++){
// lhs is the value computed from the tuple, rhs the value stored in the slot.
1550 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1551 sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1552 ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
// Returns generated C text that updates an existing group in slot idx.
//   node_name : used in the name of the emitted flush function call
//   schema    : passed through to the aggregate and tuple generators
//   idx       : C expression text indexing t->aggr_table
//   has_udaf  : by-value flag; set to true below when any aggregate is
//               not built-in
// The emitted code updates each aggregate in place, destroys the copied
// buffer-typed gb_attr_<g> temporaries, and, for UDAFs, tests the
// <op>_LFTA_AGGR_FLUSHME_ predicates and emits a flush of the slot.
1562 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1566 ret += "/*\t\tMatch found : update in place.\t*/\n";
1569 for(a=0;a<aggr_tbl->size();a++){
1570 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1571 ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1572 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1575 // garbage collect copied buffer type gb attrs.
1576 for(g=0;g<gb_tbl->size();g++){
1577 data_type *gdt = gb_tbl->get_data_type(g);
1578 if(gdt->is_buffer_type()){
1579 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
// The FLUSHME calls of all UDAFs are joined with a logical or.
1586 bool first_udaf = true;
1589 for(a=0;a<aggr_tbl->size();a++){
1590 if(! aggr_tbl->is_builtin(a)){
1591 if(! first_udaf)ret += " || ";
1592 else first_udaf = false;
1593 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
// Storage is passed by address unless its storage type is fstring_t.
1594 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1595 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1599 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1600 ret += generate_tuple_from_aggr(node_name,schema,idx);
// Clear the filled flag of the slot that was just emitted.
1601 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
// Returns generated C text that starts a new group in aggregate slot idx.
//   schema : passed through to generate_aggr_init
//   idx    : C expression text indexing the aggregate table
// The emitted code marks the hashmap entry as filled for the current
// generation, copies each gb_attr_<g> into the slot, initializes every
// aggregate, and increments t->n_aggrs.
1608 string generate_init_group( table_list *schema, string idx){
1610 string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1611 // Fill up the aggregate block.
1612 for(g=0;g<gb_tbl->size();g++){
1613 sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1616 for(a=0;a<aggr_tbl->size();a++){
1617 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1618 ret += generate_aggr_init(tmpstr, aggr_tbl,a, schema);
1620 ret+="\t\tt->n_aggrs++;\n";
// Returns the generated C definition of fta_aggr_flush_old_<node_name>,
// the function that evicts aggregate groups left over from an earlier
// generation or temporal epoch.
//   node_name : suffix of the emitted function and source of struct names
//   schema    : passed through to generate_tuple_from_aggr
//   Ext_fcns  : used to look up the return data type of each UDAF
// The emitted function declares its temporaries, scans the aggregate table
// from t->flush_pos for at most nflush evictions, emits a tuple for each
// stale slot, and records the flush position when the scan ends.
1625 string generate_fta_flush(string node_name, table_list *schema,
1626 ext_fcn_list *Ext_fcns){
1629 string select_var_defs ;
1632 // Flush from previous epoch
1634 ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1636 ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1637 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1638 ret += "\tint i, lfta_bailout;\n";
1639 ret += "\tunsigned int gen_val;\n";
1641 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1642 ret += generate_fta_name(node_name)+" *) f;\n";
1647 // Variables needed to store selected attributes of BUFFER type
1648 // temporarily, in order to compute their size for storage
1649 // in an output tuple.
1651 select_var_defs = "";
1652 for(s=0;s<sl_list.size();s++){
1653 data_type *sdt = sl_list[s]->get_data_type();
1654 if(sdt->is_buffer_type()){
1655 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1656 select_var_defs.append(tmpstr);
1659 if(select_var_defs != ""){
1660 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1661 ret += select_var_defs;
1665 // Variables to store results of partial functions.
1666 if(sl_fcns_start != sl_fcns_end){
1667 ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1668 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1669 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1670 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
// Only declarations are emitted here, each terminated by a newline, so the
// UDAF temporaries emitted below still sit in the declaration section.
1673 ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n";
1676 // Variables for udaf output temporaries
1677 bool no_udaf = true;
1679 for(a=0;a<aggr_tbl->size();a++){
1680 if(! aggr_tbl->is_builtin(a)){
1682 ret+="/*\t\tUDAF output vars.\t*/\n";
1685 int afcn_id = aggr_tbl->get_fcn_id(a);
1686 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1687 sprintf(tmpstr,"udaf_ret%d", a);
1688 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1693 // ret+="\tt->flush_finished=1; /* flush will be completed */\n";
// A slot is stale when it is filled and either its generation bits differ
// from the current generation or a temporal group-by value is older than
// the last seen one.
1695 ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1696 ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1697 ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1700 for(g=0;g<gb_tbl->size();g++){
1701 data_type *gdt = gb_tbl->get_data_type(g);
1702 if(gdt->is_temporal()){
1703 if(first_g) first_g=false; else ret+=" || ";
1704 ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1708 ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1710 "#ifdef LFTA_STATS\n"
1711 "\t\t\tt->eviction_cnt++;\n"
1716 ret+=generate_tuple_from_aggr(node_name,schema,"i");
1718 // ret+="\t\t\tt->n_aggrs--;\n"; // done in generate_tuple_from_aggr
1719 ret+="\t\t\tnflush--;\n";
// Remember where the scan stopped; an empty table counts as fully flushed.
1722 ret+="\tt->flush_pos=i;\n";
1723 ret+="\tif(t->n_aggrs == 0) {\n";
1724 ret+="\t\tt->flush_pos = t->max_aggrs;\n";
// Once the whole table has been scanned, publish the flushed epoch for each
// non-buffer temporal group-by variable.
1727 ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1729 for(int g=0;g<gb_tbl->size();g++){
1730 data_type *dt = gb_tbl->get_data_type(g);
1731 if(dt->is_temporal()){
1732 data_type *gdt = gb_tbl->get_data_type(g);
1733 if(!gdt->is_buffer_type()){
1734 sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1739 ret += "\t}\n}\n\n";
1744 // TODO Remove sprintf to perform string catenation
// Returns the generated C definition of load_params_<node_name>, which
// unpacks a parameter block (value, sz bytes) into the t->param_<name>
// fields of the FTA struct.
//   node_name : suffix of the emitted function and source of the struct name
// The emitted function returns 1 when the block is too small and 0 on
// success. It also emits registration of the pass-by-handle parameters,
// de-registering the previous handles when initial_call is false.
// Reads param_tbl and param_handle_table, which are not declared in the
// lines visible here.
1745 string generate_fta_load_params(string node_name){
1747 vector<string> param_names = param_tbl->get_param_names();
1749 string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1750 ret += " *t, int sz, void *value, int initial_call){\n";
1751 ret += "\tint pos=0;\n";
1752 ret += "\tint data_pos;\n";
// Declare the temporaries needed to unpack buffer-typed parameters.
1754 for(p=0;p<param_names.size();p++){
1755 data_type *dt = param_tbl->get_data_type(param_names[p]);
1756 if(dt->is_buffer_type()){
1757 sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1759 sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
// data_pos is emitted as a sum with one term per parameter, built from the
// tuple C type of each; it is then checked against sz.
1766 ret += "\n\tdata_pos = ";
1767 for(p=0;p<param_names.size();p++){
1768 if(p>0) ret += " + ";
1769 data_type *dt = param_tbl->get_data_type(param_names[p]);
1771 ret += dt->get_tuple_cvar_type();
1775 ret += "\tif(data_pos > sz) return 1;\n\n";
1778 for(p=0;p<param_names.size();p++){
1779 data_type *dt = param_tbl->get_data_type(param_names[p]);
1780 if(dt->is_buffer_type()){
1781 sprintf(tmpstr,"\taccess_var_%s = *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1783 switch( dt->get_type() ){
1785 // ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n"; // ntoh conversion
1786 // ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n"; // ntoh conversion
// Reject a buffer whose offset plus length runs past the end of the block.
// NOTE(review): the emitted sum is not guarded against overflow -- confirm
// the field types before relying on this check for untrusted input.
1787 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1789 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1791 sprintf(tmpstr,"\ttmp_var_%s.length = access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1795 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1799 // First, destroy the old
1800 ret += "\tif(! initial_call)\n";
1801 sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1803 // Next, create the new.
1804 sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1807 // if(dt->needs_hn_translation()){
1808 // sprintf(tmpstr,"\tt->param_%s = %s( *( (%s *)( (char *)value+pos) ) );\n",
1809 // param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
// Non-buffer parameters are read directly from the block at offset pos.
1811 sprintf(tmpstr,"\tt->param_%s = *( (%s *)( (char *)value+pos) );\n",
1812 param_names[p].c_str(), dt->get_cvar_type().c_str() );
1816 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1820 // Register the pass-by-handle parameters
1822 ret += "/* register and de-register the pass-by-handle parameters */\n";
1825 for(ph=0;ph<param_handle_table.size();++ph){
1826 data_type pdt(param_handle_table[ph]->type_name);
1827 switch(param_handle_table[ph]->val_type){
// On a reload the previous handle is released before a new one is registered.
1833 ret += "\tif(! initial_call)\n";
1834 sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1835 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1837 sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
// Buffer-typed parameters are passed to the registration function by address.
1840 if(pdt.is_buffer_type()) ret += "&(";
1841 ret += "t->param_"+param_handle_table[ph]->param_name;
1842 if(pdt.is_buffer_type()) ret += ")";
1846 sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1847 fprintf(stderr,"%s\n",tmpstr);
1852 ret+="\treturn 0;\n";
// Returns the generated C definition of free_fta_<node_name>.
//   node_name     : suffix of the emitted function and source of struct names
//   is_aggr_query : presumably guards the flush section below; the test
//                   itself is not visible in this listing -- TODO confirm
// The emitted function flushes old groups, advances the generation so that
// all remaining groups become old, flushes again from position 0,
// de-registers every pass-by-handle parameter, and returns 0.
1861 string generate_fta_free(string node_name, bool is_aggr_query){
1863 string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1864 ret+= "\tstruct "+generate_fta_name(node_name)+
1865 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1866 ret += "\tint i;\n";
1869 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1870 ret+="\t/* \t\tmark all groups as old */\n";
1871 ret+="\tt->generation++;\n";
1872 ret+="\tt->flush_pos = 0;\n";
1873 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1876 // Deregister the pass-by-handle parameters
1877 ret += "/* de-register the pass-by-handle parameters */\n";
1879 for(ph=0;ph<param_handle_table.size();++ph){
1880 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1881 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1886 ret += "\treturn 0;\n}\n\n";
// Returns the generated C definition of control_fta_<node_name>, the
// command handler of the FTA.
//   node_name     : suffix of the emitted function and source of struct names
//   schema        : not referenced in the lines visible here
//   is_aggr_query : presumably guards the aggregate flush sections; the
//                   tests are not visible in this listing -- TODO confirm
// Commands handled by the emitted code:
//   FTA_COMMAND_FLUSH             : post an empty tuple when no groups exist,
//                                   otherwise flush all groups
//   FTA_COMMAND_LOAD_PARAMS       : emitted only when param_tbl is non-empty
//   FTA_COMMAND_SET_SAMPLING_RATE : copy a gs_float_t into t->sampling_rate
//   FTA_COMMAND_FILE_DONE         : flush all groups and post an EOF_TUPLE
1891 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1892 string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value){\n";
1893 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1894 ret += generate_fta_name(node_name)+" *) f;\n\n";
1898 ret += "\t/* temp status tuple */\n";
1899 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1900 ret += "\tgs_int32_t tuple_size;\n";
1904 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1906 ret+="\t\tif (!t->n_aggrs) {\n";
1907 ret+="\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, 0);\n";
1908 ret+="\t\t\tif( tuple != NULL)\n";
1909 ret+="\t\t\t\tpost_tuple(tuple);\n";
1911 ret+="\t\t}else{\n";
// Flush, advance the generation so every group becomes old, then flush
// again from the start of the table.
1913 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1914 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1915 ret +="\t\tt->generation++;\n";
1916 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1917 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1918 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1919 ret+="\t\t\tt->flush_pos = 0;\n";
1920 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1925 if(param_tbl->size() > 0){
1927 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1928 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1929 "#ifndef LFTA_IN_NIC\n"
1930 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1937 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1938 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1942 ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
// Same two-pass flush as for FTA_COMMAND_FLUSH, done only when groups exist.
1945 ret+="\t\tif (t->n_aggrs) {\n";
1946 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1947 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1948 ret +="\t\tt->generation++;\n";
1949 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1950 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1951 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1952 ret+="\t\t\tt->flush_pos = 0;\n";
1953 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
// The EOF tuple carries only the tuple struct; allocation failure makes the
// emitted handler return 1.
1957 ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1958 ret += "\t\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
1959 ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1961 /* mark tuple as EOF_TUPLE */
1962 ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1963 ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1964 ret += "\t\tpost_tuple(tuple);\n";
1967 ret += "\treturn 0;\n}\n\n";
// Returns the generated C definition of clock_fta_<node_name>, the periodic
// clock handler of the FTA.
//   node_name      : suffix of the emitted function and source of struct names
//   schema         : used to look up the types of temporal fields and passed
//                    to generate_se_code
//   time_corr      : number of seconds subtracted from the wall clock when a
//                    time field is advanced from the system time
//   is_aggr_query  : when true, group-by temporaries and the epoch-change
//                    flush logic are emitted
//   advance_uxtime : presumably guards the t->ux_time update; the test is
//                    not visible in this listing -- TODO confirm
// The emitted function advances the last seen temporal values, flushes
// aggregates when the temporal group-by values changed, posts a
// TEMPORAL_TUPLE carrying the trace id and runtime statistics, resets the
// statistics counters, and returns 0 (or 1 when tuple allocation fails).
1972 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
1973 string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1974 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1975 ret += generate_fta_name(node_name)+" *) f;\n\n";
1977 ret += "\t/* Create a temp status tuple */\n";
1978 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1979 ret += "\tgs_int32_t tuple_size;\n";
1980 ret += "\tunsigned int i;\n";
1981 ret += "\ttime_t cur_time;\n";
1982 ret += "\tint time_advanced;\n";
1983 ret += "\tstruct fta_stat stats;\n";
1987 /* copy the last seen values of temporal attributes */
1988 col_id_set temp_cids; // col ids of temp attributes in select clause
1991 /* HACK: in order to reuse the SE generation code, we need to copy
1992 * the last values of the temp attributes into new variables
1993 * which have names unpack_var_XXX_XXX
1997 col_id_set::iterator csi;
1999 for(s=0;s<sl_list.size();s++){
2000 data_type *sdt = sl_list[s]->get_data_type();
2001 if (sdt->is_temporal()) {
2002 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2006 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2007 int tblref = (*csi).tblvar_ref;
2008 int schref = (*csi).schema_ref;
2009 string field = (*csi).field;
2010 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2011 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
2015 if (is_aggr_query) {
2016 for(g=0;g<gb_tbl->size();g++){
2017 data_type *gdt = gb_tbl->get_data_type(g);
2018 if(gdt->is_temporal()){
2019 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2021 data_type *gdt = gb_tbl->get_data_type(g);
2022 if(gdt->is_buffer_type()){
2023 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2031 ret += "\ttime_advanced = 0;\n";
2033 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2034 int tblref = (*csi).tblvar_ref;
2035 int schref = (*csi).schema_ref;
2036 string field = (*csi).field;
2037 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2039 // update last seen value with the value seen
2040 ret += "\t#ifdef PREFILTER_DEFINED\n";
2041 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
2042 field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
2044 ret += "\t\ttime_advanced = 1;\n\t}\n";
2045 ret += "\t#endif\n";
2047 // we need to pay special attention to time fields
2048 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
2049 ret += "\tcur_time = time(&cur_time);\n";
2051 if (field == "time") {
2052 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
2055 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
2056 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2057 } else if (field == "timestamp_ms") {
2058 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
2061 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
2062 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2064 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
2065 field.c_str(), tblref, time_corr);
2067 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
2068 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2072 ret += "\t\ttime_advanced = 1;\n";
2075 sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
2076 field.c_str(), tblref, field.c_str(), tblref);
2079 sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
2080 field.c_str(), tblref, field.c_str(), tblref);
2087 ret += "\tt->ux_time = time(&(t->ux_time));\n";
2090 // for aggregation lftas we need to check if the time was advanced beyond the current epoch
2091 if (is_aggr_query) {
2094 bool first_one = true;
2095 for(g=0;g<gb_tbl->size();g++){
2096 data_type *gdt = gb_tbl->get_data_type(g);
2097 if(gdt->is_temporal()){
2098 // To perform the test, first need to compute the value
2099 // of the temporal gb attrs.
2100 if(gdt->is_buffer_type()){
2101 // NOTE : if the SE defining the gb is anything
2102 // other than a ref to a variable, this will generate
2103 // illegal code. To be resolved with Spatch.
2104 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2105 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2107 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2108 gdt->get_buffer_assign_copy().c_str(), g, g);
2110 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2114 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2115 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2116 if(first_one){first_one = false;} else {change_test.append(") && (");}
2117 change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
2121 ret += "\n\tif( time_advanced && !( (";
2125 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2126 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
2127 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2129 ret += "\t\t/* \t\tmark all groups as old */\n";
2130 ret +="\t\tt->generation++;\n";
2131 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
2132 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
2133 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
2134 ret += "\t\tt->flush_pos = 0;\n";
2136 for(g=0;g<gb_tbl->size();g++){
2137 data_type *gdt = gb_tbl->get_data_type(g);
2138 if(gdt->is_temporal()){
2139 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
2140 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
2143 ret += "\t\tt->n_ticks = 0; // reset clock tick counter, limit slow flush\n";
2144 ret += "\t}else{\n";
2145 ret += "//\tLimit slow flush, do a full flush at two clock ticks past the change in generation.\n";
2146 ret += "\t\tt->n_ticks++;\n";
2147 ret += "\t\tif(t->n_ticks == 2){\n";
2148 ret += "\t\t\tif(t->flush_pos<t->max_aggrs) \n";
2149 ret += "\t\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
// The status tuple is the tuple struct followed by the trace id and a
// struct fta_stat; the memcpy offsets emitted below rely on this layout.
2156 ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
2157 ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+"*)allocate_tuple(f, tuple_size );\n";
2158 ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
2161 for(s=0;s<sl_list.size();s++){
2162 data_type *sdt = sl_list[s]->get_data_type();
2163 if(sdt->is_temporal()){
2165 if (sl_list[s]->is_gb()) {
2166 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
2170 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2172 // if(sdt->needs_hn_translation())
2173 // ret += sdt->hton_translation() +"( ";
2174 if (sl_list[s]->is_gb()) {
2175 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
2178 ret += generate_se_code(sl_list[s],schema);
2180 // if(sdt->needs_hn_translation())
2186 /* mark tuple as temporal */
2187 ret += "\n\t/* Mark tuple as temporal */\n";
2188 ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
2190 ret += "\n\t/* Copy trace id */\n";
2191 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
2193 ret += "\n\t/* Populate runtime stats */\n";
2194 ret += "\tstats.ftaid = f->ftaid;\n";
2195 ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
2196 ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
2197 ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
2198 ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
2199 ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
2200 ret += "\tstats.collision_cnt = t->collision_cnt;\n";
2201 ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
2202 ret += "\tstats.sampling_rate = t->sampling_rate;\n";
2204 ret += "\n#ifdef LFTA_PROFILE\n";
2205 ret += "\n\t/* Print stats */\n";
2206 ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
2207 ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
2208 ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
2209 ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
2210 ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
2211 ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
2212 ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
2213 ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
2214 ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
2215 ret += "\n#endif\n";
2218 ret += "\n\t/* Copy stats */\n";
// Size the copy with the struct tag, matching the stats declaration and the
// tuple_size computation emitted above.
2219 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(struct fta_stat));\n\n";
2220 ret+="\tpost_tuple(tuple);\n";
2222 ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2223 ret += "\n\t/* Disable heartbeats for now to avoid overloading clearinghouse */\n";
2224 ret += "\t/* fta_heartbeat(f->ftaid, t->trace_id++, 1, &stats); */\n";
2226 ret += "\n\t/* Reset runtime stats */\n";
2227 ret += "\tt->in_tuple_cnt = 0;\n";
2228 ret += "\tt->out_tuple_cnt = 0;\n";
2229 ret += "\tt->out_tuple_sz = 0;\n";
2230 ret += "\tt->accepted_tuple_cnt = 0;\n";
2231 ret += "\tt->cycle_cnt = 0;\n";
2232 ret += "\tt->collision_cnt = 0;\n";
2233 ret += "\tt->eviction_cnt = 0;\n";
2235 ret += "\treturn 0;\n}\n\n";
2241 // accept processing before the where clause,
2242 // do flush processing.
//
// Builds, as a string of generated source text, the flush-related code
// that runs before the WHERE-clause test of an aggregation query:
//   1. a "slow flush" call to fta_aggr_flush_old_<node_name>(f,1), rate
//      limited by the query define "slow_flush";
//   2. code that computes every temporal group-by attribute and flushes
//      all aggregates when any of them differs from the saved value
//      (this text is accumulated in temporal_flush);
//   3. unpack code for the temporal attributes referenced in the select
//      clause, saving each value in t->last_<field>_<tblref>.
//
// Parameters:
//   fs             - query plan node; queried here for the defines
//                    "slow_flush" and "real_time".
//   node_name      - spliced into the generated flush function name and
//                    passed to generate_unpack_code().
//   schema         - passed to the code generators and used to look up the
//                    accessor function of each unpacked field.
//   unpacked_cids  - in/out: column ids that already have unpack code;
//                    every column unpacked here is inserted.
//   temporal_flush - out: reset to "" and filled with the temporal flush
//                    test. It is appended to the returned text only when the
//                    "real_time" define is non-empty.
//
// NOTE(review): gb_tbl, sl_list, tmpstr, change_test, g and s are not declared
// in the lines visible here; presumably they are file-level state or declared
// on lines outside this view. The return statement is also not visible;
// presumably the function returns ret.
2243 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema, col_id_set &unpacked_cids, string &temporal_flush){
2247 string ret="\n/*\tslow flush\t*/\n";
2248 string slow_flush_str = fs->get_val_of_def("slow_flush");
2249 int n_slow_flush = atoi(slow_flush_str.c_str());
// A missing, non-numeric or non-positive "slow_flush" value falls back to 2.
2250 if(n_slow_flush <= 0) n_slow_flush = 2;
// For n_slow_flush > 1 the emitted code counts executions in t->flush_ctr
// and calls the flush routine only once every n_slow_flush executions.
2251 if(n_slow_flush > 1){
2252 ret += "\tt->flush_ctr++;\n";
2253 ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2254 ret += "\t\tt->flush_ctr = 0;\n";
2255 ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
// NOTE(review): presumably the n_slow_flush == 1 path (the enclosing else is
// not visible here): the flush routine is called on every execution.
2258 ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
// first_one stays true until the first temporal group-by attribute is seen;
// it is also used to decide where to place the ") && (" separators.
2263 bool first_one = true;
2265 col_id_set flush_cids; // col ids accessed when computing flush variables.
2266 // unpack them at temporal flush test time.
2267 temporal_flush = "";
2270 for(g=0;g<gb_tbl->size();g++){
2271 data_type *gdt = gb_tbl->get_data_type(g);
2272 if(gdt->is_temporal()){
2273 gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2275 // To perform the test, first need to compute the value
2276 // of the temporal gb attrs.
2277 if(gdt->is_buffer_type()){
2278 // NOTE : if the SE defining the gb is anything
2279 // other than a ref to a variable, this will generate
2280 // illegal code. To be resolved with Spatch.
2281 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2282 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2283 temporal_flush += tmpstr;
2284 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2285 gdt->get_buffer_assign_copy().c_str(), g, g);
2287 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2289 temporal_flush += tmpstr;
2290 // END computing the value of the temporal GB attr.
// Add "saved value equals current value" for this attribute to change_test.
2293 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2294 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2295 if(first_one){first_one = false;} else {change_test.append(") && (");}
2296 change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2299 if(!first_one){ // will be false iff. there is a temporal GB attribute
2300 temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
// The emitted test is negated: the flush body runs when at least one
// temporal group-by attribute no longer equals its saved value.
2301 temporal_flush += "\tif( !( (";
2302 temporal_flush += change_test;
2303 temporal_flush += ") ) ){\n";
2305 // temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2306 temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2307 temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2308 temporal_flush+="\t\t}\n";
2309 temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2310 temporal_flush+="\t\tt->generation++;\n";
2311 temporal_flush+="\t\tt->flush_pos = 0;\n";
2312 temporal_flush+="\t\tt->n_ticks = 0; // reset clock tick counter, to limit slow flush\n";
2315 // Now set the saved temporal value of the gb to the
2316 // current value of the gb. Only for simple types,
2317 // not for buffer types -- but the strings are not
2318 // temporal in any case.
2320 for(g=0;g<gb_tbl->size();g++){
2321 data_type *gdt = gb_tbl->get_data_type(g);
2322 if(gdt->is_temporal()){
2323 if(gdt->is_buffer_type()){
2325 fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2327 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2328 temporal_flush += tmpstr;
2329 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2330 temporal_flush += tmpstr;
2334 temporal_flush += "\t}\n\n";
2337 // Unpack all the temporal attributes referenced in select clause
2338 // and update the last value of the attribute
2339 col_id_set temp_cids; // col ids of temp attributes in select clause
2340 col_id_set::iterator csi;
2342 for(s=0;s<sl_list.size();s++){
2343 data_type *sdt = sl_list[s]->get_data_type();
2344 if (sdt->is_temporal()) {
2345 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Emit unpack code only for columns that have not been unpacked yet; a
// non-zero retval in the generated code makes it return 1.
2349 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2350 if(unpacked_cids.count((*csi)) == 0){
2351 int tblref = (*csi).tblvar_ref;
2352 int schref = (*csi).schema_ref;
2353 string field = (*csi).field;
2354 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2356 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2357 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2358 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2360 ret += "\tif(retval) return 1;\n";
2362 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2365 unpacked_cids.insert( (*csi) );
2370 // Do the flush here if this is a real_time query
2371 string rt_level = fs->get_val_of_def("real_time");
2372 if(rt_level != "" && temporal_flush != ""){
// The flush test reads the columns in flush_cids, so they must be unpacked
// before temporal_flush is appended.
2373 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2374 if(unpacked_cids.count((*csi)) == 0){
2375 int tblref = (*csi).tblvar_ref;
2376 int schref = (*csi).schema_ref;
2377 string field = (*csi).field;
2378 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2380 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2381 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2383 ret += "\tif(retval) return 1;\n";
2385 unpacked_cids.insert( (*csi) );
2388 ret += temporal_flush;
// Builds, as a string of generated source text, the code that creates and
// posts the output tuple of a filter-only (selection) query. The emitted
// code, in order:
//   - evaluates the partial / cached functions referenced by the select
//     clause (indices sl_fcns_start .. sl_fcns_end-1), jumping to the label
//     "end" when a partial function reports failure through retval;
//   - increments t->accepted_tuple_cnt (only under LFTA_STATS);
//   - copies every buffer-typed select expression into selvar_<s>;
//   - computes tuple_size as the tuple struct size plus the buffer sizes,
//     allocates the tuple and jumps to "end" if allocation returned NULL;
//   - assigns every select expression to tuple->tuple_var<s>, using the
//     type's tuple-copy function and advancing tuple_pos for buffer types;
//   - posts the tuple and updates t->out_tuple_cnt / t->out_tuple_sz
//     (only under LFTA_STATS).
//
// Parameters:
//   fs        - query plan node; not referenced in the lines visible here.
//   node_name - used to form the generated tuple struct name.
//   schema    - passed through to the expression / function code generators.
//
// NOTE(review): ret, tmpstr, p, s, sl_list, fcn_ref_cnt, is_partial_fcn and
// partial_fcns are not declared in the visible lines; presumably they are
// file-level state or declared on lines outside this view. The return
// statement is also not visible; presumably the function returns ret.
2394 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2399 /////////////// Processing for filter-only query
2401 // test passed : create the tuple, then assign to it.
2402 ret += "/*\t\tCreate and post the tuple\t*/\n";
2404 // Unpack partial fcns ref'd by the select clause.
2405 // It's a kind of a WHERE clause ...
2406 for(p=sl_fcns_start;p<sl_fcns_end;p++){
// Functions referenced more than once are guarded by fcn_ref_cnt_<p> in the
// generated code so that they are evaluated only once.
2407 if(fcn_ref_cnt[p] > 1){
2408 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2410 if(is_partial_fcn[p]){
2411 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2412 ret += "\tif(retval) goto end;\n";
2414 if(fcn_ref_cnt[p] > 1){
2415 if(!is_partial_fcn[p]){
2416 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2418 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2423 // increment the counter of accepted tuples
2424 ret += "\n\t#ifdef LFTA_STATS\n";
2425 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2426 ret += "\t#endif\n\n";
2428 // First, compute the size of the tuple.
2430 // Unpack any BUFFER type selections into temporaries
2431 // so that I can compute their size and not have
2432 // to recompute their value during tuple packing.
2433 // I can use regular assignment here because
2434 // these temporaries are non-persistent.
2436 for(s=0;s<sl_list.size();s++){
2437 data_type *sdt = sl_list[s]->get_data_type();
2438 if(sdt->is_buffer_type()){
2439 sprintf(tmpstr,"\tselvar_%d = ",s);
2441 ret += generate_se_code(sl_list[s],schema);
2447 // The size of the tuple is the size of the tuple struct plus the
2448 // size of the buffers to be copied in.
2450 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2451 for(s=0;s<sl_list.size();s++){
2452 data_type *sdt = sl_list[s]->get_data_type();
2453 if(sdt->is_buffer_type()){
2454 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2461 ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
2462 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2464 // Test passed, make assignments to the tuple.
// tuple_pos starts just past the fixed-size struct; buffer data is appended
// from there.
2466 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2468 // Mark tuple as REGULAR_TUPLE
2469 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2472 for(s=0;s<sl_list.size();s++){
2473 data_type *sdt = sl_list[s]->get_data_type();
2474 if(sdt->is_buffer_type()){
2475 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2477 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2480 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2482 // if(sdt->needs_hn_translation())
2483 // ret += sdt->hton_translation() +"( ";
2484 ret += generate_se_code(sl_list[s],schema);
2485 // if(sdt->needs_hn_translation())
2493 ret += "\tpost_tuple(tuple);\n";
2495 // Increment the counter of posted tuples
2496 ret += "\n\t#ifdef LFTA_STATS\n";
2497 ret += "\tt->out_tuple_cnt++;\n";
2498 ret+="\tt->out_tuple_sz+=tuple_size;\n";
2499 ret += "\t#endif\n\n";
2506 // TODO Ensure that postfilter predicates are being generated
//
// Builds, as a string of generated source text, the accept body of a filter
// join. The emitted code has three parts:
//   1. S (filtering stream) processing: test the S predicates (fs->pred_t1)
//      and, on success, record the S join key either by setting bits in the
//      current bloom filter (t->bf_table at t->last_bloom_pos) or by storing
//      the key and timestamp in one of two adjacent slots of t->join_table.
//      Failures jump to the generated label "end_s".
//   2. R (main stream) processing: test the cheap R predicates
//      (fs->pred_t0 entries with cost <= 20), probe the bloom filters or the
//      join table with the R join key, then test the remaining R predicates.
//   3. Tuple creation: unpack the select-clause columns, evaluate the
//      select-clause functions, size, allocate, fill and post the tuple.
//
// Parameters:
//   fs            - filter join node; read here for temporal_range,
//                   temporal_var, pred_t1, pred_t0, hash_eq and the defines
//                   "num_bloom", "bloom_size" and "aggregate_slots".
//   node_name     - used for generated names and passed to
//                   generate_unpack_code().
//   unpacked_cids - in/out: column ids that already have unpack code; every
//                   column unpacked here is inserted.
//   Ext_fcns      - passed to compute_cnf_cost() for predicate cost estimates.
//   schema        - passed through to the code generators.
//
// NOTE(review): many original lines are not visible in this view (among them
// the choice between bloom-filter and hash-table mode, the loops over i, and
// the declarations of ret, tmpstr, bf_mask, i, w, p, s, pfcn_refs and
// cheap_rpos). The comments below describe only the visible lines. The return
// statement is also not visible; presumably the function returns ret.
2507 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
// Window length and bloom-filter count. bloom_width is the time span covered
// by one filter; it is spliced into the generated code as text.
2513 unsigned int window_len = fs->temporal_range;
2514 unsigned int n_bloom = 11;
2515 string n_bloom_str = fs->get_val_of_def("num_bloom");
2516 int tmp_n_bloom = atoi(n_bloom_str.c_str());
2518 n_bloom = tmp_n_bloom+1;
2519 float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2520 sprintf(tmpstr,"%f",bloom_width);
2521 string bloom_width_str = tmpstr;
// A window shorter than the filter count gets one filter per time unit.
2523 if(window_len < n_bloom){
2524 n_bloom = window_len+1;
2525 bloom_width_str = "1";
2529 // Grab the current window time
2530 scalarexp_t winvar(fs->temporal_var);
2531 ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2533 int bf_exp_size = 12; // base-2 log of number of bits
2534 string bloom_len_str = fs->get_val_of_def("bloom_size");
2535 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
// Only values in 4..31 override the default exponent.
2536 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2537 bf_exp_size = tmp_bf_exp_size;
// NOTE(review): an exponent of 31 is accepted above, but 1 << 31 does not fit
// in a 32-bit signed int -- confirm whether the upper bound should be lower.
2539 int bf_bit_size = 1 << bf_exp_size;
2540 int bf_byte_size = bf_bit_size / (8*sizeof(char));
2542 unsigned int ht_size = 4096;
2543 string ht_size_s = fs->get_val_of_def("aggregate_slots");
2544 int tmp_ht_size = atoi(ht_size_s.c_str());
2545 if(tmp_ht_size > 1024){
2546 unsigned int hs = 1; // make it power of 2
2549 tmp_ht_size = tmp_ht_size >> 1;
// bf_mask is an all-ones bit mask used to reduce a hash value to a bucket
// index; it is built with one bit per filter-size exponent step, or with one
// bit per halving of ht_size.
// NOTE(review): the condition selecting between the two loops is not visible.
2556 for(i=0;i<bf_exp_size;i++)
2557 bf_mask = (bf_mask << 1) | 1;
2559 for(i=ht_size;i>1;i=i>>1)
2560 bf_mask = (bf_mask << 1) | 1;
// Prints the chosen configuration to stdout while the code is generated.
2564 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2567 bloom_width_str.c_str(),
2579 // If this is a bloom-filter fj, first test if the
2580 // bloom filter needs to be advanced.
2581 // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2582 // t->bf_size : number of bits in bloom filter
2584 // TODO: Don't iterate more than n_bloom times!
2585 // As written, it's possible to wrap around many times.
2588 "// Clean out old bloom filters if needed.\n"
2589 "// TODO vectorize this ? \n"
2590 " if(t->first_exec){\n"
2591 " t->first_exec = 0;\n"
2592 " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2593 " t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2595 " curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2596 " if(curr_bin != t->last_bin){\n"
2597 " for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2598 " t->last_bloom_pos++;\n"
2599 " if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2600 " t->last_bloom_pos = 0;\n"
2601 " tmp_i = t->last_bloom_pos;\n"
2602 " for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2603 " SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2607 " t->last_bin = curr_bin;\n"
2613 //-----------------------------------------------------------------
2614 // First, determine whether to do S (filter stream) processing.
2617 "// S (filtering stream) predicate, should it be processed?\n"
2620 // Sort S preds based on cost.
2621 vector<cnf_elem *> s_filt = fs->pred_t1;
2622 col_id_set::iterator csi;
2623 if(s_filt.size() > 0){
2625 // Unpack fields ref'd in the S pred
2626 for(w=0;w<s_filt.size();++w){
2627 col_id_set this_pred_cids;
2628 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2629 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2630 if(unpacked_cids.count( (*csi) ) == 0){
2631 int tblref = (*csi).tblvar_ref;
2632 int schref = (*csi).schema_ref;
2633 string field = (*csi).field;
// The extra "end_s" argument is passed only for S-side unpacking.
// NOTE(review): presumably it names the label to jump to on unpack failure
// -- confirm against generate_unpack_code().
2634 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2635 unpacked_cids.insert( (*csi) );
2641 // Sort by evaluation cost.
2642 // First, estimate evaluation costs
2643 // Eliminate predicates covered by the prefilter (those in s_pids).
2644 // I need to do it before the sort because the indices refer
2645 // to the position in the unsorted list.
2646 vector<cnf_elem *> tmp_wh;
2647 for(w=0;w<s_filt.size();++w){
2648 compute_cnf_cost(s_filt[w],Ext_fcns);
2649 tmp_wh.push_back(s_filt[w]);
2653 sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2655 // Now generate the predicates.
2656 for(w=0;w<s_filt.size();++w){
2657 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2660 // Find partial fcns ref'd in this cnf element
2662 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2663 // Since set<..> is a "Sorted Associative Container",
2664 // we can walk through it in sorted order by walking from
2665 // begin() to end(). (and the partial fcns must be
2666 // evaluated in this order).
2667 set<int>::iterator si;
2669 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2670 if(fcn_ref_cnt[(*si)] > 1){
2671 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2673 if(is_partial_fcn[(*si)]){
2674 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2675 ret += "\t\tif(retval) goto end_s;\n";
2677 if(fcn_ref_cnt[(*si)] > 1){
2678 if(!is_partial_fcn[(*si)]){
2679 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2680 // Testing for S is a side branch.
2681 // I don't want a cacheable partial function to be
2682 // marked as evaluated. Therefore I mark the function
2683 // as evaluated ONLY IF it is not partial.
2684 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2690 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2691 ") ) goto end_s;\n";
2694 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// The S-side join key comes from the right-hand side of each hash_eq
// predicate.
2697 for(p=0;p<fs->hash_eq.size();++p)
2698 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2701 // First, generate the S scalar expressions in the hash_eq
2703 // Iterate over the bloom filters
// The bucket is the XOR over all key fields of the top 32 bits of
// (random 64-bit multiplier * field hash), masked with bf_mask.
2705 ret += "\t\tbucket=0;\n";
2706 for(p=0;p<fs->hash_eq.size();++p){
2708 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2709 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2710 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2712 // SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2714 " bucket &= "+int_to_string(bf_mask)+";\n"
2715 " SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2720 ret += "// Add the S record to the hash table, choose a position\n";
2721 ret += "\t\tbucket=0;\n";
2722 for(p=0;p<fs->hash_eq.size();++p){
2724 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2725 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2726 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2729 " bucket &= "+int_to_string(bf_mask)+";\n"
2730 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2732 // Try the first bucket
// Slot choice in the generated code: reuse "bucket" if it already holds this
// key, else "bucket1" if that holds the key, else whichever of the two has
// the smaller (or equal) timestamp.
2734 for(p=0;p<fs->hash_eq.size();++p){
2735 if(p>0) ret += " && ";
2736 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2737 // " == s_equijoin_"+int_to_string(p);
2738 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2739 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2740 string rhs_op = "s_equijoin_"+int_to_string(p);
2741 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2743 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2744 ret += "\t\t}else{\n\t\t\tif(";
2745 for(p=0;p<fs->hash_eq.size();++p){
2746 if(p>0) ret += " && ";
2747 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2748 // " == s_equijoin_"+int_to_string(p);
2749 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2750 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2751 string rhs_op = "s_equijoin_"+int_to_string(p);
2752 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2754 ret += "){\n\t\t\t\tthe_bucket = bucket1;\n";
2755 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2756 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2757 ret += "\t\t\t}\n\t\t}\n";
// Store the S key (buffer types through their assign-copy function) and the
// current timestamp in the chosen slot.
2758 for(p=0;p<fs->hash_eq.size();++p){
2759 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2760 if(hdt->is_buffer_type()){
2761 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2764 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2765 " = s_equijoin_"+int_to_string(p)+";\n";
2768 ret+="\t\tt->join_table[the_bucket].ts = curr_fj_ts;\n";
2770 ret += "\tend_s:\n";
2772 // ------------------------------------------------------------
2773 // Next, determine if the R record should be processed.
2777 "// R (main stream) cheap predicate\n"
2781 // Unpack r_filt fields
2782 vector<cnf_elem *> r_filt = fs->pred_t0;
2783 for(w=0;w<r_filt.size();++w){
2784 col_id_set this_pred_cids;
2785 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2786 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2787 if(unpacked_cids.count( (*csi) ) == 0){
2788 int tblref = (*csi).tblvar_ref;
2789 int schref = (*csi).schema_ref;
2790 string field = (*csi).field;
2791 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2792 unpacked_cids.insert( (*csi) );
2797 // Sort R preds based on cost.
2799 vector<cnf_elem *> tmp_wh;
2800 for(w=0;w<r_filt.size();++w){
2801 compute_cnf_cost(r_filt[w],Ext_fcns);
2802 tmp_wh.push_back(r_filt[w]);
2806 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2808 // WARNING! the constant 20 below is a wild-ass guess.
// After the loop, cheap_rpos is the index of the first predicate in the
// sorted list whose cost exceeds 20 (or r_filt.size() if none does).
2810 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
2812 // Test the cheap filters on R.
2815 // Now generate the predicates.
2816 for(w=0;w<cheap_rpos;++w){
2817 sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
2820 // Find partial fcns ref'd in this cnf element
2822 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2823 // Since set<..> is a "Sorted Associative Container",
2824 // we can walk through it in sorted order by walking from
2825 // begin() to end(). (and the partial fcns must be
2826 // evaluated in this order).
2827 set<int>::iterator si;
2828 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2829 if(fcn_ref_cnt[(*si)] > 1){
2830 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2832 if(is_partial_fcn[(*si)]){
2833 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2834 ret += "\t\tif(retval) goto end;\n";
2836 if(fcn_ref_cnt[(*si)] > 1){
2837 if(!is_partial_fcn[(*si)]){
2838 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2840 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2845 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2849 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2852 ret += "\n// Do the join\n\n";
// The R-side join key comes from the left-hand side of each hash_eq
// predicate.
2853 for(p=0;p<fs->hash_eq.size();++p)
2854 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2857 // Passed the cheap pred, now test the join with S.
2860 ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2861 for(p=0;p<fs->hash_eq.size();++p){
2863 " bucket"+int_to_string(i)+
2864 " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2865 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2866 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2869 " bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2871 ret += "\tfound = 0;\n";
// NOTE(review): the generated loop runs over b, but the IS_BF_SET tests
// visible below index the filter with t->last_bloom_pos, not b -- confirm
// that every filter in the window is actually probed.
2872 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2874 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2875 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2876 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
2885 ret += "\tfound = 0;\n";
2886 ret += "\t\tbucket=0;\n";
2887 for(p=0;p<fs->hash_eq.size();++p){
2889 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2890 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2891 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2894 " bucket &= "+int_to_string(bf_mask)+";\n"
2895 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2897 // Try the first bucket
// NOTE(review): this is the R-side lookup, yet the stored key is compared with
// s_equijoin_<p> (and typed from the right-hand expression), while the
// commented-out code refers to r_equijoin_<p>. This looks like a copy-paste
// slip from the S-side insert -- confirm which variable is intended. The same
// applies to the bucket1 test further down.
2899 for(p=0;p<fs->hash_eq.size();++p){
2900 if(p>0) ret += " && ";
2901 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2902 // " == r_equijoin_"+int_to_string(p);
2903 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2904 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2905 string rhs_op = "s_equijoin_"+int_to_string(p);
2906 ret += generate_equality_test(lhs_op,rhs_op,hdt);
// p equals hash_eq.size() here, so " && " is added whenever there was at
// least one key field.
2908 if(p>0) ret += " && ";
2909 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2910 ret += "){\n\t\t\tfound = 1;\n";
2911 ret += "\t\t}else {if(";
2912 for(p=0;p<fs->hash_eq.size();++p){
2913 if(p>0) ret += " && ";
2914 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2915 // " == r_equijoin_"+int_to_string(p);
2916 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2917 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2918 string rhs_op = "s_equijoin_"+int_to_string(p);
2919 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2921 if(p>0) ret += " && ";
2922 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2923 ret += ")\n\t\t\tfound=1;\n";
2932 // Test the expensive filters on R.
2933 if(cheap_rpos < r_filt.size()){
2935 // Now generate the predicates.
2936 for(w=cheap_rpos;w<r_filt.size();++w){
2937 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2940 // Find partial fcns ref'd in this cnf element
2942 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2943 // Since set<..> is a "Sorted Associative Container",
2944 // we can walk through it in sorted order by walking from
2945 // begin() to end(). (and the partial fcns must be
2946 // evaluated in this order).
2947 set<int>::iterator si;
2948 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2949 if(fcn_ref_cnt[(*si)] > 1){
2950 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2952 if(is_partial_fcn[(*si)]){
2953 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2954 ret += "\t\tif(retval) goto end;\n";
2956 if(fcn_ref_cnt[(*si)] > 1){
2957 if(!is_partial_fcn[(*si)]){
2958 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2960 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2965 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2969 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2974 /////////////// post the tuple
2976 // test passed : create the tuple, then assign to it.
2977 ret += "/*\t\tCreate and post the tuple\t*/\n";
// Unpack the columns referenced by the select clause that no earlier
// predicate has already unpacked.
2979 // Unpack r_filt fields
2980 for(s=0;s<sl_list.size();++s){
2981 col_id_set this_se_cids;
2982 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2983 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2984 if(unpacked_cids.count( (*csi) ) == 0){
2985 int tblref = (*csi).tblvar_ref;
2986 int schref = (*csi).schema_ref;
2987 string field = (*csi).field;
2988 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2989 unpacked_cids.insert( (*csi) );
2995 // Unpack partial fcns ref'd by the select clause.
2996 // It's a kind of a WHERE clause ...
2997 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2998 if(fcn_ref_cnt[p] > 1){
2999 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3001 if(is_partial_fcn[p]){
3002 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3003 ret += "\tif(retval) goto end;\n";
3005 if(fcn_ref_cnt[p] > 1){
3006 if(!is_partial_fcn[p]){
3007 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3009 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3014 // increment the counter of accepted tuples
3015 ret += "\n\t#ifdef LFTA_STATS\n";
3016 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3017 ret += "\t#endif\n\n";
3019 // First, compute the size of the tuple.
3021 // Unpack any BUFFER type selections into temporaries
3022 // so that I can compute their size and not have
3023 // to recompute their value during tuple packing.
3024 // I can use regular assignment here because
3025 // these temporaries are non-persistent.
3027 for(s=0;s<sl_list.size();s++){
3028 data_type *sdt = sl_list[s]->get_data_type();
3029 if(sdt->is_buffer_type()){
3030 sprintf(tmpstr,"\tselvar_%d = ",s);
3032 ret += generate_se_code(sl_list[s],schema);
3038 // The size of the tuple is the size of the tuple struct plus the
3039 // size of the buffers to be copied in.
3041 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3042 for(s=0;s<sl_list.size();s++){
3043 data_type *sdt = sl_list[s]->get_data_type();
3044 if(sdt->is_buffer_type()){
3045 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3052 ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
3053 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3055 // Test passed, make assignments to the tuple.
3057 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3059 // Mark tuple as REGULAR_TUPLE
3060 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3063 for(s=0;s<sl_list.size();s++){
3064 data_type *sdt = sl_list[s]->get_data_type();
3065 if(sdt->is_buffer_type()){
3066 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3068 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3071 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3073 // if(sdt->needs_hn_translation())
3074 // ret += sdt->hton_translation() +"( ";
3075 ret += generate_se_code(sl_list[s],schema);
3076 // if(sdt->needs_hn_translation())
3084 ret += "\tpost_tuple(tuple);\n";
3086 // Increment the counter of posted tuples
3087 ret += "\n\t#ifdef LFTA_STATS\n";
3088 ret += "\n\tt->out_tuple_cnt++;\n\n";
3089 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3090 ret += "\t#endif\n\n";
3097 string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
3103 string wl_schema = fs->from[1]->get_schema_name();
3104 string wl_elem_str = generate_watchlist_element_name(wl_schema);
3105 string wl_node_str = generate_watchlist_struct_name(wl_schema);
3106 string tgt = generate_watchlist_name(wl_schema);
3108 ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
3114 // ------------------------------------------------------------
3115 // Determine if the R record should be processed.
3119 "// R (main stream) cheap predicate\n"
3123 // Unpack r_filt fields
3124 vector<cnf_elem *> r_filt = fs->pred_t0;
3125 for(w=0;w<r_filt.size();++w){
3126 col_id_set this_pred_cids;
3127 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
3128 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3129 if(unpacked_cids.count( (*csi) ) == 0){
3130 int tblref = (*csi).tblvar_ref;
3131 int schref = (*csi).schema_ref;
3132 string field = (*csi).field;
3133 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3134 unpacked_cids.insert( (*csi) );
3139 // Sort R preds based on cost.
3141 vector<cnf_elem *> tmp_wh;
3142 for(w=0;w<r_filt.size();++w){
3143 compute_cnf_cost(r_filt[w],Ext_fcns);
3144 tmp_wh.push_back(r_filt[w]);
3148 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
3150 // WARNING! the constant 20 below is a wild-ass guess.
3152 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
3154 // Test the cheap filters on R.
3157 // Now generate the predicates.
3158 for(w=0;w<cheap_rpos;++w){
3159 sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3162 // Find partial fcns ref'd in this cnf element
3164 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3165 // Since set<..> is a "Sorted Associative Container",
3166 // we can walk through it in sorted order by walking from
3167 // begin() to end(). (and the partial fcns must be
3168 // evaluated in this order).
3169 set<int>::iterator si;
3170 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3171 if(fcn_ref_cnt[(*si)] > 1){
3172 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3174 if(is_partial_fcn[(*si)]){
3175 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3176 ret += "\t\tif(retval) goto end;\n";
3178 if(fcn_ref_cnt[(*si)] > 1){
3179 if(!is_partial_fcn[(*si)]){
3180 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3182 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3187 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3191 ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
3194 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3195 map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
3196 vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
3197 for(w=0;w<kflds.size();++w){
3198 string kfld = kflds[w];
3199 col_id_set this_pred_cids;
3200 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
3201 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3202 if(unpacked_cids.count( (*csi) ) == 0){
3203 int tblref = (*csi).tblvar_ref;
3204 int schref = (*csi).schema_ref;
3205 string field = (*csi).field;
3206 if(tblref==0) // LHS from packet, don't unpack the RHS
3207 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3208 unpacked_cids.insert( (*csi) );
3214 ret += "\n// Do the join\n\n";
3215 ret += "\n// (ensure that the watchtable is fresh)\n";
3216 ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
3217 ret += "\t\treload_watchlist__"+wl_schema+"();\n";
3218 ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
3222 for(p=0;p<fs->key_flds.size();++p){
3223 string kfld = fs->key_flds[p];
3224 ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
3228 // Passed the cheap pred, now test the join with S.
3229 ret += "\tbucket=0;\n";
3230 ret += "\thash=0;\n";
3231 for(p=0;p<fs->key_flds.size();++p){
3232 string kfld = fs->key_flds[p];
3234 " hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
3235 fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
3236 +"_to_hash(r_equijoin_"+kfld+")));\n";
3238 ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
3240 ret += "\t\trec = "+tgt+".ht[bucket];\n";
3241 ret += "\t\twhile(rec!=NULL){\n";
3242 ret += "\t\t\tif(hash==rec->hashval){\n";
3243 ret += "\t\t\t\tif(";
3244 for(p=0;p<fs->key_flds.size();++p){
3245 string kfld = fs->key_flds[p];
3246 if(p>0) ret += " && ";
3247 data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
3248 string lhs_op = "r_equijoin_"+kfld;
3249 string rhs_op = "rec->"+kfld;
3250 ret += generate_equality_test(lhs_op,rhs_op,hdt);
3253 ret += "\t\t\t\t\tbreak;\n";
3255 ret += "\t\t\trec=rec->next;\n";
3257 ret += "\t\tif(rec==NULL)\n";
3258 ret += "\t\t\tgoto end;\n";
3260 ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
3261 for(w=0;w<where.size();++w){
3262 col_id_set this_pred_cids;
3263 gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
3264 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3265 if(unpacked_cids.count( (*csi) ) == 0){
3266 int tblref = (*csi).tblvar_ref;
3267 int schref = (*csi).schema_ref;
3268 string field = (*csi).field;
3269 if(tblref==0) // LHS from packet
3270 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3271 else // RHS from hash bucket
3272 ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3273 unpacked_cids.insert( (*csi) );
3279 // Test the expensive filters on R.
3280 // TODO Should merge this with other predicates and eval in order
3281 // of cost - see the fj code.
3282 // TODO join and postfilter predicates haven't been costed yet.
3283 if(cheap_rpos < r_filt.size()){
3285 // Now generate the predicates.
3286 for(w=cheap_rpos;w<r_filt.size();++w){
3287 sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3290 // Find partial fcns ref'd in this cnf element
3292 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3293 // Since set<..> is a "Sorted Associative Container",
3294 // we can walk through it in sorted order by walking from
3295 // begin() to end(). (and the partial fcns must be
3296 // evaluated in this order).
3297 set<int>::iterator si;
3298 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3299 if(fcn_ref_cnt[(*si)] > 1){
3300 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3302 if(is_partial_fcn[(*si)]){
3303 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3304 ret += "\t\tif(retval) goto end;\n";
3306 if(fcn_ref_cnt[(*si)] > 1){
3307 if(!is_partial_fcn[(*si)]){
3308 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3310 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3315 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3319 ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
3322 // TODO sort the additional predicates by cost
3325 for(w=0;w<fs->pred_t1.size();++w){
3326 sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
3329 // Find partial fcns ref'd in this cnf element
3331 collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
3332 // Since set<..> is a "Sorted Associative Container",
3333 // we can walk through it in sorted order by walking from
3334 // begin() to end(). (and the partial fcns must be
3335 // evaluated in this order).
3336 set<int>::iterator si;
3337 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3338 if(fcn_ref_cnt[(*si)] > 1){
3339 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3341 if(is_partial_fcn[(*si)]){
3342 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3343 ret += "\t\tif(retval) goto end;\n";
3345 if(fcn_ref_cnt[(*si)] > 1){
3346 if(!is_partial_fcn[(*si)]){
3347 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3349 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3354 ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
3359 for(w=0;w<fs->join_filter.size();++w){
3360 sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
3363 // Find partial fcns ref'd in this cnf element
3365 collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
3366 // Since set<..> is a "Sorted Associative Container",
3367 // we can walk through it in sorted order by walking from
3368 // begin() to end(). (and the partial fcns must be
3369 // evaluated in this order).
3370 set<int>::iterator si;
3371 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3372 if(fcn_ref_cnt[(*si)] > 1){
3373 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3375 if(is_partial_fcn[(*si)]){
3376 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3377 ret += "\t\tif(retval) goto end;\n";
3379 if(fcn_ref_cnt[(*si)] > 1){
3380 if(!is_partial_fcn[(*si)]){
3381 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3383 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3388 ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
3393 for(w=0;w<fs->postfilter.size();++w){
3394 sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
3397 // Find partial fcns ref'd in this cnf element
3399 collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
3400 // Since set<..> is a "Sorted Associative Container",
3401 // we can walk through it in sorted order by walking from
3402 // begin() to end(). (and the partial fcns must be
3403 // evaluated in this order).
3404 set<int>::iterator si;
3405 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3406 if(fcn_ref_cnt[(*si)] > 1){
3407 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3409 if(is_partial_fcn[(*si)]){
3410 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3411 ret += "\t\tif(retval) goto end;\n";
3413 if(fcn_ref_cnt[(*si)] > 1){
3414 if(!is_partial_fcn[(*si)]){
3415 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3417 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3422 ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
3428 /////////////// post the tuple
3430 // test passed : create the tuple, then assign to it.
3431 ret += "/*\t\tCreate and post the tuple\t*/\n";
3434 for(s=0;s<sl_list.size();++s){
3435 col_id_set this_se_cids;
3436 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
3437 for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
3438 if(unpacked_cids.count( (*csi) ) == 0){
3439 int tblref = (*csi).tblvar_ref;
3440 int schref = (*csi).schema_ref;
3441 string field = (*csi).field;
3442 if(tblref==0) // LHS from packet
3443 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3444 else // RHS from hash bucket
3445 ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3446 unpacked_cids.insert( (*csi) );
3452 // Unpack partial fcns ref'd by the select clause.
3453 // Its a kind of a WHERE clause ...
3454 for(p=sl_fcns_start;p<sl_fcns_end;p++){
3455 if(fcn_ref_cnt[p] > 1){
3456 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3458 if(is_partial_fcn[p]){
3459 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3460 ret += "\tif(retval) goto end;\n";
3462 if(fcn_ref_cnt[p] > 1){
3463 if(!is_partial_fcn[p]){
3464 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3466 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3471 // increment the counter of accepted tuples
3472 ret += "\n\t#ifdef LFTA_STATS\n";
3473 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3474 ret += "\t#endif\n\n";
3476 // First, compute the size of the tuple.
3478 // Unpack any BUFFER type selections into temporaries
3479 // so that I can compute their size and not have
3480 // to recompute their value during tuple packing.
3481 // I can use regular assignment here because
3482 // these temporaries are non-persistent.
3484 for(s=0;s<sl_list.size();s++){
3485 data_type *sdt = sl_list[s]->get_data_type();
3486 if(sdt->is_buffer_type()){
3487 sprintf(tmpstr,"\tselvar_%d = ",s);
3489 ret += generate_se_code(sl_list[s],schema);
3495 // The size of the tuple is the size of the tuple struct plus the
3496 // size of the buffers to be copied in.
3498 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3499 for(s=0;s<sl_list.size();s++){
3500 data_type *sdt = sl_list[s]->get_data_type();
3501 if(sdt->is_buffer_type()){
3502 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3509 ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
3510 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3512 // Test passed, make assignments to the tuple.
3514 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3516 // Mark tuple as REGULAR_TUPLE
3517 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3520 for(s=0;s<sl_list.size();s++){
3521 data_type *sdt = sl_list[s]->get_data_type();
3522 if(sdt->is_buffer_type()){
3523 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3525 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3528 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3530 // if(sdt->needs_hn_translation())
3531 // ret += sdt->hton_translation() +"( ";
3532 ret += generate_se_code(sl_list[s],schema);
3533 // if(sdt->needs_hn_translation())
3541 ret += "\tpost_tuple(tuple);\n";
3543 // Increment the counter of posted tuples
3544 ret += "\n\t#ifdef LFTA_STATS\n";
3545 ret += "\n\tt->out_tuple_cnt++;\n\n";
3546 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3547 ret += "\t#endif\n\n";
// generate_aggr_accept_body
//
// Emits the aggregation-specific part of the generated accept function.
// The text is appended to `ret` (its declaration is outside the lines shown
// here). In order, the generated code:
//   - evaluates the partial functions in the [gb_fcns_start, gb_fcns_end) and
//     [ag_fcns_start, ag_fcns_end) ranges, jumping to `end` on failure;
//   - bumps t->accepted_tuple_cnt under LFTA_STATS;
//   - computes the group-by values into gb_attr_<g>;
//   - computes hashval, hash2 and probe from the group-by values;
//   - probes up to five slots for a matching group while tracking best_slot;
//   - emits the code produced by generate_gb_update, generate_tuple_from_aggr
//     and generate_init_group for best_slot.
//
// Parameters:
//   fs             - query plan node; read for the real_time define, its
//                    node_name and, through a cast to sgah_qpn, lfta_disorder.
//   node_name      - spliced into generated identifiers (for example
//                    fta_aggr_flush_old_<node_name>) and forwarded to helpers.
//   schema         - table list forwarded to the code-generation helpers.
//   temporal_flush - flush code prepared by the caller. When it is non-empty,
//                    temporal group-by attributes are not recomputed here, and
//                    the text itself is emitted only when the real_time define
//                    is empty.
// Returns: presumably the accumulated `ret` text; the return statement is not
//          visible in this view - TODO confirm.
3553 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
3557 ////////////// Processing for aggregation query
3559 // First, search for a match. Start by unpacking the group-by attributes.
3561 // One complication : if a real-time aggregate flush occurs,
3562 // the GB attr has already been calculated. So don't compute
3563 // it again if 1) its temporal and 2) it will be computed in the
3564 // aggregate flush code.
3566 // Unpack the partial fcns ref'd by the gb's and the aggr defs.
3567 for(p=gb_fcns_start;p<gb_fcns_end;p++){
3568 if(is_partial_fcn[p]){
3569 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3570 ret += "\tif(retval) goto end;\n";
3573 for(p=ag_fcns_start;p<ag_fcns_end;p++){
3574 if(is_partial_fcn[p]){
3575 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3576 ret += "\tif(retval) goto end;\n";
3580 // increment the counter of accepted tuples
3581 ret += "\n\t#ifdef LFTA_STATS\n";
3582 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3583 ret += "\t#endif\n\n";
3585 ret += "/*\t\tTest if the group is in the hash table \t*/\n";
3586 // Compute the values of the group-by variables.
3587 for(g=0;g<gb_tbl->size();g++){
3588 data_type *gdt = gb_tbl->get_data_type(g);
// A temporal attribute is computed here only when no temporal flush code
// was supplied; otherwise the flush code is expected to have computed it.
3589 if((! gdt->is_temporal()) || temporal_flush == ""){
// Buffer-typed attributes go through a gb_attr_tmp<g> temporary and the
// type-specific assign-copy routine; other types are assigned directly.
3591 if(gdt->is_buffer_type()){
3592 // NOTE : if the SE defining the gb is anything
3593 // other than a ref to a variable, this will generate
3594 // illegal code. To be resolved with Spatch.
3595 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
3596 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
3598 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
3599 gdt->get_buffer_assign_copy().c_str(), g, g);
3601 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
3608 // A quick aside : if any of the GB attrs are temporal,
3609 // test for change and flush if any change occurred.
3610 // We've already computed the flush code,
3611 // Put it here if this is not a real time query.
3612 // We've already unpacked all column refs, so no need to
3613 // do it again here.
3615 string rt_level = fs->get_val_of_def("real_time");
3616 if(rt_level == "" && temporal_flush != ""){
3617 ret += temporal_flush;
3620 // Compute the hash bucket
// hashval is the XOR of (hash_nums[g % NRANDS] * lfta_<type>_to_hash(gb_attr_<g>))
// over all group-by attributes.
// NOTE(review): the `hashval` statement below ends with a stray
// line-continuation backslash after the semicolon; it looks harmless but
// unintended - confirm and remove.
3621 if(gb_tbl->size() > 0){
3622 ret += "\thashval = ";\
3623 for(g=0;g<gb_tbl->size();g++){
3624 if(g>0) ret += " ^ ";
3625 data_type *gdt = gb_tbl->get_data_type(g);
// NOTE(review): the two sprintf calls below use the same format string and
// arguments, so the buffer-type test does not appear to change the output.
3626 if(gdt->is_buffer_type()){
3627 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3628 gdt->get_type_str().c_str(), g);
3630 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3631 gdt->get_type_str().c_str(), g);
// hash2 is a second hash masked with SLOT_HASH_BITS; probe is the starting
// slot, masked with (max_aggrs-1).
3636 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
3637 ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
3639 ret+="\tprobe = 0;\n";
3640 ret+="\thash2 = 0;\n\n";
3643 // Does the lfta reference a udaf?
3644 bool has_udaf = false;
3645 for(a=0;a<aggr_tbl->size();a++){
3646 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
3649 // Scan for a match, or alternatively the best slot.
3650 // Currently, hardcode 5 tests.
3652 " gen_val = t->generation & SLOT_GEN_BITS;\n"
3653 " match_found = 0;\n"
3654 " best_slot = probe;\n"
3655 " for(i=0;i<5 && match_found == 0;i++){\n"
3656 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
3658 if(gb_tbl->size()>0){
3659 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
3661 string rhs_op, lhs_op;
3662 for(g=0;g<gb_tbl->size();g++){
3663 if(g>0) ret += " && ";
3665 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
3666 sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
3667 ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
3672 " match_found = 1;\n"
3673 " best_slot = probe;\n"
3676 "// Rate slots in case no match found: prefer empty, then full but old slots\n"
3677 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3678 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3679 " best_slot = probe;\n"
3681 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3682 " best_slot = probe;\n"
3686 " if(probe >= t->max_aggrs)\n"
3689 " if(match_found){\n"
3691 ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3694 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
// NOTE(review): this printf writes to stdout while the code is being
// generated (it is not part of the generated text); it looks like leftover
// debug output - confirm before removing.
// The test on lfta_disorder that follows gates extra generated code which
// compares the temporal group-by attributes with those stored in best_slot.
3696 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3697 if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3699 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3701 bool first_g = true;
3702 for(int g=0;g<gb_tbl->size();g++){
3703 data_type *gdt = gb_tbl->get_data_type(g);
3704 if(gdt->is_temporal()){
3705 if(first_g) first_g = false; else ret+=" + ";
3706 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3709 ret += ") == 0 ){\n";
3712 " fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3718 ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3720 "\t\t\t#ifdef LFTA_STATS\n"
3721 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3722 "\t\t\t\tt->collision_cnt++;\n\n"
3726 ret += generate_init_group(schema,"best_slot");
// generate_fta_accept
//
// Emits the complete accept function of an LFTA as C source text. The
// generated function is
//   static gs_retval_t accept_packet_<node_name>(struct FTA *f, FTAID *ftaid,
//                                                void *pkt, gs_int32_t sz)
// and this generator produces, in order:
//   1. declarations of the locals the generated code uses: unpack variables,
//      aggregation and join scratch variables, BUFFER temporaries,
//      partial-function results and their reference flags;
//   2. LFTA_STATS bookkeeping (cycle counter start, in_tuple_cnt);
//   3. unpacking of the referenced columns, interleaved with the filter
//      predicates so that a field is unpacked only once;
//   4. the query-type specific body (selection, aggregation, filter join or
//      watchlist join);
//   5. the `end:` label, the cycle accounting and `return 1`.
//
// Parameters:
//   fs            - query plan node; supplies the filter clause and node type,
//                   and is cast to filter_join_qpn / watch_join_qpn for joins.
//   node_name     - spliced into generated identifiers and passed to helpers.
//   schema        - table list used for column types and by the generators.
//   Ext_fcns      - external function list; used for UDAF return types and
//                   for predicate cost estimation, and forwarded to the join
//                   body generators.
//   is_aggr_query, is_fj, is_wj
//                 - select the aggregation, filter-join or watchlist-join
//                   variant of the generated code.
//   s_pids        - positions of predicates already handled by the prefilter
//                   (per the note in the body); they are skipped when
//                   gathering columns for plain queries and when costing.
// Returns: presumably the accumulated `ret` text; the return statement is not
//          visible in this view - TODO confirm.
3736 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
3738 string ret="static gs_retval_t accept_packet_"+node_name+
3739 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3740 ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3744 // Define all of the variables needed by this
3748 // Gather all column references, need to define unpacking variables.
3751 col_id_set::iterator csi;
3753 // If its a filter join, rebind all colrefs
3754 // to the first range var, to avoid double unpacking.
3757 for(w=0;w<where.size();++w)
3758 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3759 for(s=0;s<sl_list.size();s++)
3760 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
// Collect the column ids referenced by the WHERE predicates, the select
// list and the group-by definitions into cid_set. For plain queries the
// predicates whose position is in s_pids are skipped.
3763 for(w=0;w<where.size();++w){
3764 if(is_wj || is_fj || s_pids.count(w) == 0)
3765 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3767 for(s=0;s<sl_list.size();s++){
3768 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3773 for(g=0;g<gb_tbl->size();g++)
3774 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3777 // Variables for unpacking attributes.
// One local named unpack_var_<field>_<tblref> per referenced column, typed
// from the schema entry of that field.
3778 ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3779 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3780 int schref = (*csi).schema_ref;
3781 int tblref = (*csi).tblvar_ref;
3782 string field = (*csi).field;
3783 data_type dt(schema->get_type_name(schref,field));
3784 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3785 field.c_str(), tblref);
3791 // Variables that are always needed
3792 ret += "/*\t\tVariables which are always needed\t*/\n";
3793 ret += "\tgs_retval_t retval;\n";
3794 ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3795 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3797 ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3800 // Variables needed for aggregation queries.
3802 ret += "\n/*\t\tVariables for aggregation\t*/\n";
3803 ret+="\tunsigned int i, probe;\n";
3804 ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3805 ret+="\tgs_uint64_t hashval, hash2;\n";
3806 // Variables for storing group-by attribute values.
3807 if(gb_tbl->size() > 0)
3808 ret += "/*\t\tGroup-by attributes\t*/\n";
3809 for(g=0;g<gb_tbl->size();g++){
3810 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3812 data_type *gdt = gb_tbl->get_data_type(g);
3813 if(gdt->is_buffer_type()){
3814 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3819 // Temporaries for min/max
3820 string aggr_tmp_str = "";
3821 for(a=0;a<aggr_tbl->size();a++){
3822 string aggr_op = aggr_tbl->get_op(a);
3823 if(aggr_op == "MIN" || aggr_op == "MAX"){
3824 sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3825 aggr_tmp_str.append(tmpstr);
3828 if(aggr_tmp_str != ""){
3829 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3830 ret += aggr_tmp_str;
3833 // Variables for udaf output temporaries
3834 bool no_udaf = true;
3835 for(a=0;a<aggr_tbl->size();a++){
3836 if(! aggr_tbl->is_builtin(a)){
3838 ret+="/*\t\tUDAF output vars.\t*/\n";
3841 int afcn_id = aggr_tbl->get_fcn_id(a);
3842 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3843 sprintf(tmpstr,"udaf_ret%d", a);
3844 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3849 // Variables needed for a filter join query
3850 if(fs->node_type() == "filter_join"){
3851 filter_join_qpn *fjq = (filter_join_qpn *)fs;
3852 bool uses_bloom = fjq->use_bloom;
3853 ret += "/*\t\tJoin fields\t*/\n";
3854 for(g=0;g<fjq->hash_eq.size();g++){
3855 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3857 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3862 " /* Variables for fj bloom filter */ \n"
3863 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3864 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3865 "\tlong long int curr_fj_ts;\n"
3866 "\tlong long int curr_bin, the_bin;\n"
3871 " /* Variables for fj join table */ \n"
3872 "\tunsigned int i, bucket, found; \n"
3873 "\tunsigned int bucket1, the_bucket;\n"
3874 " long long int curr_fj_ts;\n"
// Variables needed for a watchlist join query: one s_equijoin_/r_equijoin_
// pair per key field, hash scratch variables and a `rec` pointer to a
// watchlist element.
3881 if(fs->node_type() == "watch_join"){
3882 watch_join_qpn *wlq = (watch_join_qpn *)fs;
3883 ret += "/*\t\tJoin fields\t*/\n";
3884 for(int k=0;k<wlq->key_flds.size(); ++k){
3885 string kfld = wlq->key_flds[k];
3886 ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
3887 ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
3890 " /* Variables for wl join table */ \n"
3891 "\tunsigned int i, bucket;\n"
3892 "\tunsigned long long int hash; \n";
3893 string wl_schema = wlq->from[1]->get_schema_name();
3894 string wl_elem_str = generate_watchlist_element_name(wl_schema);
3895 ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n";
3901 // Variables needed to store selected attributes of BUFFER type
3902 // temporarily, in order to compute their size for storage
3903 // in an output tuple.
3905 string select_var_defs = "";
3906 for(int s=0;s<sl_list.size();s++){
3907 data_type *sdt = sl_list[s]->get_data_type();
3908 if(sdt->is_buffer_type()){
3909 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3910 select_var_defs.append(tmpstr);
3913 if(select_var_defs != ""){
3914 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3915 ret += select_var_defs;
3918 // Variables to store results of partial functions.
// A result variable is declared for every partial function and, in
// non-aggregation queries, for any other collected function referenced more
// than once; the latter also get a fcn_ref_cnt_<p> flag initialised to 0.
3920 if(partial_fcns.size()>0){
3921 ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3922 for(p=0;p<partial_fcns.size();++p){
3923 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3924 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3925 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3927 if(!is_aggr_query && fcn_ref_cnt[p] >1){
3928 ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3933 if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3937 // variable to hold packet struct //
3939 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3943 ret += "\t#ifdef LFTA_STATS\n";
3944 // variable to store counter of cpu cycles spend in accept_tuple
3945 ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3946 // increment counter of received tuples
3947 ret += "\tt->in_tuple_cnt++;\n";
3948 ret += "\t#endif\n";
3951 // -------------------------------------------------
3952 // If the packet is "packet", test if its for this lfta,
3953 // and if so load it into its struct
3956 ret+="\n/* packed tuple : test and load. \t*/\n";
3957 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3958 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3959 ret+="\t\tgoto end;\n\n";
3964 col_id_set unpacked_cids; // Keep track of the cols that have been unpacked.
3966 string temporal_flush;
3968 ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3969 else { // non-aggregation operators
3971 // Unpack all the temporal attributes referenced in select clause
3972 // and update the last value of the attribute
3973 col_id_set temp_cids; // col ids of temp attributes in select clause
3975 for(s=0;s<sl_list.size();s++){
3976 data_type *sdt = sl_list[s]->get_data_type();
3977 if (sdt->is_temporal()) {
3978 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3981 // If this is a filter join,
3982 // ensure that the temporal range field is unpacked.
3984 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3985 if(temp_cids.count(window_var_cid)==0)
3986 temp_cids.insert(window_var_cid);
// Unpack each temporal column once and record its latest value in
// t->last_<field>_<tblref>.
3989 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3990 if(unpacked_cids.count((*csi)) == 0){
3991 int tblref = (*csi).tblvar_ref;
3992 int schref = (*csi).schema_ref;
3993 string field = (*csi).field;
3994 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3995 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3998 unpacked_cids.insert( (*csi) );
4004 vector<cnf_elem *> filter = fs->get_filter_clause();
4005 // Test the filter predicate (some query types have additional preds).
4006 if(filter.size() > 0 && !is_wj){ // watchlist join does specialized processing
4008 // Sort by evaluation cost.
4009 // First, estimate evaluation costs
4010 // Eliminate predicates covered by the prefilter (those in s_pids).
4011 // I need to do it before the sort because the indices refer
4012 // to the position in the unsorted list.
4013 vector<cnf_elem *> tmp_wh;
4014 for(w=0;w<filter.size();++w){
4015 if(s_pids.count(w) == 0){
4016 compute_cnf_cost(filter[w],Ext_fcns);
4017 tmp_wh.push_back(filter[w]);
4022 sort(filter.begin(), filter.end(), compare_cnf_cost());
4024 // Now generate the predicates.
4025 for(w=0;w<filter.size();++w){
4026 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
4028 // Find the set of variables accessed in this CNF elem,
4029 // but in no previous element.
4030 col_id_set this_pred_cids;
4031 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
4032 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4033 if(unpacked_cids.count( (*csi) ) == 0){
4034 int tblref = (*csi).tblvar_ref;
4035 int schref = (*csi).schema_ref;
4036 string field = (*csi).field;
4037 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4038 unpacked_cids.insert( (*csi) );
4041 // Find partial fcns ref'd in this cnf element
4043 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
4044 // Since set<..> is a "Sorted Associative Container",
4045 // we can walk through it in sorted order by walking from
4046 // begin() to end(). (and the partial fcns must be
4047 // evaluated in this order).
4048 set<int>::iterator si;
// Functions referenced more than once are guarded in the generated code by
// fcn_ref_cnt_<id>: the guard tests for 0 and the flag is set to 1 after the
// result has been stored in partial_fcn_result_<id>.
4049 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
4050 if(fcn_ref_cnt[(*si)] > 1){
4051 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
4053 if(is_partial_fcn[(*si)]){
4054 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
4055 ret += "\t\tif(retval) goto end;\n";
4057 if(fcn_ref_cnt[(*si)] > 1){
4058 if(!is_partial_fcn[(*si)]){
4059 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
4061 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
4066 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
4070 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
4074 // We've passed the WHERE clause,
4075 // unpack the remainder of the accessed fields.
4077 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
4078 vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
4079 for(w=0;w<h_eq.size();++w){
4080 col_id_set this_pred_cids;
4081 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
4082 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4083 if(unpacked_cids.count( (*csi) ) == 0){
4084 int tblref = (*csi).tblvar_ref;
4085 int schref = (*csi).schema_ref;
4086 string field = (*csi).field;
4087 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4088 unpacked_cids.insert( (*csi) );
4092 }else if(is_wj){ // STOPPED HERE move this to wj main body
4094 ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
4095 map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
4096 vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
4097 for(w=0;w<kflds.size();++w){
4098 string kfld = kflds[w];
4099 col_id_set this_pred_cids;
4100 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
4101 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4102 if(unpacked_cids.count( (*csi) ) == 0){
4103 int tblref = (*csi).tblvar_ref;
4104 int schref = (*csi).schema_ref;
4105 string field = (*csi).field;
4106 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4107 unpacked_cids.insert( (*csi) );
4113 ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
4115 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4116 if(unpacked_cids.count( (*csi) ) == 0){
4117 int schref = (*csi).schema_ref;
4118 int tblref = (*csi).tblvar_ref;
4119 string field = (*csi).field;
4120 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4121 unpacked_cids.insert( (*csi) );
4128 ////////////////// After this, the query types
4129 ////////////////// are processed differently.
// NOTE(review): the condition below combines !is_fj and !is_wj with the
// bitwise `&` operator. Both operands are bool, so the value matches the
// logical `&&` form, but `&&` looks like what was intended - confirm.
4131 if(!is_aggr_query && !is_fj & !is_wj)
4132 ret += generate_sel_accept_body(fs, node_name, schema);
4133 else if(is_aggr_query)
4134 ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
4137 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4139 ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
// Common exit of the generated function: the label targeted by the
// `goto end` statements emitted above, followed by the cycle accounting
// (under LFTA_STATS) and the return.
4146 ret += "\n\tend:\n";
4147 ret += "\t#ifdef LFTA_STATS\n";
4148 ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
4149 ret += "\t#endif\n";
4150 ret += "\n\treturn 1;\n}\n\n";
4156 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
4159 string ret = "struct FTA * "+generate_alloc_name(node_name) +
4160 "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void *value){\n";
4162 ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
4165 ret+="\tif((f=(struct "+generate_fta_name(node_name)+" *)fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
4167 // assign a streamid to fta instance
4168 ret+="\t/* assign a streamid */\n";
4169 ret+="\tf->f.ftaid = ftaid;\n";
4170 ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
4171 ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
4174 ret += "\tf->n_aggrs = 0;\n";
4175 ret += "\tf->n_ticks = 0; // for limiting slow flush\n";
4177 ret += "\tf->max_aggrs = ";
4179 // Computing the number of aggregate blocks is a little
4180 // tricky. If there are no GB attrs, or if all GB attrs
4181 // are temporal, then use a single aggregate block, else
4182 // use a default value (10). A user specification overrides
4184 bool single_group = true;
4185 for(g=0;g<gb_tbl->size();g++){
4186 data_type *gdt = gb_tbl->get_data_type(g);
4187 if(! gdt->is_temporal() ){
4188 single_group = false;
4191 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
4192 int max_aggr_i = atoi(max_aggr_str.c_str());
4193 if(max_aggr_i <= 0){
4197 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
4199 unsigned int naggrs = 1; // make it power of 2
4200 unsigned int nones = 0;
4204 naggrs = naggrs << 1;
4205 max_aggr_i = max_aggr_i >> 1;
4207 if(nones==1) // in case it was already a power of 2.
4209 ret += int_to_string(naggrs);
4213 ret+="\tif ((f->aggr_table = (struct "+generate_aggr_struct_name(node_name)+" *)sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
4214 ret+="\t\treturn(0);\n";
4216 // ret+="/* compute how many integers we need to store the hashmap */\n";
4217 // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
4218 ret+="\tif ((f->aggr_table_hashmap = (gs_uint32_t *)sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
4219 ret+="\t\treturn(0);\n";
4221 ret+="/*\t\tfill bitmap with zero \t*/\n";
4222 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
4223 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
4224 ret+="\tf->generation=0;\n";
4225 ret+="\tf->flush_pos = f->max_aggrs;\n";
4227 ret += "\tf->flush_ctr = 0;\n";
4233 ret+="\tf->first_exec = 1;\n";
4234 unsigned int n_bloom = 11;
4235 string n_bloom_str = fs->get_val_of_def("num_bloom");
4236 int tmp_n_bloom = atoi(n_bloom_str.c_str());
4238 n_bloom = tmp_n_bloom+1;
4240 unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
4241 if(window_len < n_bloom){
4242 n_bloom = window_len+1;
4245 int bf_exp_size = 12; // base-2 log of number of bits
4246 string bloom_len_str = fs->get_val_of_def("bloom_size");
4247 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
4248 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
4249 bf_exp_size = tmp_bf_exp_size;
4251 int bf_bit_size = 1 << 12;
4252 int bf_byte_size = bf_bit_size / (8*sizeof(char));
4254 int bf_tot = n_bloom*bf_byte_size;
4255 ret+="\tif ((f->bf_table = (unsigned char *)sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
4256 ret+="\t\treturn(0);\n";
4259 " for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
4260 " f->bf_table[i] = 0;\n"
4263 unsigned int ht_size = 4096;
4264 string ht_size_s = fs->get_val_of_def("aggregate_slots");
4265 int tmp_ht_size = atoi(ht_size_s.c_str());
4266 if(tmp_ht_size > 1024){
4267 unsigned int hs = 1; // make it power of 2
4270 tmp_ht_size = tmp_ht_size >> 1;
4274 ret+="\tif ((f->join_table = (struct "+generate_fj_struct_name(node_name)+" *) sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
4275 ret+="\t\treturn(0);\n";
4278 " for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
4279 " f->join_table[i].ts = 0;\n"
4284 // Initialize the complex literals (which might be handles).
4286 for(cl=0;cl<complex_literals->size();cl++){
4287 literal_t *l = complex_literals->get_literal(cl);
4288 // sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
4289 // ret += tmpstr + l->to_C_code() + ";\n";
4290 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
4291 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4296 // Initialize the last seen values of temporal attributes to min(max) value of
4297 // their respective type
4298 // Create places to hold the last values of temporal attributes referenced in select clause
4301 col_id_set temp_cids; // col ids of temp attributes in select clause
4304 col_id_set::iterator csi;
4306 for(s=0;s<sl_list.size();s++){
4307 data_type *sdt = sl_list[s]->get_data_type();
4308 if (sdt->is_temporal()) {
4309 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
4313 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
4314 int tblref = (*csi).tblvar_ref;
4315 int schref = (*csi).schema_ref;
4316 string field = (*csi).field;
4317 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
4318 if (dt.is_increasing()) {
4319 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
4321 } else if (dt.is_decreasing()) {
4322 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
4327 // initialize last seen values of temporal group-by variables
4329 for(g=0;g<gb_tbl->size();g++){
4330 data_type *dt = gb_tbl->get_data_type(g);
4331 if(dt->is_temporal()){
4333 fprintf(stderr,"group by attribute %s is temporal, ",
4334 gb_tbl->get_name(g).c_str());
4336 if(dt->is_increasing()){
4337 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
4339 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
4346 ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
4347 ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
4348 ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
4349 ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
4350 ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
4352 // Initialize runtime stats
4353 ret+="\tf->in_tuple_cnt = 0;\n";
4354 ret+="\tf->out_tuple_cnt = 0;\n";
4355 ret+="\tf->out_tuple_sz = 0;\n";
4356 ret+="\tf->accepted_tuple_cnt = 0;\n";
4357 ret+="\tf->cycle_cnt = 0;\n";
4358 ret+="\tf->collision_cnt = 0;\n";
4359 ret+="\tf->eviction_cnt = 0;\n";
4360 ret+="\tf->sampling_rate = 1.0;\n";
4362 ret+="\tf->trace_id = 0;\n\n";
4363 if(param_tbl->size() > 0){
4365 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
4366 "#ifndef LFTA_IN_NIC\n"
4367 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
4375 // Register the pass-by-handle parameters
4377 for(ph=0;ph<param_handle_table.size();++ph){
4378 data_type pdt(param_handle_table[ph]->type_name);
4379 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
4380 switch(param_handle_table[ph]->val_type){
4383 if(pdt.is_buffer_type()) ret += "&(";
4384 sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
4386 if(pdt.is_buffer_type()) ret += ")";
4390 // not complex, no constructor
4392 ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
4395 // query parameter handles are registered/deregistered in the
4396 // load_params function.
4397 // ret += "t->param_"+param_handle_table[ph]->param_name;
4400 fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
4405 ret += "\treturn (struct FTA *) f;\n";
4414 //////////////////////////////////////////////////////////////////
// Generate the C source text of one LFTA for the query plan node fs.
// The text is accumulated in retval: preamble, optional packed-input struct,
// operator-specific structs, the fta struct and tuple struct, then the
// flush / load_params / free / control / accept / clock / alloc functions.
//
//   fs               - query plan node; fs->node_type() selects the handling
//                      ("spx_qpn", "sgah_qpn", "filter_join", "watch_join",
//                      "watch_tbl_qpn"); anything else is reported as an
//                      internal error.
//   schema           - table schema, forwarded to the generate_* helpers.
//   gid              - NOTE(review): no use of gid is visible in this view.
//   Ext_fcns         - external function list, used to collect complex
//                      literals, handle parameters and partial functions.
//   schema_embed_str - forwarded to generate_preamble.
//   ifdb             - interface database, queried for "Time_Correlation".
//   nicp             - NIC properties (may be NULL); option "Return" with
//                      value "Packed" enables the packed-input struct.
//   s_pids           - forwarded to generate_fta_accept.
//
// Side effect: resets and reloads the shared working variables
// (gb_tbl, aggr_tbl, sl_list, where, partial_fcns, fcn_ref_cnt, ...).
// NOTE(review): the return statement is not visible here; presumably the
// accumulated retval is returned.
4416 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
4417 // map<string,string> &int_fcn_defs,
4418 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
4423 /////////////////////////////////////////////////////////////
4424 /// Do operator-generic processing, such as
4425 /// gathering the set of referenced columns,
4426 /// generating structures, etc.
4428 // Initialize globals to empty.
4429 gb_tbl = NULL; aggr_tbl = NULL;
4430 global_id = -1; nicprop = NULL;
4431 param_tbl = fs->get_param_tbl();
4432 sl_list.clear(); where.clear();
4433 partial_fcns.clear();
4434 fcn_ref_cnt.clear(); is_partial_fcn.clear();
4435 pred_class.clear(); pred_pos.clear();
// The *_fcns_start / *_fcns_end pairs delimit, per clause, index ranges
// into partial_fcns; they are filled in further below.
4436 sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
4437 gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
4440 // Does the lfta read packed results from the NIC?
4441 nicprop = nicp; // load into global
4443 packed_return = false;
4444 if(nicp && nicp->option_exists("Return")){
4445 if(nicp->option_value("Return") == "Packed"){
4446 packed_return = true;
// Any other value of the Return option only produces a warning.
4448 fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
4453 // Extract data which defines the query.
4454 // complex literals gathered now.
4455 complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
4456 param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
4457 string node_name = fs->get_node_name();
4458 bool is_fj = false, uses_bloom = false;
4460 bool is_watch_tbl = false;
// Select / project node: only a select list and a where clause.
4463 if(fs->node_type() == "spx_qpn"){
4464 is_aggr_query = false;
4465 spx_qpn *spx_node = (spx_qpn *)fs;
4466 sl_list = spx_node->get_select_se_list();
4467 where = spx_node->get_where_clause();
// Aggregation node: additionally loads the group-by and aggregate tables.
4471 if(fs->node_type() == "sgah_qpn"){
4472 is_aggr_query = true;
4473 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4474 sl_list = sgah_node->get_select_se_list();
4475 where = sgah_node->get_where_clause();
4476 gb_tbl = sgah_node->get_gb_tbl();
4477 aggr_tbl = sgah_node->get_aggr_tbl();
// A HAVING clause is not evaluated here; the user is only warned.
4479 if((sgah_node->get_having_clause()).size() > 0){
4480 fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
// Filter join node: also records whether a bloom filter is used.
4483 if(fs->node_type() == "filter_join"){
4484 is_aggr_query = false;
4486 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4487 sl_list = fj_node->get_select_se_list();
4488 where = fj_node->get_where_clause();
4489 uses_bloom = fj_node->use_bloom;
// Watchlist join node.
4493 if(fs->node_type() == "watch_join"){
4494 is_aggr_query = false;
4496 watch_join_qpn *wl_node = (watch_join_qpn *)fs;
4497 sl_list = wl_node->get_select_se_list();
4498 where = wl_node->get_where_clause();
// Watch table node: no select list and no where clause.
4502 if(fs->node_type() == "watch_tbl_qpn"){
4503 is_aggr_query = false;
4504 is_watch_tbl = true;
4505 vector<scalarexp_t *> empty_sl_list;
4506 vector<cnf_elem *> empty_where;
4507 sl_list = empty_sl_list;
4508 where = empty_where;
4512 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
4516 // Build list of "partial functions", by clause.
4517 // NOTE : partial fcns are not handled well.
4518 // The act of searching for them associates the fcn call
4519 // in the SE with an index to an array. Refs to the
4520 // fcn value are replaced with refs to the variable they are
4521 // unpacked into. A more general tagging mechanism would be better.
4524 vector<bool> *pfunc_ptr = NULL;
4525 vector<int> *ref_cnt_ptr = NULL;
4526 if(!is_aggr_query){ // don't collect cacheable fcns on aggr query.
4527 ref_cnt_ptr = &fcn_ref_cnt;
4528 pfunc_ptr = &is_partial_fcn;
// Select-list functions come first in partial_fcns ...
4532 for(i=0;i<sl_list.size();i++){
4533 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
// ... followed by the where-clause functions ...
4535 wh_fcns_start = sl_fcns_end = partial_fcns.size();
4536 for(i=0;i<where.size();i++){
4537 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
// ... then the group-by definitions (no ref counts / partial flags collected) ...
4539 gb_fcns_start = wh_fcns_end = partial_fcns.size();
4541 for(i=0;i<gb_tbl->size();i++){
4542 find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
// ... and finally the aggregate expressions.
4545 ag_fcns_start = gb_fcns_end = partial_fcns.size();
4546 if(aggr_tbl != NULL){
4547 for(i=0;i<aggr_tbl->size();i++){
4548 find_partial_fcns(aggr_tbl->get_aggr_se(i), &partial_fcns, NULL, &is_partial_fcn, Ext_fcns);
4551 ag_fcns_end = partial_fcns.size();
4553 // Fill up the is_partial_fcn and fcn_ref_cnt arrays.
// NOTE(review): the condition guarding this loop is not visible here;
// presumably it only runs for aggregate queries, where the arrays were
// not filled during collection -- confirm.
4555 for(i=0; i<partial_fcns.size();i++){
4556 fcn_ref_cnt.push_back(1);
4557 is_partial_fcn.push_back(true);
4561 // Unmark non-partial expensive functions referenced only once.
4562 for(i=0; i<partial_fcns.size();i++){
4563 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
4564 partial_fcns[i]->set_partial_ref(-1);
// From here on node_name is the normalized name used in generated identifiers.
4568 node_name = normalize_name(node_name);
4570 retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
4572 if(packed_return){ // generate unpack struct
// Collect the NIC-safe columns referenced by the select list, the
// predicates and the group-by definitions, resolved against the schema
// of the first input table.
4573 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
4574 int schref = input_tbls[0]->get_schema_ref();
4575 vector<string> refd_cols;
4576 for(s=0;s<sl_list.size();++s){
4577 gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
4579 for(p=0;p<where.size();++p){
4580 // I'm not disabling these preds ...
4581 gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
4584 for(g=0;g<gb_tbl->size();++g){
4585 gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
// Sorting fixes the order of the fields in the generated struct.
4588 sort(refd_cols.begin(), refd_cols.end());
4589 retval += "struct "+node_name+"_input_struct{\n";
4590 retval += "\tint __lfta_id_fm_nic__;\n";
4592 for(vsi=0;vsi<refd_cols.size();++vsi){
4593 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
4594 retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
4600 /////////////////////////////////////////////////////
4601 // Common stuff unpacked, do some generation
// Operator-specific structures. NOTE(review): the conditions selecting
// among the aggregate / filter-join / watch-table cases are not visible here.
4605 retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
4607 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
4609 retval += "\n\n// watchtable code here \n\n";
4610 watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
4611 retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
4612 retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
// Structures and functions emitted for the fta itself.
4616 retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
4617 retval += generate_tuple_struct(node_name, sl_list) ;
4620 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
// The parameter loader is only generated when the query has parameters.
4621 if(param_tbl->size() > 0)
4622 retval += generate_fta_load_params(node_name) ;
4623 retval += generate_fta_free(node_name, is_aggr_query) ;
4624 retval += generate_fta_control(node_name, schema, is_aggr_query) ;
4625 retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;
4627 /* extract the value of Time_Correlation from interface definition */
// The lookup uses the machine and interface of the first input table;
// DEFAULT_TIME_CORR is used when the interface defines no value.
4631 vector<tablevar_t *> tvec = fs->get_input_tbls();
4632 vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
4633 if (time_corr_vec.empty())
4634 time_corr = DEFAULT_TIME_CORR;
4636 time_corr = atoi(time_corr_vec[0].c_str());
4638 retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
4639 retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
// Compute a snap length for the query plan node fs from the columns it
// references in its where clause, select list and group-by definitions.
//
//   fs        - query plan node ("spx_qpn", "sgah_qpn", "filter_join",
//               "watch_join" or "watch_tbl_qpn"; other types are reported
//               as an internal error).
//   schema    - table schema used to look up field positions and modifiers.
//   snap_type - "index": use the largest field index of any referenced
//               column; otherwise use the largest "snap_len" modifier
//               value among the referenced fields.
//
// Side effect: overwrites the shared sl_list / where (and gb_tbl for
// aggregation nodes).
// NOTE(review): the return statements are not visible in this view; the
// final comparison of n_snap against the number of referenced columns
// presumably decides between returning snap_len and a "no limit" value.
4647 int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
4649 // Initialize global vars
4651 sl_list.clear(); where.clear();
// NOTE(review): the body of this watch-table case is not visible here.
4654 if(fs->node_type() == "watch_tbl_qpn"){
4658 if(fs->node_type() == "spx_qpn"){
4659 spx_qpn *spx_node = (spx_qpn *)fs;
4660 sl_list = spx_node->get_select_se_list();
4661 where = spx_node->get_where_clause();
4663 else if(fs->node_type() == "sgah_qpn"){
4664 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4665 sl_list = sgah_node->get_select_se_list();
4666 where = sgah_node->get_where_clause();
4667 gb_tbl = sgah_node->get_gb_tbl();
4669 else if(fs->node_type() == "filter_join"){
4670 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4671 sl_list = fj_node->get_select_se_list();
4672 where = fj_node->get_where_clause();
4674 else if(fs->node_type() == "watch_join"){
4675 watch_join_qpn *fj_node = (watch_join_qpn *)fs;
4676 sl_list = fj_node->get_select_se_list();
4677 where = fj_node->get_where_clause();
4679 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
4683 // Gather all column references, need to define unpacking variables.
4686 col_id_set::iterator csi;
4688 for(w=0;w<where.size();++w)
4689 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
4690 for(s=0;s<sl_list.size();s++){
4691 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
4696 for(g=0;g<gb_tbl->size();g++)
4697 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
4700 // compute snap length
4703 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4704 int schref = (*csi).schema_ref;
4705 int tblref = (*csi).tblvar_ref;
4706 string field = (*csi).field;
// "index" mode: the snap length is the highest field position referenced.
4708 if(snap_type == "index"){
4709 int pos = schema->get_field_idx(schref, field);
4710 if(pos>snap_len) snap_len = pos;
// Otherwise: take the maximum of the per-field "snap_len" modifiers.
4713 param_list *field_params = schema->get_modifier_list(schref, field);
4714 if(field_params->contains_key("snap_len")){
4715 string fld_snap_str = field_params->val_of("snap_len");
4717 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
4718 if(fld_snap > snap_len) snap_len = fld_snap;
// A modifier value that does not parse as an integer is reported and skipped.
4721 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
// NOTE(review): the updates of n_snap are not visible; presumably it counts
// the fields that contributed a snap length -- confirm.
4727 if(n_snap == cid_set.size()){
4736 // Function which computes a low-cost set of unpacking functions
4737 // covering a set of columns, using a greedy selection.
//
//   upref_cids   - columns that must be covered by an unpacking function.
//   Schema       - supplies, per field, the set of unpack functions
//                  (get_unpack_fcns) and, per function, its cost
//                  (get_ufcn_cost).
//   ucol_fcn_map - in/out: column -> chosen unpack function. Columns
//                  already present on entry are treated as covered.
//
// Each round counts, for every unpack function, how many still-uncovered
// columns can use it, picks the function with the smallest cost per
// covered column, and assigns it to those columns. Rounds repeat until
// ucol_fcn_map has as many entries as upref_cids.
4739 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
4740 map<string, int> pfcn_count;
4741 map<string, int>::iterator msii;
4742 col_id_set::iterator cisi;
4743 set<string>::iterator ssi;
4746 while(ucol_fcn_map.size() < upref_cids.size()){
4748 // Gather unpack functions referenced by unaccounted-for
4749 // columns, and increment their reference count.
// NOTE(review): a per-round reset of pfcn_count is not visible in this
// view; without one the counts would accumulate across rounds -- confirm.
4751 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4752 if(ucol_fcn_map.count((*cisi)) == 0){
4753 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4754 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
4755 pfcn_count[(*ssi)]++;
4759 // Get the lowest cost per field function.
4760 float min_cost = 0.0;
4761 string best_fcn = "";
4762 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
4763 int fcost = Schema->get_ufcn_cost((*msii).first);
4765 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
// Cost is amortized over the number of uncovered columns the function serves.
4768 float this_cost = (1.0*fcost)/(*msii).second;
// The first candidate always becomes the initial minimum.
4769 if(msii == pfcn_count.begin() || this_cost < min_cost){
4770 min_cost = this_cost;
4771 best_fcn = (*msii).first;
4775 fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4779 // Assign this function to the unassigned columns which use it.
4780 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4781 if(ucol_fcn_map.count((*cisi)) == 0){
4782 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4783 if(ufcns.count(best_fcn)>0)
4784 ucol_fcn_map[(*cisi)] = best_fcn;
4792 // Generate an initial test (the prefilter) for the lftas on one interface.
4793 // Assume that the predicate references no external functions,
4794 // and especially no partial functions,
4795 // aggregates, internal functions.
//
// The generated C text contains:
//   - global handle variables for combinable ("common") predicates,
//   - a struct holding the prefilter's complex literals,
//   - init_lfta_prefilter_<iface>(), which initializes the literals and
//     registers the common predicate handles,
//   - lfta_prefilter_<iface>(void *pkt), which evaluates the predicates
//     and returns a bitmask with one bit per entry of pred_list,
//   - a snap length calculator section (see the notes near its start).
//
//   pred_list      - prefilter predicates; predicate p controls bit p of
//                    the generated return value.
//   temp_cids      - temporal columns, unpacked into prefilter_temp_vars.
//   Schema         - table schema (types, unpack functions, base tables).
//   Ext_fcns       - external function list.
//   lfta_cols      - per lfta, the set of columns it references.
//   lfta_sigs      - per lfta, the bit signature compared against retval
//                    in the generated code.
//   lfta_snap_lens - per lfta, its snap length (-1 is tested separately).
//   iface          - interface name, appended to generated identifiers.
//
// Side effect: clears and refills the shared pred_class / pred_pos vectors.
// NOTE(review): the return statement is not visible in this view; the
// generated text is accumulated in ret (and body).
4796 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4797 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4798 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4799 vector<int> &lfta_snap_lens, string iface){
4800 col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4801 col_id_set::iterator csi;
4805 // Gather complex literals in the prefilter.
// NOTE(review): no matching delete for this allocation is visible in this view.
4806 cplx_lit_table *complex_literals = new cplx_lit_table();
4807 for(p=0;p<pred_list.size();++p){
4808 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4812 // Find the combinable predicates
4813 vector<predicate_t *> pr_list;
4814 for(p=0;p<pred_list.size();++p){
4815 find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4818 // Analyze the combinable predicates to find the predicate classes.
4819 pred_class.clear(); // idx to equiv pred in equiv_list
4820 pred_pos.clear(); // idx to returned bitmask.
4821 vector<predicate_t *> equiv_list;
4822 vector<int> num_equiv;
// For each combinable predicate, search for an existing equivalence class.
4825 for(p=0;p<pr_list.size();++p){
4826 for(q=0;q<equiv_list.size();++q){
4827 if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4830 if(q == equiv_list.size()){ // no equiv : create new
4831 pred_class.push_back(equiv_list.size());
4832 equiv_list.push_back(pr_list[p]);
4833 pred_pos.push_back(0);
4834 num_equiv.push_back(1);
4836 }else{ // pr_list[p] is equivalent to pred q
// Its position within the class is the current member count of class q.
4837 pred_class.push_back(q);
4838 pred_pos.push_back(num_equiv[q]);
4843 // Generate the variables which hold the common pred handles
4844 ret += "/*\t\tprefilter global vars.\t*/\n";
// One handle variable per group of 32 positions within a class.
// NOTE(review): with "<=" a class whose size is an exact multiple of 32
// gets one more variable than its positions index -- confirm intended.
4845 for(q=0;q<equiv_list.size();++q){
4846 for(p=0;p<=(num_equiv[q]/32);++p){
4847 ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4851 // Struct to hold prefilter complex literals
4852 ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
// A placeholder member is emitted when there are no complex literals.
4853 if(complex_literals->size() == 0)
4854 ret += "\tint no_variable;\n";
4856 for(cl=0;cl<complex_literals->size();cl++){
4857 literal_t *l = complex_literals->get_literal(cl);
// NOTE(review): no matching delete for dtl is visible in this view.
4858 data_type *dtl = new data_type( l->get_type() );
4859 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4862 ret += "} prefilter_complex_lits_"+iface+";\n\n";
4865 // Generate the prefilter initialization code
4866 ret += "void init_lfta_prefilter_"+iface+"(){\n";
4868 // First initialize complex literals, if any.
4869 ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4870 for(cl=0;cl<complex_literals->size();cl++){
4871 literal_t *l = complex_literals->get_literal(cl);
4872 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4873 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
// Register the common predicate handles. epred_seen tracks the classes
// that have already had a registration call generated.
4877 set<int> epred_seen;
4878 for(p=0;p<pr_list.size();++p){
4879 int q = pred_class[p];
4880 //printf("\tq=%d\n",q);
// Class already seen: the generated call passes the existing handle.
4881 if(epred_seen.count(q)>0){
4882 ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4883 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4884 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4885 for(o=0;o<op_list.size();++o){
4887 ret += generate_se_code(op_list[o],Schema)+", ";
4890 ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4891 epred_seen.insert(q);
// First member of the class: the generated call passes NULL and its
// result is stored in the handle variable.
4893 ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4894 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4895 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4896 for(o=0;o<op_list.size();++o){
4898 ret += generate_se_code(op_list[o],Schema)+", ";
4901 ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4902 epred_seen.insert(q);
4909 // Start on main body code generation
4910 ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4913 ///--------------------------------------------------------------
4914 /// Generate and store the prefilter body,
4915 /// reuse it for the snap length calculator
4916 ///-------------------------------------------------------------
4919 body += "\tstruct packet *p = (struct packet *)pkt;\n";
4923 // Gather the colids to store unpacked variables.
4924 for(p=0;p<pred_list.size();++p){
4925 gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4928 // make the col_ids refer to the base tables, and
4929 // grab the col_ids with at least one unpacking function.
4930 for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4931 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4933 tmp_col_id.field = (*csi).field;
4934 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4935 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4936 cid_set.insert(tmp_col_id);
4937 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4938 if(fe->get_unpack_fcns().size()>0)
4939 upref_cids.insert(tmp_col_id);
4944 // Find the set of unpacking programs needed for the
4945 // prefilter fields.
4946 map<col_id, string,lt_col_id> ucol_fcn_map;
4947 find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
// pref_ufcns is the distinct set of unpack functions chosen for the prefilter.
4948 set<string> pref_ufcns;
4949 map<col_id, string,lt_col_id>::iterator mcis;
4950 for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4951 pref_ufcns.insert((*mcis).second);
4956 // Variables for unpacking attributes.
4957 body += "/*\t\tVariables for unpacking attributes\t*/\n";
// Per referenced column: a value variable and a ret_<field>_<tblref> flag.
4958 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4959 int schref = (*csi).schema_ref;
4960 int tblref = (*csi).tblvar_ref;
4961 string field = (*csi).field;
4962 data_type dt(Schema->get_type_name(schref,field));
4963 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4964 field.c_str(), tblref);
4966 sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4969 // Variables for unpacking temporal attributes.
4970 body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4971 for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
// Only temporal columns not already declared above are considered.
4972 if (cid_set.count(*csi) == 0) {
4973 int schref = (*csi).schema_ref;
4974 int tblref = (*csi).tblvar_ref;
4975 string field = (*csi).field;
4976 data_type dt(Schema->get_type_name(schref,field));
4977 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4978 field.c_str(), tblref);
4985 // Variables for combinable predicate evaluation
4986 body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4987 for(q=0;q<equiv_list.size();++q){
4988 for(p=0;p<=(num_equiv[q]/32);++p){
4989 body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4994 // Variables that are always needed
4995 body += "/*\t\tVariables which are always needed\t*/\n";
4996 body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4997 body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4999 // Call the unpacking functions for the prefilter fields
5000 if(pref_ufcns.size() > 0)
5001 body += "\n/*\t\tcall field unpacking functions\t*/\n";
5002 set<string>::iterator ssi;
5003 for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
5004 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5008 // Unpack the accessed attributes
5009 body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
// The generated ret_<field>_<tblref> flag is set to (accessor result == 0).
5010 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
5011 int tblref = (*csi).tblvar_ref;
5012 int schref = (*csi).schema_ref;
5013 string field = (*csi).field;
5014 sprintf(tmpstr,"\tret_%s_%d = (%s(p, &unpack_var_%s_%d) == 0);\n",
5015 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5019 // next unpack the temporal attributes and ignore the errors
5020 // We are assuming here that failed unpack of temporal attributes
5021 // is not going to overwrite the last stored value
5022 // Failed unpacks are ignored
5023 for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
5024 int tblref = (*csi).tblvar_ref;
5025 int schref = (*csi).schema_ref;
5026 string field = (*csi).field;
5027 sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
5028 Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5032 // Evaluate the combinable predicates
5033 if(equiv_list.size()>0)
5034 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
5035 for(q=0;q<equiv_list.size();++q){
5036 for(p=0;p<=(num_equiv[q]/32);++p){
5038 // Only call the common eval fcn if all ref'd fields present.
5039 col_id_set pred_cids;
5040 col_id_set::iterator cpi;
5041 gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
5042 if(pred_cids.size()>0){
5044 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5045 if(cpi != pred_cids.begin())
5047 string field = (*cpi).field;
5048 int tblref = (*cpi).tblvar_ref;
5049 body += "ret_"+field+"_"+int_to_string(tblref);
5054 body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
5055 vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
5056 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
5057 for(o=0;o<op_list.size();++o){
5059 body += ","+generate_se_code(op_list[o],Schema);
// Evaluate each prefilter predicate, guarded by the ret_ flags of the
// columns it references; predicate p sets bit p of retval.
5067 for(p=0;p<pred_list.size();++p){
5068 col_id_set pred_cids;
5069 col_id_set::iterator cpi;
5070 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
5071 if(pred_cids.size()>0){
5073 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5074 if(cpi != pred_cids.begin())
5076 string field = (*cpi).field;
5077 int tblref = (*cpi).tblvar_ref;
5078 body += "ret_"+field+"_"+int_to_string(tblref);
5082 body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
5083 body+="\tbitpos = bitpos << 1;\n";
5086 // ---------------------------------------------------------------
5087 // Finished with the body of the prefilter
5088 // --------------------------------------------------------------
5092 // Collect fields referenced by an lfta but not
5093 // already unpacked for the prefilter.
5095 //printf("upref_cids is:\n");
5096 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
5097 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5098 //printf("pref_ufcns is:\n");
5099 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
5100 //printf("\t%s\n",(*ssi).c_str());
5103 for(l=0;l<lfta_cols.size();++l){
5104 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
// Rebase the column onto its base table, as was done for the prefilter columns.
5105 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
5107 tmp_col_id.field = (*csi).field;
5108 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
5109 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
5110 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
5111 set<string> fld_ufcns = fe->get_unpack_fcns();
5112 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(), upref_cids.count(tmp_col_id));
5113 if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
5114 // Ensure that this field not already unpacked.
5116 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
5117 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
5118 if(pref_ufcns.count((*ssi))){
5119 //printf("Field already unpacked.\n");
5124 //printf("\tadding to unpack list\n");
5125 upall_cids.insert(tmp_col_id);
5131 //printf("upall_cids is:\n");
5132 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
5133 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5135 // Get the set of unpacking programs for these.
5136 map<col_id, string,lt_col_id> uall_fcn_map;
5137 find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
5138 set<string> pall_ufcns;
5139 for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
5140 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
5141 pall_ufcns.insert((*mcis).second);
5144 // Iterate through the remaining set of unpacking function
5145 if(pall_ufcns.size() > 0)
5146 ret += "//\t\tcall all remaining field unpacking functions.\n";
5147 for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
5148 // gather the set of columns unpacked by this ufcn
5149 col_id_set fcol_set;
5150 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
5151 if(uall_fcn_map[(*csi)] == (*ssi))
5152 fcol_set.insert((*csi));
5155 // gather the set of lftas which access a field unpacked by the fcn
5156 set<long long int> clfta;
5157 for(l=0;l<lfta_cols.size();l++){
5158 for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
5159 if(lfta_cols[l].count((*csi)) > 0)
// The inner loop stopped early, i.e. lfta l uses one of these columns.
5162 if(csi != fcol_set.end())
5163 clfta.insert(lfta_sigs[l]);
5166 // generate the unpacking code
// The unpack call is guarded by a test of retval against each
// signature in clfta.
5168 set<long long int>::iterator sii;
5169 for(sii=clfta.begin();sii!=clfta.end();++sii){
5170 if(sii!=clfta.begin())
// NOTE(review): %llu is used with signed long long int values here.
5172 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
5175 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5179 ret += "\treturn(retval);\n\n";
5183 // --------------------------------------------------------
5184 // reuse prefilter body for snaplen calculator
5186 // This is dummy code, so I'm commenting it out.
// NOTE(review): the mechanism that disables this section (and its extent)
// is not visible in this view; the notes below apply if it is ever enabled.
5189 ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
5194 vector<int> s_snaps = lfta_snap_lens;
5195 sort(s_snaps.begin(), s_snaps.end());
// NOTE(review): s_snaps[0] is read with no visible check that
// lfta_snap_lens is non-empty.
5197 if(s_snaps[0] == -1){
// Collect the signatures of all lftas whose snap length is -1.
5198 set<unsigned long long int> sigset;
5199 for(i=0;i<lfta_snap_lens.size();++i){
5200 if(lfta_snap_lens[i] == -1){
5201 sigset.insert(lfta_sigs[i]);
5205 set<unsigned long long int>::iterator sulli;
5206 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5207 if(sulli!=sigset.begin())
5209 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5212 ret += ") return -1;\n";
// Emit one test per distinct snap length, walking downward from the last
// position.
// NOTE(review): this walk indexes lfta_snap_lens (unsorted) rather than
// the sorted copy s_snaps -- confirm which was intended.
5215 int nextpos = lfta_snap_lens.size()-1;
5216 int nextval = lfta_snap_lens[nextpos];
5217 while(nextval >= 0){
5218 set<unsigned long long int> sigset;
5219 for(i=0;i<lfta_snap_lens.size();++i){
5220 if(lfta_snap_lens[i] == nextval){
5221 sigset.insert(lfta_sigs[i]);
5225 set<unsigned long long int>::iterator sulli;
5226 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5227 if(sulli!=sigset.begin())
5229 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5232 ret += ") return "+int_to_string(nextval)+";\n";
// Skip the remaining entries holding the value just handled.
5234 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
5236 nextval = lfta_snap_lens[nextpos];
5240 ret += "\treturn 0;\n";
5251 // Generate the struct which will store the values of
5252 // temporal attributes unpacked by the prefilter
//
//   cid_set - the temporal columns; each becomes one member named
//             unpack_var_<field>_<tblvar_ref>.
//   Schema  - supplies the type name and modifier list of each field.
//
// Besides the struct declaration, the generated text defines the global
// instance prefilter_temp_vars with an initializer list built in init_code.
// NOTE(review): the return statement is not visible in this view;
// presumably ret is returned.
5253 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
5255 col_id_set::iterator csi;
5257 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
5259 string ret="struct prefilter_unpacked_temp_vars {\n";
5260 ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
5264 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
5265 int schref = (*csi).schema_ref;
5266 int tblref = (*csi).tblvar_ref;
5267 string field = (*csi).field;
// The modifier list is passed as well, so dt.is_increasing() can be queried below.
5268 data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
5269 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
5270 field.c_str(), tblref);
// A non-empty init_code means an earlier member was already added.
5273 if (init_code != "")
// Increasing attributes start at the type's minimum literal; the other
// branch (its "else" is not visible here) uses the maximum literal.
5275 if (dt.is_increasing())
5276 init_code += dt.get_min_literal();
5278 init_code += dt.get_max_literal();
5283 ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";