1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
// LFTA hash-table size; declared extern here, defined in another translation unit.
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
34 // default value for correlation between the interface card and
// NOTE(review): the comment above is cut off mid-sentence in this copy; restore its ending.
36 #define DEFAULT_TIME_CORR 16
// Hash multiplier constants stored as C source text (note the "ull" suffixes):
// they are concatenated into generated LFTA code, e.g. the watchlist key hash
// built by generate_watchlist_load indexes this table with f%NRANDS.
41 extern string hash_nums[NRANDS];
// NOTE(review): the declarator that opens this initializer and its closing
// brace are not visible in this copy; the lines below are its elements.
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
98 // ----------------------------------------------
99 // Data extracted from the query plan node
100 // for use by code generation.
// NOTE(review): all of this is mutable file-scope state read by the generator
// functions below (e.g. gb_tbl, pred_class), so code generation for two query
// nodes cannot safely run concurrently.
102 static cplx_lit_table *complex_literals; //Table of literals with constructors.
103 static vector<handle_param_tbl_entry *> param_handle_table;
104 static param_table *param_tbl; // Table of all referenced parameters.
106 static vector<scalarexp_t *> sl_list;
107 static vector<cnf_elem *> where;
109 static gb_table *gb_tbl; // Table of all group-by attributes.
110 static aggregate_table *aggr_tbl; // Table of all referenced aggregates.
112 static bool packed_return; // unpack using structure, not fcns
113 static nic_property *nicprop; // nic properties for this interface.
114 static int global_id;
117 // The partial_fcns vector can now refer to
118 // partial functions, or expensive functions
119 // which can be cached (if there are multiple refs). A couple
120 // of int vectors distinguish the cases.
// (fcn_ref_cnt is a vector<int>; is_partial_fcn is a vector<bool>.)
121 static vector<scalarexp_t *> partial_fcns;
122 static vector<int> fcn_ref_cnt;
123 static vector<bool> is_partial_fcn;
// Presumably [start, end) index ranges into partial_fcns for the select, where,
// group-by and aggregate clauses -- confirm against their users.
// NOTE(review): unlike the surrounding state these are not `static`, so they
// have external linkage and can collide with same-named globals elsewhere.
124 int sl_fcns_start = 0, sl_fcns_end = 0;
125 int wh_fcns_start = 0, wh_fcns_end = 0;
126 int gb_fcns_start = 0, gb_fcns_end = 0;
127 int ag_fcns_start = 0, ag_fcns_end = 0;
130 // These vectors are for combinable predicates.
131 static vector<int> pred_class; // identifies the group
132 static vector<int> pred_pos; // position in the group.
// Shared scratch buffer for the many sprintf calls in this file.
// NOTE(review): fixed 1000 bytes written with unbounded sprintf; long field,
// parameter or function names could overflow it -- prefer snprintf or
// building the text directly in std::string.
136 static char tmpstr[1000];
138 //////////////////////////////////////////////////////////////////////
139 /// Various utilities
// Build the C identifier of an LFTA node's state struct (used as
// "struct <name>" by generate_fta_struct), starting from normalize_name(node_name).
// NOTE(review): the remainder of this function (any suffixing and the return)
// is not visible in this copy.
141 string generate_fta_name(string node_name){
142 string ret = normalize_name(node_name);
// Build the name of the per-group aggregate struct emitted by
// generate_aggr_struct: the normalized node name followed by "_aggr_struct".
// NOTE(review): intermediate lines and the return are not visible in this copy.
152 string generate_aggr_struct_name(string node_name){
153 string ret = normalize_name(node_name);
157 ret += "_aggr_struct";
// Build the name of the filter-join hash-table entry struct emitted by
// generate_fj_struct, starting from normalize_name(node_name).
// NOTE(review): the suffix and return are not visible in this copy.
162 string generate_fj_struct_name(string node_name){
163 string ret = normalize_name(node_name);
// Build the name of the watchlist element (one loaded record) struct,
// starting from normalize_name(node_name).
// NOTE(review): the suffix and return are not visible in this copy.
172 string generate_watchlist_element_name(string node_name){
173 string ret = normalize_name(node_name);
// Build the name of the watchlist control struct type: the normalized node
// name followed by "__wl_struct".
// NOTE(review): intermediate lines and the return are not visible in this copy.
182 string generate_watchlist_struct_name(string node_name){
183 string ret = normalize_name(node_name);
187 ret += "__wl_struct";
// Build the name of the global watchlist control variable (the instance of the
// watchlist struct), starting from normalize_name(node_name).
// NOTE(review): the suffix and return are not visible in this copy.
192 string generate_watchlist_name(string node_name){
193 string ret = normalize_name(node_name);
// Emit C code that unpacks `field` of schema `schref` for table reference
// `tblref` into the local unpack_var_<field>_<tblref>, by calling the schema's
// unpack function for that field. end_goto names the label the generated code
// jumps to when unpacking fails.
// NOTE(review): the declaration of `ret`, the append of tmpstr, the else/closing
// lines and the return are not visible in this copy.
202 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
205 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
206 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
// The failure check is emitted only for fields WITHOUT the "required" modifier.
// NOTE(review): confirm that negation is intended.
208 if(!schema->get_modifier_list(schref,field)->contains_key("required"))
209 ret += "\tif(retval) goto "+end_goto+";\n";
212 // TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
213 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
214 if(dt.is_buffer_type()){
215 if(dt.get_type() != v_str_t){
// NOTE(review): if .data is a pointer, the emitted int(...) cast truncates it on
// 64-bit targets -- confirm its type.
216 ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
217 ret += "\t\tgoto "+end_goto+";\n";
// NOTE(review): the emitted if above has no braces, so this statement runs
// unconditionally despite its extra indentation.
218 ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
// NOTE(review): this emits ";+" -- the stray '+' (compare the ";\n" in the
// non-buffer branch below) will glue onto the next generated statement.
219 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
220 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
// NOTE(review): the message suggests this branch handles non-string buffers, but
// the branch above is taken when get_type() != v_str_t -- confirm which case is
// really the error.
222 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
// Non-buffer types: copy the value straight out of the input struct.
226 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
227 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
// Emit the C struct that stores one aggregation group. It contains:
// - gb_var<g> for each group-by attribute;
// - aggr_var<a> for each aggregate (builtins use their data type, UDAFs their
//   storage type);
// - a `next` pointer.
// NOTE(review): loop-variable declarations, the struct's closing text and the
// return are not visible in this copy.
233 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
234 string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
237 for(g=0;g<gb_tbl->size();g++){
238 sprintf(tmpstr,"gb_var%d",g);
239 ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
243 for(a=0;a<aggr_tbl->size();a++){
245 sprintf(tmpstr,"aggr_var%d",a);
246 if(aggr_tbl->is_builtin(a))
247 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
249 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
253 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
// Emit the filter-join entry struct for the hash-table (non-Bloom) variant.
// It holds key_var<k> typed like the right-hand side of each hash equality
// predicate, plus a `long long int ts` timestamp.
// NOTE(review): declarations, the Bloom-filter case and the closing lines are not
// visible in this copy.
262 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
265 if(fs->use_bloom == false){ // uses hash table instead
266 ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
268 for(k=0;k<fs->hash_eq.size();++k){
269 sprintf(tmpstr,"key_var%d",k);
270 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
272 ret += "\tlong long int ts;\n";
// Emit the C declarations for a watchlist table `tbl`:
// - the element struct (one member per table field, plus `hashval` and `next`);
// - a `char *<wl>__fstr` holding `filename`;
// - the control struct and its global instance, with refresh_interval baked
//   into the initializer.
// NOTE(review): declarations, closing text and the return are not visible in this copy.
279 string generate_watchlist_structs(string node_name, table_def *tbl,
280 std::string filename, int refresh_interval){
283 ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
284 vector<field_entry *> fields = tbl->get_fields();
285 for(int f=0;f<fields.size();++f){
286 data_type dt(fields[f]->get_type());
287 ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
289 ret += "\tgs_uint64_t hashval;\n";
290 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
// NOTE(review): filename is pasted into a C string literal without escaping, so a
// path containing '"' or '\' produces broken generated code.
293 ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
294 ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
295 ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
296 ret += "\tgs_uint32_t ht_size;\n";
297 ret += "\tgs_uint32_t n_elem;\n";
298 ret += "\tgs_uint32_t refresh_interval;\n";
299 ret += "\ttime_t next_refresh;\n";
300 ret += "\ttime_t last_mtime;\n";
301 ret += "\tchar *filename;\n";
// Initializer order: ht, ht_size, n_elem, refresh_interval, next_refresh,
// last_mtime, filename. `filename` starts NULL; presumably it is pointed at
// __fstr elsewhere -- confirm.
302 ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
// Emit reload_watchlist__<node_name>(), a C function that (re)loads the watchlist
// hash table from the file named by <wl>.filename. The generated function:
// - returns early if stat() fails, or if neither st_mtime nor st_ctime is newer
//   than the recorded last_mtime;
// - otherwise frees all existing entries and (re)allocates the bucket array if
//   needed;
// - parses the file line by line as comma-separated fields;
// - hashes the `keys` fields of each record into a bucket.
// NOTE(review): the declaration of `ret`, the generator-side loop variable `f`
// used in the key lookup, closing braces and the return are not visible in this copy.
307 string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
309 string tgt = generate_watchlist_name(node_name);
310 vector<field_entry *> fields = tbl->get_fields();
312 ret += "void reload_watchlist__"+node_name+"(){\n";
313 ret += "\tgs_uint32_t i;\n";
314 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
315 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
316 ret += "\tFILE *fl;\n";
317 ret += "\tchar buf[10000];\n";
318 ret += "\tgs_uint32_t buflen = 10000;\n";
319 ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
320 ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
321 ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
322 ret += "\tgs_uint64_t hash, bucket;\n";
323 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
325 ret += "// make sure watchlist file has changed since the last time we loaded it\n";
326 ret += "\tstruct stat file_stat;\n";
327 ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
328 ret += "\tif (err) {\n";
329 ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
330 ret += "\t\treturn;\n";
332 ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime) // watchlist file hasn't changed since last time\n";
333 ret += "\t\treturn;\n";
334 ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
// NOTE(review): old entries are freed and last_mtime is advanced before the file
// is opened. If fopen() below fails, the watchlist stays empty until the file
// changes again.
336 ret += "// Delete old entries.\n";
337 ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
338 ret += "\t\tptr="+tgt+".ht[i];\n";
339 ret += "\t\twhile(ptr!=NULL){\n";
// Buffer-typed fields own storage that must be released before freeing the record.
340 for(int f=0;f<fields.size();++f){
341 data_type dt(fields[f]->get_type());
342 if(dt.is_buffer_type()){
343 ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
346 ret += "\t\t\tnext = ptr->next;\n";
347 ret += "\t\t\tfree(ptr);\n";
348 ret += "\t\t\tptr = next;\n";
// Resize the bucket array to the previous element count (100000 on first load).
// NOTE(review): the emitted malloc result is never checked for NULL.
351 ret += "\n// prepare new table. \n";
352 ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
353 ret += "\t\tif("+tgt+".ht)\n";
354 ret += "\t\t\tfree("+tgt+".ht);\n";
355 ret += "\t\tif("+tgt+".ht_size == 0)\n";
356 ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
358 ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
359 ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
361 ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
362 ret += "\t\t"+tgt+".ht[i] = NULL;\n";
364 ret += "\n// load new table\n";
365 ret += "\t"+tgt+".n_elem = 0;\n";
366 ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
367 ret += "\tif(fl==NULL){\n";
368 ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
369 ret += "\t\treturn;\n";
// NOTE(review): n_malformed is declared above but not reset here, yet the
// generated code increments and tests it below (uninitialized read).
371 ret += "\tmalformed = 0;\n";
372 ret += "\tshort_lines = 0;\n";
373 ret += "\ttoolong_lines = 0;\n";
374 ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
375 ret += "\t\tlinelen = strlen(buf);\n";
// NOTE(review): the last character is dropped unconditionally. A final line with
// no trailing newline, or a line longer than buflen-1 that fgets splits, loses a
// real character (and a split line is parsed as two records).
376 ret += "\t\tbuf[linelen-1]='\\0'; // strip off trailing newline\n";
377 ret += "\t\tlinelen--;\n";
// Split the line in place at commas into flds[0..N-1].
379 ret += "\t\tpos=0;\n";
380 ret += "\t\tmalformed=0;\n";
381 ret += "\t\tok=1;\n";
382 ret += "\t\tflds[0] = buf;\n";
383 ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
384 ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
385 ret += "\t\t\tif(pos >= linelen){\n";
386 ret += "\t\t\t\tmalformed = 1;\n";
387 ret += "\t\t\t\tbreak;\n";
389 ret += "\t\t\tbuf[pos]='\\0';\n";
390 ret += "\t\t\tpos++;\n";
391 ret += "\t\t\tflds[f]=buf+pos;\n";
393 ret += "\t\tif(malformed){\n";
394 ret += "\t\t\tok=0;\n";
395 ret += "\t\t\tn_malformed++;\n";
397 ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
398 ret += "\t\t\tok=0;\n";
399 ret += "\t\t\tshort_lines++;\n";
// NOTE(review): after a well-formed multi-field line, pos points at the start of
// the last field. So pos<linelen holds whenever that field is non-empty, and this
// appears to count ordinary lines as too long.
401 ret += "\t\tif(pos && (pos<linelen)){\n";
402 ret += "\t\t\tok=0;\n";
403 ret += "\t\t\ttoolong_lines++;\n";
// NOTE(review): `ok` is computed above, but insertion is gated only on f reaching
// the field count, so lines flagged as too long are still inserted.
405 ret += "\t\tif(f>="+std::to_string(fields.size())+"){\n";
406 ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
// Convert each text field into its typed member using the type's extractor.
408 for(int f=0;f<fields.size();++f){
409 data_type dt(fields[f]->get_type());
410 ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
412 // Compute the hash value
413 ret += "\t\t\thash=0;\n";
414 for(int k=0;k<keys.size();++k){
415 string key_fld = keys[k];
417 for(f=0;f<fields.size();++f){
418 if(fields[f]->get_name() == key_fld)
// NOTE(review): if no field matches key_fld, f ends at fields.size() and
// fields[f] below is out of range, unless a line not visible here handles it.
421 data_type dt(fields[f]->get_type());
424 "\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
425 dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
// The bucket index comes from the upper 32 bits of the 64-bit hash.
// NOTE(review): the backslash-percent escape in this C++ literal is non-standard
// (compilers warn); a plain percent sign is what is intended.
427 ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
428 ret += "\t\t\trec->hashval = hash;\n";
429 ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
430 ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
431 ret += "\t\t\t"+tgt+".n_elem++;\n";
// NOTE(review): this test omits short_lines, and the message prints the per-line
// `malformed` flag instead of n_malformed. No fclose(fl) is visible in this copy
// either -- confirm the file is closed.
435 ret += "\tif(n_malformed+toolong_lines > 0){\n";
436 ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n";
// Emit the C struct "<fta name>" holding one LFTA's runtime state:
// - the embedded struct FTA;
// - aggregation bookkeeping (aggregate queries);
// - filter-join or watchlist-join state;
// - parameter slots, complex literals and pass-by-handle parameters;
// - last values of temporal select-list attributes;
// - a trace id and runtime statistics counters.
// NOTE(review): "¶m_handle_table" in the parameter list below looks like
// "&param_handle_table" corrupted by an HTML-entity decode; restore the '&'.
// NOTE(review): several parameter lines (presumably including the `schema` used
// below), the if/else guards selecting each section, and the closing lines are
// not visible in this copy.
446 string generate_fta_struct(string node_name, gb_table *gb_tbl,
447 aggregate_table *aggr_tbl, param_table *param_tbl,
448 cplx_lit_table *complex_literals,
449 vector<handle_param_tbl_entry *> ¶m_handle_table,
450 bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
453 string ret = "struct " + generate_fta_name(node_name) + "{\n";
454 ret += "\tstruct FTA f;\n";
456 //-------------------------------------------------------------
457 // Aggregate-specific fields
461 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
463 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
464 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
465 // ret+="\tint bitmap_size;\n";
466 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
467 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
468 ret += "\tint max_windows; // max number of open windows.\n";
469 ret += "\tunsigned int generation; // initially zero, increment on\n";
470 ret += "\t // every hash table flush - whether regular or induced.\n";
471 ret += "\t // Old groups are identified by a generation mismatch.\n";
472 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
473 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
// For each temporal, non-buffer group-by attribute, add last_gb_<g>,
// flush_start_gb_<g> and last_flushed_gb_<g> slots. Diagnostics go to stderr.
478 bool uses_temporal_flush = false;
479 for(g=0;g<gb_tbl->size();g++){
480 data_type *dt = gb_tbl->get_data_type(g);
481 if(dt->is_temporal()){
483 fprintf(stderr,"group by attribute %s is temporal, ",
484 gb_tbl->get_name(g).c_str());
485 if(dt->is_increasing()){
486 fprintf(stderr,"increasing.\n");
488 fprintf(stderr,"decreasing.\n");
491 data_type *gdt = gb_tbl->get_data_type(g);
492 if(gdt->is_buffer_type()){
493 fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
495 sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
497 sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
499 sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
501 uses_temporal_flush = true;
507 if(! uses_temporal_flush){
508 fprintf(stderr,"Warning: no temporal flush.\n");
512 // ---------------------------------------------------------
513 // Filter-join specific fields
518 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
519 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
520 "\tint first_exec;\n"
521 "\tlong long int last_bin;\n"
522 "\tint last_bloom_pos;\n"
525 }else{ // limited hash table
527 " struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
534 // --------------------------------------------
535 // watchlist-join specific
537 ret += "\ttime_t ux_time;\n";
540 //--------------------------------------------------------
543 // Create places to hold the parameters.
// Each parameter gets a param_<name> slot, plus a search handle when the
// parameter is accessed through one.
545 vector<string> param_vec = param_tbl->get_param_names();
546 for(p=0;p<param_vec.size();p++){
547 data_type *dt = param_tbl->get_data_type(param_vec[p]);
548 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
549 param_vec[p].c_str());
551 if(param_tbl->handle_access(param_vec[p])){
552 ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
556 // Create places to hold complex literals.
558 for(cl=0;cl<complex_literals->size();cl++){
559 literal_t *l = complex_literals->get_literal(cl);
// NOTE(review): no matching `delete dtl` is visible here, so this leaks one
// data_type per complex literal; a stack object would do.
560 data_type *dtl = new data_type( l->get_type() );
561 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
565 // Create places to hold the pass-by-handle parameters.
566 for(p=0;p<param_handle_table.size();++p){
567 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
571 // Create places to hold the last values of temporal
572 // attributes referenced in select clause
573 // we also need to store values of the temoral attributed
574 // of last flushed tuple in aggr queries
575 // to make sure we generate the cirrect temporal tuple
576 // in the presense of slow flushes
// Collect column ids referenced by temporal select-list expressions (reads the
// file-level sl_list), then add one last_<field>_<tblref> slot per column.
579 col_id_set temp_cids; // col ids of temp attributes in select clause
582 col_id_set::iterator csi;
584 for(s=0;s<sl_list.size();s++){
585 data_type *sdt = sl_list[s]->get_data_type();
586 if (sdt->is_temporal()) {
587 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
591 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
592 int tblref = (*csi).tblvar_ref;
593 int schref = (*csi).schema_ref;
594 string field = (*csi).field;
595 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
596 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
600 ret += "\tgs_uint64_t trace_id;\n\n";
602 // Fields to store the runtime stats
604 ret += "\tgs_uint32_t in_tuple_cnt;\n";
605 ret += "\tgs_uint32_t out_tuple_cnt;\n";
606 ret += "\tgs_uint32_t out_tuple_sz;\n";
607 ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
608 ret += "\tgs_uint64_t cycle_cnt;\n";
609 ret += "\tgs_uint32_t collision_cnt;\n";
610 ret += "\tgs_uint32_t eviction_cnt;\n";
611 ret += "\tgs_float_t sampling_rate;\n";
620 //------------------------------------------------------------
621 // Set colref tblvars to 0..
622 // (special processing for join-like operators in an lfta).
// Recursively walk scalar expression `se` and set the table-variable reference
// of every column reference to 0. Group-by references are followed into their
// defining expressions in gtbl.
// NOTE(review): case labels, guards and closing lines are not visible in this copy.
624 void reset_se_col_ids_tblvars(scalarexp_t *se, gb_table *gtbl){
625 vector<scalarexp_t *> operands;
631 switch(se->get_operator_type()){
637 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
640 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
641 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
645 se->get_colref()->set_tablevar_ref(0);
// NOTE(review): this message names gather_se_col_ids, but it is emitted from
// reset_se_col_ids_tblvars (copy-paste).
648 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
651 reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
657 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
660 operands = se->get_operands();
661 for(o=0;o<operands.size();o++){
662 reset_se_col_ids_tblvars(operands[o], gtbl);
666 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
667 se->get_lineno(), se->get_charno(),se->get_operator_type());
673 // reset column tblvars accessed in this pr.
// Recurse through predicate `pr`, delegating every scalar operand and
// function-predicate argument to reset_se_col_ids_tblvars.
// NOTE(review): case labels and closing lines are not visible in this copy.
675 void reset_pr_col_ids_tblvars(predicate_t *pr, gb_table *gtbl){
676 vector<scalarexp_t *> op_list;
679 switch(pr->get_operator_type()){
681 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
684 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
685 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
688 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
691 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
692 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
695 op_list = pr->get_op_list();
696 for(o=0;o<op_list.size();++o){
697 reset_se_col_ids_tblvars(op_list[o],gtbl) ;
701 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
702 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
709 // Generate code that makes reference
710 // to the tuple, and not to any aggregates.
// Translate scalar expression `se` into C source text:
// - handle parameters and complex literals become t->handle_param_<n> and
//   t->complex_literal_<n>;
// - column references become unpack_var_<field>_<tblref>;
// - group-by references expand to their defining expression, taken from the
//   file-level gb_tbl;
// - partial functions become partial_fcn_result_<n>.
// Returns an "ERROR ..." string on internal errors.
// NOTE(review): case labels, several appends (including the one after the
// buffer-operand test, presumably "&") and closing lines are not visible in this copy.
711 static string generate_se_code(scalarexp_t *se,table_list *schema){
713 data_type *ldt, *rdt;
715 vector<scalarexp_t *> operands;
718 switch(se->get_operator_type()){
720 if(se->is_handle_ref()){
721 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
725 if(se->get_literal()->is_cpx_lit()){
726 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
730 return(se->get_literal()->to_C_code("")); // not complex, no constructor
732 if(se->is_handle_ref()){
733 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
738 ret += se->get_param_name();
// Unary operator: use the type's complex-operator function when it has one.
741 ldt = se->get_left_se()->get_data_type();
742 if(ldt->complex_operator(se->get_op()) ){
743 ret += ldt->get_complex_operator(se->get_op());
745 ret += generate_se_code(se->get_left_se(),schema);
750 ret += generate_se_code(se->get_left_se(),schema);
// Binary operator: likewise, keyed on both operand types.
755 ldt = se->get_left_se()->get_data_type();
756 rdt = se->get_right_se()->get_data_type();
758 if(ldt->complex_operator(rdt, se->get_op()) ){
759 ret += ldt->get_complex_operator(rdt, se->get_op());
761 ret += generate_se_code(se->get_left_se(),schema);
763 ret += generate_se_code(se->get_right_se(),schema);
767 ret += generate_se_code(se->get_left_se(),schema);
769 ret += generate_se_code(se->get_right_se(),schema);
774 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet unpacked ...
775 // so return the defining code.
776 ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
779 sprintf(tmpstr,"unpack_var_%s_%d",
780 se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
785 // Should not be ref'ing any aggr here.
786 if(se->get_aggr_ref() >= 0){
787 fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
788 return("ERROR in generate_se_code");
791 if(se->is_partial()){
792 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
796 operands = se->get_operands();
797 for(o=0;o<operands.size();o++){
799 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
801 ret += generate_se_code(operands[o], schema);
807 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
808 se->get_lineno(), se->get_charno(),se->get_operator_type());
809 return("ERROR in generate_se_code");
813 // generate code that refers only to aggregate data and constants.
// Like generate_se_code, but group-by and aggregate references become
// <var>gb_var<n> / <var>aggr_var<n> (`var` is the prefix of the aggregate-struct
// access expression), and UDAF calls become udaf_ret<n>. Non-group-by column
// references are an error here.
// NOTE(review): case labels, several appends and closing lines are not visible in this copy.
814 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
817 data_type *ldt, *rdt;
819 vector<scalarexp_t *> operands;
822 switch(se->get_operator_type()){
824 if(se->is_handle_ref()){
825 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
829 if(se->get_literal()->is_cpx_lit()){
830 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
834 return(se->get_literal()->to_C_code("")); // not complex no constructor
836 if(se->is_handle_ref()){
837 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
842 ret += se->get_param_name();
845 ldt = se->get_left_se()->get_data_type();
846 if(ldt->complex_operator(se->get_op()) ){
847 ret += ldt->get_complex_operator(se->get_op());
849 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
854 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
859 ldt = se->get_left_se()->get_data_type();
860 rdt = se->get_right_se()->get_data_type();
862 if(ldt->complex_operator(rdt, se->get_op()) ){
863 ret += ldt->get_complex_operator(rdt, se->get_op());
865 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
867 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
871 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
873 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
878 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet
879 // unpacked ... so return the defining code.
880 sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
884 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
885 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
886 se->get_lineno(), se->get_charno());
892 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
897 if(se->get_aggr_ref() >= 0){
898 sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
903 if(se->is_partial()){
904 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
908 operands = se->get_operands();
909 for(o=0;o<operands.size();o++){
911 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
913 ret += generate_se_code_fm_aggr(operands[o], var, schema);
919 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
920 se->get_lineno(), se->get_charno(),se->get_operator_type());
// NOTE(review): the returned error text names generate_se_code, not this function.
921 return("ERROR in generate_se_code");
// Emit "retval = <fcn>( &partial_fcn_result_<pfn_id>, <operands...>" for a
// partial function evaluated over aggregate data. Operands are rendered with
// generate_se_code_fm_aggr. Rejects non-function SEs and UDAF references.
// NOTE(review): the appends of tmpstr/separators and the closing text are not
// visible in this copy.
927 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
930 vector<scalarexp_t *> operands;
933 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
934 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
935 se->get_lineno(), se->get_charno());
936 return("ERROR in generate_se_code");
939 ret = "\tretval = " + se->get_op() + "( ";
940 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
943 operands = se->get_operands();
944 for(o=0;o<operands.size();o++){
946 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
948 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// Emit a plain call "<fcn>( <operands...>" for a cacheable (expensive) function.
// Operands are rendered with generate_se_code. Rejects non-function SEs and
// UDAF references.
// NOTE(review): separator/"&" appends and the closing text are not visible in this copy.
955 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
958 vector<scalarexp_t *> operands;
960 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
961 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
962 se->get_lineno(), se->get_charno());
963 return("ERROR in generate_se_code");
966 ret = se->get_op() + "( ";
968 operands = se->get_operands();
969 for(o=0;o<operands.size();o++){
971 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
973 ret += generate_se_code(operands[o], schema);
// Emit "retval = <fcn>( &partial_fcn_result_<pfn_id>, <operands...>" for a
// partial function evaluated over the input tuple. Operands are rendered with
// generate_se_code.
// NOTE(review): appends and the closing text are not visible in this copy.
982 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
985 vector<scalarexp_t *> operands;
988 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
989 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
990 se->get_lineno(), se->get_charno());
991 return("ERROR in generate_se_code");
// NOTE(review): the next statement ends with a comma (comma operator) rather than
// a semicolon. It behaves the same here, but is almost certainly a typo
// (compare unpack_partial_fcn_fm_aggr).
994 ret = "\tretval = " + se->get_op() + "( ",
995 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
998 operands = se->get_operands();
999 for(o=0;o<operands.size();o++){
1001 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
1003 ret += generate_se_code(operands[o], schema);
// Map a query-language comparison operator to C: "=" becomes "==" and "<>"
// becomes "!=".
// NOTE(review): the fallback (presumably returning op unchanged) is not visible here.
1014 static string generate_C_comparison_op(string op){
1015 if(op == "=") return("==");
1016 if(op == "<>") return("!=");
// Map AND/OR/NOT (upper, capitalized or lower case) to their C operators.
// Any other spelling logs an internal error and returns an error string.
// NOTE(review): the returned C tokens are on lines not visible in this copy.
1020 static string generate_C_boolean_op(string op){
1021 if( (op == "AND") || (op == "And") || (op == "and") ){
1024 if( (op == "OR") || (op == "Or") || (op == "or") ){
1027 if( (op == "NOT") || (op == "Not") || (op == "not") ){
1031 fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
1032 return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
// Translate predicate `pr` into a C boolean expression over the input tuple:
// - IN-lists become a chain of "||" comparisons;
// - comparisons use the type's comparison function when it is complex;
// - unary/binary boolean operators recurse;
// - combinable predicates read a precomputed bit from
//   pref_common_pred_val_<class>_<word>;
// - function predicates become calls (sampling functions get t->sampling_rate
//   as their first argument).
// NOTE(review): case labels, several appends and closing lines are not visible in this copy.
1036 static string generate_predicate_code(predicate_t *pr,table_list *schema){
1038 vector<literal_t *> litv;
1040 data_type *ldt, *rdt;
1041 vector<scalarexp_t *> op_list;
1043 unsigned int bitmask;
1045 switch(pr->get_operator_type()){
1047 ldt = pr->get_left_se()->get_data_type();
1050 litv = pr->get_lit_vec();
1051 for(i=0;i<litv.size();i++){
1052 if(i>0) ret += " || ";
1055 if(ldt->complex_comparison(ldt) ){
1056 ret += ldt->get_comparison_fcn(ldt) ;
1058 if(ldt->is_buffer_type() ) ret += "&";
1059 ret += generate_se_code(pr->get_left_se(), schema);
1061 if(ldt->is_buffer_type() ) ret += "&";
1062 if(litv[i]->is_cpx_lit()){
1063 sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
1066 ret += litv[i]->to_C_code("");
1070 ret += generate_se_code(pr->get_left_se(), schema);
1072 ret += litv[i]->to_C_code("");
1081 ldt = pr->get_left_se()->get_data_type();
1082 rdt = pr->get_right_se()->get_data_type();
1085 if(ldt->complex_comparison(rdt) ){
1086 ret += ldt->get_comparison_fcn(rdt);
1088 if(ldt->is_buffer_type() ) ret += "&";
1089 ret += generate_se_code(pr->get_left_se(),schema);
1091 if(rdt->is_buffer_type() ) ret += "&";
1092 ret += generate_se_code(pr->get_right_se(),schema);
1094 ret += generate_C_comparison_op(pr->get_op());
1097 ret += generate_se_code(pr->get_left_se(),schema);
1098 ret += generate_C_comparison_op(pr->get_op());
1099 ret += generate_se_code(pr->get_right_se(),schema);
1105 ret += generate_C_boolean_op(pr->get_op());
1106 ret += generate_predicate_code(pr->get_left_pr(),schema);
1109 case PRED_BINARY_OP:
1111 ret += generate_predicate_code(pr->get_left_pr(),schema);
1112 ret += generate_C_boolean_op(pr->get_op());
1113 ret += generate_predicate_code(pr->get_right_pr(),schema);
1117 op_list = pr->get_op_list();
1118 cref = pr->get_combinable_ref();
1119 if(cref >= 0){ // predicate is a combinable pred reference
1120 // Trust, but verify
// NOTE(review): `size() >= cref` admits cref == pred_class.size(), which reads one
// past the end of pred_class; the guard should be `cref < pred_class.size()`.
1121 if(pred_class.size() >= cref && pred_class[cref] >= 0){
1122 ppos = pred_pos[cref];
// 1 << ppos % 32 parses as 1 << (ppos % 32): bit ppos%32 of 32-bit word ppos/32.
// NOTE(review): shifting a signed 1 into bit 31 is problematic; 1u would be safer.
1123 bitmask = 1 << ppos % 32;
1124 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
1130 ret = pr->get_op() + "(";
1131 if (pr->is_sampling_fcn) {
1132 ret += "t->sampling_rate";
1133 if (!op_list.empty())
1136 for(o=0;o<op_list.size();++o){
1137 if(o>0) ret += ", ";
1138 if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
1140 ret += generate_se_code(op_list[o],schema);
1145 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
1146 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
1147 return("ERROR in generate_predicate_code");
// generate_equality_test: return C source text that tests lhs_op against
// rhs_op for equality, both operands being of type dt.
// Types for which complex_comparison() holds are compared through the type's
// comparison function; buffer-typed operands are passed to it by address.
// NOTE(review): the code that turns the comparison-function result into an
// equality test (and the plain "==" path) is not visible in this listing;
// presumably "== 0" — confirm.
1152 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
1155 if(dt->complex_comparison(dt) ){
1156 ret += dt->get_comparison_fcn(dt);
// Buffer types: pass each operand by address to the comparison function.
1158 if(dt->is_buffer_type() ) ret += "&";
1161 if(dt->is_buffer_type() ) ret += "&";
// generate_comparison: return C source text comparing lhs_op with rhs_op,
// both of type dt. As in generate_equality_test, types for which
// complex_comparison() holds go through the type's comparison function and
// buffer-typed operands are passed by address.
// NOTE(review): the relational operator emitted is not visible in this
// listing — confirm against the full source.
1173 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
1176 if(dt->complex_comparison(dt) ){
1177 ret += dt->get_comparison_fcn(dt);
1179 if(dt->is_buffer_type() ) ret += "&";
1182 if(dt->is_buffer_type() ) ret += "&";
1194 // Here I assume that only MIN and MAX aggregates can be computed
1195 // over BUFFER data types.
// generate_aggr_update: return C statements that fold the current tuple into
// the aggregate state named by `var` (aggregate number aidx of atbl).
//  - UDAFs: call <op>_LFTA_AGGR_UPDATE_ on the state (by address unless its
//    storage type is fstring_t), followed by the operand expressions.
//  - Built-ins: "++" and "+=" updates (NOTE(review): the guarding ops are not
//    visible; presumably COUNT and SUM), a "<" branch and a ">" branch
//    (presumably MIN and MAX) that evaluate the aggregate expression into
//    aggr_tmp_<aidx> and replace `var` when it wins, and &=, |=, ^= for
//    AND_AGGR, OR_AGGR, XOR_AGGR.
//  - An unrecognized op is reported on stderr and an "ERROR: ..." string is
//    returned in place of code.
// NOTE(review): sprintf into tmpstr (not declared in this function) is not
// length-bounded, while generate_se_code() output can be arbitrarily long.
1197 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
1198 string retval = "\t\t";
1199 string op = atbl->get_op(aidx);
1202 if(! atbl->is_builtin(aidx)) {
// UDAF: pass the state by address unless it is stored as an fstring.
1204 retval += op+"_LFTA_AGGR_UPDATE_(";
1205 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1206 retval+="("+var+")";
1207 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1208 for(o=0;o<opl.size();++o){
1210 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1212 retval += generate_se_code(opl[o], schema);
1219 // Built-in aggregate processing.
1221 data_type *dt = atbl->get_data_type(aidx);
1225 retval.append("++;\n");
1230 retval.append(" += ");
1231 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1232 retval.append(";\n");
// "<" branch: evaluate into aggr_tmp_<aidx>, replace var if smaller.
1236 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1237 retval.append(tmpstr);
1238 if(dt->complex_comparison(dt)){
1239 if(dt->is_buffer_type())
1240 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1242 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1244 sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1246 retval.append(tmpstr);
// Buffer values are replaced through the type's replace function, not assignment.
1247 if(dt->is_buffer_type()){
1248 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1250 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1252 retval.append(tmpstr);
// ">" branch: same as above, replacing var if larger.
1257 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1258 retval.append(tmpstr);
1259 if(dt->complex_comparison(dt)){
1260 if(dt->is_buffer_type())
1261 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1263 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1265 sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1267 retval.append(tmpstr);
1268 if(dt->is_buffer_type()){
1269 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1271 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1273 retval.append(tmpstr);
1278 if(op == "AND_AGGR"){
1280 retval.append(" &= ");
1281 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1282 retval.append(";\n");
1285 if(op == "OR_AGGR"){
1287 retval.append(" |= ");
1288 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1289 retval.append(";\n");
1292 if(op == "XOR_AGGR"){
1294 retval.append(" ^= ");
1295 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1296 retval.append(";\n");
1299 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1300 return("ERROR: aggregate not recognized: "+op);
// generate_aggr_init: return C statements that initialize the aggregate state
// `var` (aggregate aidx of atbl) for a newly created group.
//  - UDAFs: call <op>_LFTA_AGGR_INIT_ and then <op>_LFTA_AGGR_UPDATE_ so the
//    first tuple is folded in.
//  - Built-ins: one branch emits "var = 1;" (NOTE(review): guard not visible,
//    presumably COUNT); SUM/MIN/MAX/AND_AGGR/OR_AGGR/XOR_AGGR start from the
//    aggregate expression itself, copying buffer values with the type's
//    assign-copy function via aggr_tmp_<aidx>.
//  - An unrecognized op is reported on stderr and an "ERROR: ..." string is
//    returned in place of code.
1306 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1308 string op = atbl->get_op(aidx);
1311 if(! atbl->is_builtin(aidx)) {
1313 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1314 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1315 retval+="("+var+"));\n";
// Fold the first tuple into the freshly initialized UDAF state.
1317 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1318 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1319 retval+="("+var+")";
1320 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1321 for(o=0;o<opl.size();++o){
1323 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1325 retval += generate_se_code(opl[o],schema);
1331 // Built-in aggregate processing.
1334 data_type *dt = atbl->get_data_type(aidx);
1337 retval = "\t\t"+var;
1338 retval.append(" = 1;\n");
1342 if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1343 op == "OR_AGGR" || op == "XOR_AGGR"){
// Buffer types need a deep copy into the persistent aggregate state.
1344 if(dt->is_buffer_type()){
1345 sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1346 retval.append(tmpstr);
1347 sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1348 retval.append(tmpstr);
1350 retval = "\t\t"+var;
1352 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1353 retval.append(";\n");
1358 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1359 return("ERROR: aggregate not recognized: "+op);
1363 ////////////////////////////////////////////////////////////
// generate_preamble: return the C text placed at the top of an LFTA's
// generated code. Under #ifndef LFTA_IN_NIC it defines the embedded schema
// string (named by generate_schema_string_name(node_name)) and includes the
// C headers the generated code relies on.
// NOTE(review): the matching #endif and any remaining preamble text are not
// visible in this listing.
1366 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1367 std::string &node_name, std::string &schema_embed_str){
1368 // Include these only once, not once per lfta
1369 // string ret = "#include \"rts.h\"\n";
1370 // ret += "#include \"fta.h\"\n\n");
1372 string ret = "#ifndef LFTA_IN_NIC\n";
1373 ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1374 ret += "#include<stdio.h>\n";
1375 ret += "#include <limits.h>\n";
1376 ret += "#include <float.h>\n";
1377 ret += "#include <sys/stat.h>\n";
1378 ret += "#include \"rdtsc.h\"\n";
// generate_tuple_from_aggr: return C source that turns the group stored in
// t->aggr_table[idx] into an output tuple and releases the slot's resources.
// The emitted code:
//  1. sums <op>_LFTA_AGGR_BAILOUT_ over UDAFs that have a bailout and only
//     builds a tuple when the sum is zero;
//  2. unpacks UDAF results into udaf_ret<a>, and (when the select list has
//     partial functions) unpacks those, skipping output if any unpack fails;
//  3. evaluates BUFFER-typed select expressions into selvar_<s>, sizes and
//     allocates the tuple, fills it as a REGULAR_TUPLE and calls post_tuple();
//  4. destroys BUFFER-typed group-by/aggregate values and UDAF state, then
//     decrements t->n_aggrs.
// The emitted code uses locals (lfta_bailout, tuple, tuple_size, tuple_pos,
// selvar_<s>, udaf_ret<a>, retval, unpack_failed) that the enclosing generated
// function must declare.
1387 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1389 // need to create and output the tuple.
1390 string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1391 // Check for any UDAFs with LFTA_BAILOUT
1392 ret += "\tlfta_bailout = 0;\n";
1393 for(a=0;a<aggr_tbl->size();a++){
1394 if(aggr_tbl->has_bailout(a)){
1395 ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
1396 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1397 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1400 ret += "\tif(! lfta_bailout){\n";
1402 // First, compute the size of the tuple.
1404 // Unpack UDAF return values
1405 for(a=0;a<aggr_tbl->size();a++){
1406 if(! aggr_tbl->is_builtin(a)){
1407 ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1408 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1409 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1415 // Unpack partial fcns ref'd by the select clause.
1416 if(sl_fcns_start != sl_fcns_end){
1417 ret += "\t\tunpack_failed = 0;\n";
1418 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1419 if(is_partial_fcn[p]){
1420 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1421 "t->aggr_table["+idx+"].",schema);
1422 ret += "\t\tif(retval) unpack_failed = 1;\n";
1425 // BEGIN don't allocate tuple if
1426 ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1429 // Unpack any BUFFER type selections into temporaries
1430 // so that I can compute their size and not have
1431 // to recompute their value during tuple packing.
1432 // I can use regular assignment here because
1433 // these temporaries are non-persistent.
1435 for(s=0;s<sl_list.size();s++){
1436 data_type *sdt = sl_list[s]->get_data_type();
1437 if(sdt->is_buffer_type()){
1438 sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1440 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1446 // The size of the tuple is the size of the tuple struct plus the
1447 // size of the buffers to be copied in.
1449 ret += "\t\t\ttuple_size = sizeof( struct ";
1450 ret += generate_tuple_name(node_name);
1452 for(s=0;s<sl_list.size();s++){
1453 data_type *sdt = sl_list[s]->get_data_type();
1454 if(sdt->is_buffer_type()){
1455 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1462 ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1463 ret += "\t\t\tif( tuple != NULL){\n";
1466 // Test passed, make assignments to the tuple.
1468 ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1469 ret += generate_tuple_name(node_name) ;
1472 // Mark tuple as REGULAR_TUPLE
1473 ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
// Buffer fields are copied into the tuple's variable-length tail at
// tuple_pos, which then advances by the buffer's size.
1475 for(s=0;s<sl_list.size();s++){
1476 data_type *sdt = sl_list[s]->get_data_type();
1477 if(sdt->is_buffer_type()){
1478 sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1480 sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1483 sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1485 // if(sdt->needs_hn_translation())
1486 // ret += sdt->hton_translation() +"( ";
1487 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1488 // if(sdt->needs_hn_translation())
1495 ret += "\t\t\t\tpost_tuple(tuple);\n";
1496 ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1497 ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1498 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1499 ret += "\t\t\t\t#endif\n\n";
1502 if(sl_fcns_start != sl_fcns_end) // END don't allocate tuple if
1503 ret += "\t\t}\n"; // unpack failed.
1506 // Need to release memory held by BUFFER types.
1509 for(g=0;g<gb_tbl->size();g++){
1510 data_type *gdt = gb_tbl->get_data_type(g);
1511 if(gdt->is_buffer_type()){
1512 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1516 for(a=0;a<aggr_tbl->size();a++){
1517 if(aggr_tbl->is_builtin(a)){
1518 data_type *adt = aggr_tbl->get_data_type(a);
1519 if(adt->is_buffer_type()){
1520 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
// NOTE(review): presumably the non-builtin (UDAF) branch; the else is not visible.
1524 ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1525 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1526 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1530 ret += "\t\tt->n_aggrs--;\n";
// generate_gb_match_test: return the head of a C "if" that tests whether slot
// idx is filled and new and its group-by values equal the current gb_attr_<g>
// temporaries (one generate_equality_test per group-by attribute).
// NOTE(review): this tests t->aggr_table_bitmap via IS_FILLED/IS_NEW, whereas
// the other generators in this section use t->aggr_table_hashmap with
// SLOT_FILLED; it may be a leftover of the bitmap-based table — confirm it is
// still called.
1536 string generate_gb_match_test(string idx){
1538 string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") && IS_NEW(t->aggr_table_bitmap,"+idx+")";
1539 if(gb_tbl->size()>0){
1540 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1543 // Next, scan list for a match on the group-by attributes.
1544 string rhs_op, lhs_op;
1545 for(g=0;g<gb_tbl->size();g++){
1548 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1549 sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1550 ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
// generate_gb_update: return C source for the "matching group found" path.
// It updates every aggregate of t->aggr_table[idx] in place and destroys the
// BUFFER-typed gb_attr_<g> copies. When UDAFs are present, it ORs together
// their <op>_LFTA_AGGR_FLUSHME_ results, and the emitted code then flushes old
// groups, outputs this group via generate_tuple_from_aggr and clears the
// slot's SLOT_FILLED bit.
// NOTE(review): has_udaf is taken by value, so setting it below only affects
// this call; the condition that reads it is not visible in this listing.
1560 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1564 ret += "/*\t\tMatch found : update in place.\t*/\n";
1567 for(a=0;a<aggr_tbl->size();a++){
1568 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1569 ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1570 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1573 // garbage collect copied buffer type gb attrs.
1574 for(g=0;g<gb_tbl->size();g++){
1575 data_type *gdt = gb_tbl->get_data_type(g);
1576 if(gdt->is_buffer_type()){
1577 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
// Build "FLUSHME(a1) || FLUSHME(a2) || ..." over all UDAFs.
1584 bool first_udaf = true;
1587 for(a=0;a<aggr_tbl->size();a++){
1588 if(! aggr_tbl->is_builtin(a)){
1589 if(! first_udaf)ret += " || ";
1590 else first_udaf = false;
1591 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1592 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1593 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1597 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1598 ret += generate_tuple_from_aggr(node_name,schema,idx);
1599 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
// generate_init_group: return C source that claims slot idx of the aggregate
// table for a new group. The emitted code:
//  - stores hash2 | SLOT_FILLED | gen_val in t->aggr_table_hashmap[idx];
//  - copies every gb_attr_<g> into the slot's gb_var<g>;
//  - initializes each aggregate with generate_aggr_init;
//  - increments t->n_aggrs.
1606 string generate_init_group( table_list *schema, string idx){
1608 string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1609 // Fill up the aggregate block.
1610 for(g=0;g<gb_tbl->size();g++){
1611 sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1614 for(a=0;a<aggr_tbl->size();a++){
1615 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1616 ret += generate_aggr_init(tmpstr, aggr_tbl,a, schema);
1618 ret+="\t\tt->n_aggrs++;\n";
// generate_fta_flush: return C source for
//   static void fta_aggr_flush_old_<node_name>(struct FTA *f, unsigned int nflush)
// The generated function scans t->aggr_table starting at t->flush_pos. It
// evicts filled slots that belong to an older generation or whose temporal
// group-by values lag t->last_gb_<g>, emitting an output tuple for each. It
// stops after nflush evictions or when no groups remain. It then records
// where it stopped, and once the scan completes it copies
// t->flush_start_gb_<g> into t->last_flushed_gb_<g> for non-buffer temporal
// group-by attributes.
1623 string generate_fta_flush(string node_name, table_list *schema,
1624 ext_fcn_list *Ext_fcns){
1627 string select_var_defs ;
1630 // Flush from previous epoch
1632 ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1634 ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1635 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1636 ret += "\tint i, lfta_bailout;\n";
1637 ret += "\tunsigned int gen_val;\n";
1639 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1640 ret += generate_fta_name(node_name)+" *) f;\n";
1645 // Variables needed to store selected attributes of BUFFER type
1646 // temporarily, in order to compute their size for storage
1647 // in an output tuple.
1649 select_var_defs = "";
1650 for(s=0;s<sl_list.size();s++){
1651 data_type *sdt = sl_list[s]->get_data_type();
1652 if(sdt->is_buffer_type()){
1653 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1654 select_var_defs.append(tmpstr);
1657 if(select_var_defs != ""){
1658 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1659 ret += select_var_defs;
1663 // Variables to store results of partial functions.
1664 if(sl_fcns_start != sl_fcns_end){
1665 ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1666 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1667 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1668 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
// Declarations only: a stray ";" here would be an empty statement, putting
// the UDAF output declarations emitted below after a statement, which
// pre-C99 C rejects.
1671 ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n";
1674 // Variables for udaf output temporaries
1675 bool no_udaf = true;
1677 for(a=0;a<aggr_tbl->size();a++){
1678 if(! aggr_tbl->is_builtin(a)){
1680 ret+="/*\t\tUDAF output vars.\t*/\n";
1683 int afcn_id = aggr_tbl->get_fcn_id(a);
1684 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1685 sprintf(tmpstr,"udaf_ret%d", a);
1686 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1691 // ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1693 ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1694 ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
// Evict a slot if it is filled and either from an older generation or one of
// its temporal group-by values is behind the last seen value.
1695 ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1698 for(g=0;g<gb_tbl->size();g++){
1699 data_type *gdt = gb_tbl->get_data_type(g);
1700 if(gdt->is_temporal()){
1701 if(first_g) first_g=false; else ret+=" || ";
1702 ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1706 ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1708 "#ifdef LFTA_STATS\n"
1709 "\t\t\tt->eviction_cnt++;\n"
1714 ret+=generate_tuple_from_aggr(node_name,schema,"i");
1716 // ret+="\t\t\tt->n_aggrs--;\n"; // done in generate_tuple_from_aggr
1717 ret+="\t\t\tnflush--;\n";
1720 ret+="\tt->flush_pos=i;\n";
1721 ret+="\tif(t->n_aggrs == 0) {\n";
1722 ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1725 ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1727 for(int g=0;g<gb_tbl->size();g++){
1728 data_type *dt = gb_tbl->get_data_type(g);
1729 if(dt->is_temporal()){
1730 data_type *gdt = gb_tbl->get_data_type(g);
1731 if(!gdt->is_buffer_type()){
1732 sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1737 ret += "\t}\n}\n\n";
1742 // TODO Remove sprintf to perform string catenation
// generate_fta_load_params: return C source for
//   static int load_params_<node_name>(struct <fta> *t, int sz, void *value, int initial_call)
// The generated function unpacks query parameters from the sz-byte block
// `value` into t->param_<name>.
//  - It returns 1 when the block is too short: either the fixed part exceeds
//    sz, or a BUFFER parameter's offset + length exceeds sz. It returns 0 on
//    success.
//  - BUFFER parameters are deep-copied with the type's assign-copy function;
//    the previous value is destroyed first unless initial_call is set.
//  - Pass-by-handle parameters are then de-registered (unless initial_call)
//    and re-registered.
1743 string generate_fta_load_params(string node_name){
1745 vector<string> param_names = param_tbl->get_param_names();
1747 string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1748 ret += " *t, int sz, void *value, int initial_call){\n";
1749 ret += "\tint pos=0;\n";
1750 ret += "\tint data_pos;\n";
1752 for(p=0;p<param_names.size();p++){
1753 data_type *dt = param_tbl->get_data_type(param_names[p]);
1754 if(dt->is_buffer_type()){
1755 sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1757 sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
// data_pos: total size of the fixed-size part of the parameter block.
1764 ret += "\n\tdata_pos = ";
1765 for(p=0;p<param_names.size();p++){
1766 if(p>0) ret += " + ";
1767 data_type *dt = param_tbl->get_data_type(param_names[p]);
1769 ret += dt->get_tuple_cvar_type();
1773 ret += "\tif(data_pos > sz) return 1;\n\n";
1776 for(p=0;p<param_names.size();p++){
1777 data_type *dt = param_tbl->get_data_type(param_names[p]);
1778 if(dt->is_buffer_type()){
1779 sprintf(tmpstr,"\taccess_var_%s = *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1781 switch( dt->get_type() ){
1783 // ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n"; // ntoh conversion
1784 // ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n"; // ntoh conversion
// NOTE(review): offset + length can wrap if these fields are unsigned 32-bit;
// a form like "length > sz - offset" would be overflow-safe — confirm types.
1785 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1787 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1789 sprintf(tmpstr,"\ttmp_var_%s.length = access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1793 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1797 // First, destroy the old
1798 ret += "\tif(! initial_call)\n";
1799 sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1801 // Next, create the new.
1802 sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1805 // if(dt->needs_hn_translation()){
1806 // sprintf(tmpstr,"\tt->param_%s = %s( *( (%s *)( (char *)value+pos) ) );\n",
1807 // param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1809 sprintf(tmpstr,"\tt->param_%s = *( (%s *)( (char *)value+pos) );\n",
1810 param_names[p].c_str(), dt->get_cvar_type().c_str() );
// NOTE(review): pos advances by sizeof(get_cvar_type()), while data_pos above
// and the BUFFER read use get_tuple_cvar_type(); if this line also runs for
// BUFFER parameters, confirm the two sizes agree.
1814 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1818 // Register the pass-by-handle parameters
1820 ret += "/* register and de-register the pass-by-handle parameters */\n";
1823 for(ph=0;ph<param_handle_table.size();++ph){
1824 data_type pdt(param_handle_table[ph]->type_name);
1825 switch(param_handle_table[ph]->val_type){
1831 ret += "\tif(! initial_call)\n";
1832 sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1833 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1835 sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
1838 if(pdt.is_buffer_type()) ret += "&(";
1839 ret += "t->param_"+param_handle_table[ph]->param_name;
1840 if(pdt.is_buffer_type()) ret += ")";
1844 sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1845 fprintf(stderr,"%s\n",tmpstr);
1850 ret+="\treturn 0;\n";
// generate_fta_free: return C source for
//   static gs_retval_t free_fta_<node_name>(struct FTA *f, gs_uint32_t recursive)
// The emitted code:
//  - flushes pending groups (NOTE(review): presumably only when
//    is_aggr_query; the guard is not visible here);
//  - advances t->generation so all remaining groups count as old, resets
//    t->flush_pos and flushes again;
//  - de-registers every pass-by-handle parameter;
//  - returns 0.
1859 string generate_fta_free(string node_name, bool is_aggr_query){
1861 string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1862 ret+= "\tstruct "+generate_fta_name(node_name)+
1863 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1864 ret += "\tint i;\n";
1867 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1868 ret+="\t/* \t\tmark all groups as old */\n";
1869 ret+="\tt->generation++;\n";
1870 ret+="\tt->flush_pos = 0;\n";
1871 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1874 // Deregister the pass-by-handle parameters
1875 ret += "/* de-register the pass-by-handle parameters */\n";
1877 for(ph=0;ph<param_handle_table.size();++ph){
1878 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1879 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1884 ret += "\treturn 0;\n}\n\n";
// generate_fta_control: return C source for
//   static gs_retval_t control_fta_<node_name>(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value)
// The generated function handles these commands:
//  - FTA_COMMAND_FLUSH: posts an empty tuple if there are no groups;
//    otherwise it flushes, ages all groups by bumping t->generation, and
//    flushes again.
//  - FTA_COMMAND_LOAD_PARAMS (only when the query has parameters): calls
//    load_params_<node_name> and warns if the block is too small.
//  - FTA_COMMAND_SET_SAMPLING_RATE: copies a gs_float_t from `value` into
//    t->sampling_rate.
//  - FTA_COMMAND_FILE_DONE: flushes any groups and posts an EOF_TUPLE.
1889 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1890 string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value){\n";
1891 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1892 ret += generate_fta_name(node_name)+" *) f;\n\n";
1896 ret += "\t/* temp status tuple */\n";
1897 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1898 ret += "\tgs_int32_t tuple_size;\n";
1902 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1904 ret+="\t\tif (!t->n_aggrs) {\n";
1905 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1906 ret+="\t\t\tif( tuple != NULL)\n";
1907 ret+="\t\t\t\tpost_tuple(tuple);\n";
1909 ret+="\t\t}else{\n";
1911 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1912 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1913 ret +="\t\tt->generation++;\n";
1914 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1915 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1916 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1917 ret+="\t\t\tt->flush_pos = 0;\n";
1918 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1923 if(param_tbl->size() > 0){
1925 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1926 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1927 "#ifndef LFTA_IN_NIC\n"
1928 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1935 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
// Ignore a payload too short to hold a gs_float_t instead of reading past
// the end of `value` (LOAD_PARAMS validates its size the same way).
1936 "\t\tif(sz >= (gs_int32_t)sizeof(gs_float_t))\n\t\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1940 ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1943 ret+="\t\tif (t->n_aggrs) {\n";
1944 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1945 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1946 ret +="\t\tt->generation++;\n";
1947 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1948 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1949 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1950 ret+="\t\t\tt->flush_pos = 0;\n";
1951 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1955 ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1956 ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1957 ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1959 /* mark tuple as EOF_TUPLE */
1960 ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1961 ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1962 ret += "\t\tpost_tuple(tuple);\n";
1965 ret += "\treturn 0;\n}\n\n";
// generate_fta_clock: return C source for
//   static gs_retval_t clock_fta_<node_name>(struct FTA *f)
// This is the periodic heartbeat handler. The emitted code:
//  - Refreshes t->last_<field>_<tbl> for temporal attributes used by the
//    select list. Values come from prefilter_temp_vars when PREFILTER_DEFINED
//    is set. For time, timestamp and timestamp_ms fields, outside blocking
//    mode, values come from wall-clock time minus time_corr. time_advanced is
//    set whenever a value moves forward.
//  - For aggregation queries, recomputes the temporal group-by values. If
//    time advanced and they differ from t->last_gb_<g>, it flushes, ages all
//    groups and records the new epoch.
//  - Posts a TEMPORAL_TUPLE followed by the trace id and a struct fta_stat,
//    sends a heartbeat, and resets the runtime counters.
1970 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
1971 string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1972 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1973 ret += generate_fta_name(node_name)+" *) f;\n\n";
1975 ret += "\t/* Create a temp status tuple */\n";
1976 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1977 ret += "\tgs_int32_t tuple_size;\n";
1978 ret += "\tunsigned int i;\n";
1979 ret += "\ttime_t cur_time;\n";
1980 ret += "\tint time_advanced;\n";
1981 ret += "\tstruct fta_stat stats;\n";
1985 /* copy the last seen values of temporal attributes */
1986 col_id_set temp_cids; // col ids of temp attributes in select clause
1989 /* HACK: in order to reuse the SE generation code, we need to copy
1990 * the last values of the temp attributes into new variables
1991 * which have names unpack_var_XXX_XXX
1995 col_id_set::iterator csi;
1997 for(s=0;s<sl_list.size();s++){
1998 data_type *sdt = sl_list[s]->get_data_type();
1999 if (sdt->is_temporal()) {
2000 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2004 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2005 int tblref = (*csi).tblvar_ref;
2006 int schref = (*csi).schema_ref;
2007 string field = (*csi).field;
2008 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2009 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
2013 if (is_aggr_query) {
2014 for(g=0;g<gb_tbl->size();g++){
2015 data_type *gdt = gb_tbl->get_data_type(g);
2016 if(gdt->is_temporal()){
2017 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2019 data_type *gdt = gb_tbl->get_data_type(g);
2020 if(gdt->is_buffer_type()){
2021 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2029 ret += "\ttime_advanced = 0;\n";
2031 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2032 int tblref = (*csi).tblvar_ref;
2033 int schref = (*csi).schema_ref;
2034 string field = (*csi).field;
2035 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2037 // update last seen value with the value seen
2038 ret += "\t#ifdef PREFILTER_DEFINED\n";
2039 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
2040 field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
2042 ret += "\t\ttime_advanced = 1;\n\t}\n";
2043 ret += "\t#endif\n";
2045 // we need to pay special attention to time fields
2046 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
2047 ret += "\tcur_time = time(&cur_time);\n";
2049 if (field == "time") {
2050 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
2053 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
2054 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2055 } else if (field == "timestamp_ms") {
2056 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
2059 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
2060 field.c_str(), tblref, field.c_str(), tblref, time_corr);
// "timestamp" carries seconds in its upper 32 bits (hence >>32 and <<32).
2062 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
2063 field.c_str(), tblref, time_corr);
2065 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
2066 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2070 ret += "\t\ttime_advanced = 1;\n";
2073 sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
2074 field.c_str(), tblref, field.c_str(), tblref);
2077 sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
2078 field.c_str(), tblref, field.c_str(), tblref);
2085 ret += "\tt->ux_time = time(&(t->ux_time));\n";
2088 // for aggregation lftas we need to check if the time was advanced beyond the current epoch
2089 if (is_aggr_query) {
2092 bool first_one = true;
2093 for(g=0;g<gb_tbl->size();g++){
2094 data_type *gdt = gb_tbl->get_data_type(g);
2095 if(gdt->is_temporal()){
2096 // To perform the test, first need to compute the value
2097 // of the temporal gb attrs.
2098 if(gdt->is_buffer_type()){
2099 // NOTE : if the SE defining the gb is anything
2100 // other than a ref to a variable, this will generate
2101 // illegal code. To be resolved with Spatch.
2102 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2103 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2105 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2106 gdt->get_buffer_assign_copy().c_str(), g, g);
2108 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
// change_test becomes "(eq_0) && (eq_1) && ..." over the temporal gb attrs.
2112 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2113 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2114 if(first_one){first_one = false;} else {change_test.append(") && (");}
2115 change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
2119 ret += "\n\tif( time_advanced && !( (";
2123 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2124 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
2125 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2127 ret += "\t\t/* \t\tmark all groups as old */\n";
2128 ret +="\t\tt->generation++;\n";
2129 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
2130 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
2131 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
2132 ret += "\t\tt->flush_pos = 0;\n";
2134 for(g=0;g<gb_tbl->size();g++){
2135 data_type *gdt = gb_tbl->get_data_type(g);
2136 if(gdt->is_temporal()){
2137 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
2138 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
// Temporal tuple layout: tuple struct, then gs_uint64_t trace id, then struct fta_stat.
2145 ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
2146 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2147 ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
2150 for(s=0;s<sl_list.size();s++){
2151 data_type *sdt = sl_list[s]->get_data_type();
2152 if(sdt->is_temporal()){
2154 if (sl_list[s]->is_gb()) {
2155 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
2159 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2161 // if(sdt->needs_hn_translation())
2162 // ret += sdt->hton_translation() +"( ";
2163 if (sl_list[s]->is_gb()) {
2164 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
2167 ret += generate_se_code(sl_list[s],schema);
2169 // if(sdt->needs_hn_translation())
2175 /* mark tuple as temporal */
2176 ret += "\n\t/* Mark tuple as temporal */\n";
2177 ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
2179 ret += "\n\t/* Copy trace id */\n";
2180 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
2182 ret += "\n\t/* Populate runtime stats */\n";
2183 ret += "\tstats.ftaid = f->ftaid;\n";
2184 ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
2185 ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
2186 ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
2187 ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
2188 ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
2189 ret += "\tstats.collision_cnt = t->collision_cnt;\n";
2190 ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
2191 ret += "\tstats.sampling_rate = t->sampling_rate;\n";
2193 ret += "\n#ifdef LFTA_PROFILE\n";
2194 ret += "\n\t/* Print stats */\n";
2195 ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
2196 ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
2197 ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
2198 ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
2199 ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
2200 ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
2201 ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
// Keep the whole STATS record on one line: the newline belongs after eviction_cnt.
2202 ret += "\tfprintf(stderr, \"collision_cnt= %d \", t->collision_cnt);\n\n";
2203 ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
2204 ret += "\n#endif\n";
2207 ret += "\n\t/* Copy stats */\n";
// Use the struct tag, as in the tuple_size computation: plain "fta_stat" is
// not a type name in C without a typedef.
2208 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(struct fta_stat));\n\n";
2209 ret+="\tpost_tuple(tuple);\n";
2211 ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2212 ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
2214 ret += "\n\t/* Reset runtime stats */\n";
2215 ret += "\tt->in_tuple_cnt = 0;\n";
2216 ret += "\tt->out_tuple_cnt = 0;\n";
2217 ret += "\tt->out_tuple_sz = 0;\n";
2218 ret += "\tt->accepted_tuple_cnt = 0;\n";
2219 ret += "\tt->cycle_cnt = 0;\n";
2220 ret += "\tt->collision_cnt = 0;\n";
2221 ret += "\tt->eviction_cnt = 0;\n";
2223 ret += "\treturn 0;\n}\n\n";
// generate_aggr_accept_prelim
// Emits the C text that runs at the top of an aggregation LFTA's accept
// function, before the WHERE clause is evaluated.
//   fs             : query node; its "slow_flush" and "real_time" defines are read.
//   node_name      : LFTA name, spliced into generated identifiers such as
//                    fta_aggr_flush_old_<node_name>.
//   schema         : table list used to generate unpack / scalar-expression code.
//   unpacked_cids  : in/out set of column ids already unpacked; every column
//                    this function emits unpack code for is inserted into it.
//   temporal_flush : out; reset to "" and filled with the generated
//                    "flush when the temporal group-by attributes change" code.
// The generated text is accumulated in `ret`.
// NOTE(review): this copy of the file appears to be missing lines (the
// declarations of g, s, tmpstr and change_test, closing braces, several
// `ret += tmpstr;` / `temporal_flush += tmpstr;` lines after sprintf calls,
// and the return statement). gb_tbl and sl_list are not declared here either
// (presumably file-scope). Confirm against the original source.
2229 // accept processing before the where clause,
2230 // do flush processing.
2231 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema, col_id_set &unpacked_cids, string &temporal_flush){
// Slow flush: n_slow_flush comes from the "slow_flush" define. A missing,
// non-numeric or non-positive value (atoi() <= 0) falls back to 2.
2235 string ret="\n/*\tslow flush\t*/\n";
2236 string slow_flush_str = fs->get_val_of_def("slow_flush");
2237 int n_slow_flush = atoi(slow_flush_str.c_str());
2238 if(n_slow_flush <= 0) n_slow_flush = 2;
// For n_slow_flush > 1 the generated code bumps t->flush_ctr and, once it
// reaches n_slow_flush, resets it and calls fta_aggr_flush_old_<node>(f,1)
// (only while t->flush_pos < t->max_aggrs).
2239 if(n_slow_flush > 1){
2240 ret += "\tt->flush_ctr++;\n";
2241 ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2242 ret += "\t\tt->flush_ctr = 0;\n";
2243 ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
// Unconditional variant: presumably the n_slow_flush == 1 branch (its
// `}else{` line is not present in this copy).
2246 ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
// Build temporal_flush. For every temporal group-by attribute g: record the
// columns its definition reads in flush_cids, emit code that computes its
// current value into gb_attr_<g>, and AND an equality test against
// t->last_gb_<g> into change_test.
2251 bool first_one = true;
2253 col_id_set flush_cids; // col ids accessed when computing flush variables.
2254 // unpack them at temporal flush test time.
2255 temporal_flush = "";
2258 for(g=0;g<gb_tbl->size();g++){
2259 data_type *gdt = gb_tbl->get_data_type(g);
2260 if(gdt->is_temporal()){
2261 gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2263 // To perform the test, first need to compute the value
2264 // of the temporal gb attrs.
2265 if(gdt->is_buffer_type()){
2266 // NOTE : if the SE defining the gb is anything
2267 // other than a ref to a variable, this will generate
2268 // illegal code. To be resolved with Spatch.
2269 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2270 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2271 temporal_flush += tmpstr;
2272 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2273 gdt->get_buffer_assign_copy().c_str(), g, g);
2275 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2277 temporal_flush += tmpstr;
2278 // END computing the value of the temporal GB attr.
2281 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2282 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2283 if(first_one){first_one = false;} else {change_test.append(") && (");}
2284 change_test += generate_equality_test(lhs_op, rhs_op, gdt);
// If any temporal attribute exists, the generated code tests whether they all
// still equal their saved values. If not, it flushes the remaining aggregates
// with fta_aggr_flush_old_<node>(f,t->max_aggrs), bumps t->generation, resets
// t->flush_pos, and saves the new values in t->flush_start_gb_<g> and
// t->last_gb_<g>.
2287 if(!first_one){ // will be false iff. there is a temporal GB attribute
2288 temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2289 temporal_flush += "\tif( !( (";
2290 temporal_flush += change_test;
2291 temporal_flush += ") ) ){\n";
2293 // temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2294 temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2295 temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2296 temporal_flush+="\t\t}\n";
2297 temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2298 temporal_flush+="\t\tt->generation++;\n";
2299 temporal_flush+="\t\tt->flush_pos = 0;\n";
2302 // Now set the saved temporal value of the gb to the
2303 // current value of the gb. Only for simple types,
2304 // not for buffer types -- but the strings are not
2305 // temporal in any case.
2307 for(g=0;g<gb_tbl->size();g++){
2308 data_type *gdt = gb_tbl->get_data_type(g);
2309 if(gdt->is_temporal()){
2310 if(gdt->is_buffer_type()){
2312 fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2314 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2315 temporal_flush += tmpstr;
2316 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2317 temporal_flush += tmpstr;
2321 temporal_flush += "\t}\n\n";
2324 // Unpack all the temporal attributes referenced in select clause
2325 // and update the last value of the attribute
2326 col_id_set temp_cids; // col ids of temp attributes in select clause
2327 col_id_set::iterator csi;
2329 for(s=0;s<sl_list.size();s++){
2330 data_type *sdt = sl_list[s]->get_data_type();
2331 if (sdt->is_temporal()) {
2332 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// For each such column not yet in unpacked_cids: emit its unpack code, emit
// code storing it in t->last_<field>_<tblref>, and mark it as unpacked.
// NOTE(review): generate_unpack_code() is followed by an explicit
// `retval = <fcn>(p, &unpack_var_...)` sequence, and the local `dt` is not
// used in the visible lines. The surrounding lines are missing here, so it
// is unclear whether that sequence is live or inside a disabled region.
2336 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2337 if(unpacked_cids.count((*csi)) == 0){
2338 int tblref = (*csi).tblvar_ref;
2339 int schref = (*csi).schema_ref;
2340 string field = (*csi).field;
2341 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2343 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2344 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2345 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2347 ret += "\tif(retval) return 1;\n";
2349 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2352 unpacked_cids.insert( (*csi) );
// Real-time queries (non-empty "real_time" define) run the temporal flush
// here. This first unpacks any not-yet-unpacked columns the flush test reads
// (flush_cids). Otherwise the code is only handed back through the
// temporal_flush out-parameter.
2357 // Do the flush here if this is a real_time query
2358 string rt_level = fs->get_val_of_def("real_time");
2359 if(rt_level != "" && temporal_flush != ""){
2360 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2361 if(unpacked_cids.count((*csi)) == 0){
2362 int tblref = (*csi).tblvar_ref;
2363 int schref = (*csi).schema_ref;
2364 string field = (*csi).field;
2365 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2367 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2368 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2370 ret += "\tif(retval) return 1;\n";
2372 unpacked_cids.insert( (*csi) );
2375 ret += temporal_flush;
// generate_sel_accept_body
// Emits the C text that builds and posts the output tuple of a filter-only
// (selection) LFTA once its predicates have passed:
//   - evaluates the select clause's partial functions (indices
//     sl_fcns_start .. sl_fcns_end-1). A function referenced more than once is
//     guarded by fcn_ref_cnt_<p> so it is evaluated once, and non-partial ones
//     are cached in partial_fcn_result_<p>. A failed partial-function unpack
//     jumps to `end`;
//   - increments t->accepted_tuple_cnt (only when LFTA_STATS is defined);
//   - evaluates BUFFER-typed select expressions into selvar_<s> so their
//     sizes can be added to tuple_size;
//   - allocates the tuple (jumping to `end` on NULL), marks it REGULAR_TUPLE,
//     copies buffer payloads after the fixed struct (advancing tuple_pos) and
//     assigns the scalar fields;
//   - calls post_tuple and, under LFTA_STATS, updates t->out_tuple_cnt and
//     t->out_tuple_sz.
// node_name selects the tuple struct via generate_tuple_name(); schema is
// used for code generation. fs is not read in the visible lines.
// NOTE(review): this copy appears to be missing lines (declarations of ret,
// p, s and tmpstr, closing braces, the `ret += tmpstr;` after each sprintf,
// and the return statement). Confirm against the original source.
2381 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2386 /////////////// Processing for filter-only query
2388 // test passed : create the tuple, then assign to it.
2389 ret += "/*\t\tCreate and post the tuple\t*/\n";
2391 // Unpack partial fcns ref'd by the select clause.
2392 // Its a kind of a WHERE clause ...
2393 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2394 if(fcn_ref_cnt[p] > 1){
2395 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2397 if(is_partial_fcn[p]){
2398 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2399 ret += "\tif(retval) goto end;\n";
2401 if(fcn_ref_cnt[p] > 1){
2402 if(!is_partial_fcn[p]){
2403 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2405 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2410 // increment the counter of accepted tuples
2411 ret += "\n\t#ifdef LFTA_STATS\n";
2412 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2413 ret += "\t#endif\n\n";
2415 // First, compute the size of the tuple.
2417 // Unpack any BUFFER type selections into temporaries
2418 // so that I can compute their size and not have
2419 // to recompute their value during tuple packing.
2420 // I can use regular assignment here because
2421 // these temporaries are non-persistent.
2423 for(s=0;s<sl_list.size();s++){
2424 data_type *sdt = sl_list[s]->get_data_type();
2425 if(sdt->is_buffer_type()){
2426 sprintf(tmpstr,"\tselvar_%d = ",s);
2428 ret += generate_se_code(sl_list[s],schema);
2434 // The size of the tuple is the size of the tuple struct plus the
2435 // size of the buffers to be copied in.
2437 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2438 for(s=0;s<sl_list.size();s++){
2439 data_type *sdt = sl_list[s]->get_data_type();
2440 if(sdt->is_buffer_type()){
2441 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// Allocation failure drops the record: the generated code jumps to `end`.
2448 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2449 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2451 // Test passed, make assignments to the tuple.
// tuple_pos starts right after the fixed-size struct; buffer payloads are
// appended from there.
2453 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2455 // Mark tuple as REGULAR_TUPLE
2456 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
// Buffer fields: copy the payload into the tuple and advance tuple_pos.
// Other fields: plain assignment from the select expression.
2459 for(s=0;s<sl_list.size();s++){
2460 data_type *sdt = sl_list[s]->get_data_type();
2461 if(sdt->is_buffer_type()){
2462 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2464 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2467 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2469 // if(sdt->needs_hn_translation())
2470 // ret += sdt->hton_translation() +"( ";
2471 ret += generate_se_code(sl_list[s],schema);
2472 // if(sdt->needs_hn_translation())
2480 ret += "\tpost_tuple(tuple);\n";
2482 // Increment the counter of posted tuples
2483 ret += "\n\t#ifdef LFTA_STATS\n";
2484 ret += "\tt->out_tuple_cnt++;\n";
2485 ret+="\tt->out_tuple_sz+=tuple_size;\n";
2486 ret += "\t#endif\n\n";
// generate_fj_accept_body
// Emits the C text for the accept function of a filter-join LFTA. The
// generated code first handles the record as an S (filtering-stream) record
// and then, after the `end_s:` label, as an R (main-stream) record:
//   1. Read tuning defines from fs ("num_bloom", "bloom_size",
//      "aggregate_slots") and derive bloom-filter / hash-table sizes and
//      bf_mask.
//   2. Compute curr_fj_ts from fs->temporal_var. In the bloom-filter variant,
//      clear the bloom filters whose time bins have been passed.
//   3. S path: unpack and test the cost-sorted fs->pred_t1 predicates,
//      jumping to end_s on failure. Then compute s_equijoin_<p> from the
//      right-hand sides of fs->hash_eq, and either set bloom-filter bits or
//      store the key plus curr_fj_ts in a two-slot (bucket / bucket1) hash
//      table.
//   4. R path: unpack fs->pred_t0, sort it by cost and test the cheap ones
//      (cost <= 20). Compute r_equijoin_<p> from the left-hand sides of
//      fs->hash_eq, probe the bloom filters / hash table, then test the
//      remaining (expensive) predicates.
//   5. Build, fill and post the output tuple (same scheme as the
//      selection-LFTA body).
// Params: fs (filter-join node); node_name (LFTA name used in generated
//   identifiers); unpacked_cids (in/out set of already-unpacked columns);
//   Ext_fcns (used for predicate cost estimation); schema.
// NOTE(review): this copy appears to be missing lines (declarations, braces,
//   the bloom / hash-table branch conditions, several `ret += ...` lines and
//   the return statement). Confirm details against the original source.
2493 // TODO Ensure that postfilter predicates are being generated
2494 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
// Bloom-filter sizing: n_bloom filters cover a window of window_len time
// units, each of width bloom_width. Short windows get one filter per unit.
2500 unsigned int window_len = fs->temporal_range;
2501 unsigned int n_bloom = 11;
2502 string n_bloom_str = fs->get_val_of_def("num_bloom");
2503 int tmp_n_bloom = atoi(n_bloom_str.c_str());
2505 n_bloom = tmp_n_bloom+1;
2506 float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2507 sprintf(tmpstr,"%f",bloom_width);
2508 string bloom_width_str = tmpstr;
2510 if(window_len < n_bloom){
2511 n_bloom = window_len+1;
2512 bloom_width_str = "1";
2516 // Grab the current window time
2517 scalarexp_t winvar(fs->temporal_var);
2518 ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
// "bloom_size" is accepted only in the open range (3, 32); otherwise 12.
2520 int bf_exp_size = 12; // base-2 log of number of bits
2521 string bloom_len_str = fs->get_val_of_def("bloom_size");
2522 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2523 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2524 bf_exp_size = tmp_bf_exp_size;
2526 int bf_bit_size = 1 << bf_exp_size;
2527 int bf_byte_size = bf_bit_size / (8*sizeof(char));
// Hash-table size: 4096 unless "aggregate_slots" exceeds 1024, in which case
// it is rounded to a power of 2.
2529 unsigned int ht_size = 4096;
2530 string ht_size_s = fs->get_val_of_def("aggregate_slots");
2531 int tmp_ht_size = atoi(ht_size_s.c_str());
2532 if(tmp_ht_size > 1024){
2533 unsigned int hs = 1; // make it power of 2
2536 tmp_ht_size = tmp_ht_size >> 1;
// bf_mask gets one low bit per bloom-filter address bit, or log2(ht_size)
// bits for the hash table (the branch selecting between the two loops is not
// present in this copy).
2543 for(i=0;i<bf_exp_size;i++)
2544 bf_mask = (bf_mask << 1) | 1;
2546 for(i=ht_size;i>1;i=i>>1)
2547 bf_mask = (bf_mask << 1) | 1;
// Diagnostic dump of the derived parameters to stdout (any guard around it
// is not present in this copy).
2551 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2554 bloom_width_str.c_str(),
2566 // If this is a bloom-filter fj, first test if the
2567 // bloom filter needs to be advanced.
2568 // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2569 // t->bf_size : number of bits in bloom filter
2571 // TODO: Don't iterate more than n_bloom times!
2572 // As written, its possible to wrap around many times.
2575 "// Clean out old bloom filters if needed.\n"
2576 "// TODO vectorize this ? \n"
2577 " if(t->first_exec){\n"
2578 " t->first_exec = 0;\n"
2579 " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2580 " t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2582 " curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2583 " if(curr_bin != t->last_bin){\n"
2584 " for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2585 " t->last_bloom_pos++;\n"
2586 " if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2587 " t->last_bloom_pos = 0;\n"
2588 " tmp_i = t->last_bloom_pos;\n"
2589 " for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2590 " SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2594 " t->last_bin = curr_bin;\n"
2600 //-----------------------------------------------------------------
2601 // First, determine whether to do S (filter stream) processing.
2604 "// S (filtering stream) predicate, should it be processed?\n"
2607 // Sort S preds based on cost.
2608 vector<cnf_elem *> s_filt = fs->pred_t1;
2609 col_id_set::iterator csi;
2610 if(s_filt.size() > 0){
2612 // Unpack fields ref'd in the S pred
2613 for(w=0;w<s_filt.size();++w){
2614 col_id_set this_pred_cids;
2615 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2616 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2617 if(unpacked_cids.count( (*csi) ) == 0){
2618 int tblref = (*csi).tblvar_ref;
2619 int schref = (*csi).schema_ref;
2620 string field = (*csi).field;
2621 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2622 unpacked_cids.insert( (*csi) );
2628 // Sort by evaluation cost.
2629 // First, estimate evaluation costs
2630 // Eliminate predicates covered by the prefilter (those in s_pids).
2631 // I need to do it before the sort becuase the indices refer
2632 // to the position in the unsorted list.
// NOTE(review): tmp_wh is filled but not read in the visible lines, and no
// prefilter elimination appears here.
2633 vector<cnf_elem *> tmp_wh;
2634 for(w=0;w<s_filt.size();++w){
2635 compute_cnf_cost(s_filt[w],Ext_fcns);
2636 tmp_wh.push_back(s_filt[w]);
2640 sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2642 // Now generate the predicates.
2643 for(w=0;w<s_filt.size();++w){
2644 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2647 // Find partial fcns ref'd in this cnf element
2649 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2650 // Since set<..> is a "Sorted Associative Container",
2651 // we can walk through it in sorted order by walking from
2652 // begin() to end(). (and the partial fcns must be
2653 // evaluated in this order).
2654 set<int>::iterator si;
2656 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2657 if(fcn_ref_cnt[(*si)] > 1){
2658 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2660 if(is_partial_fcn[(*si)]){
2661 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2662 ret += "\t\tif(retval) goto end_s;\n";
2664 if(fcn_ref_cnt[(*si)] > 1){
2665 if(!is_partial_fcn[(*si)]){
2666 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2667 // Testing for S is a side branch.
2668 // I don't want a cacheable partial function to be
2669 // marked as evaluated. Therefore I mark the function
2670 // as evalauted ONLY IF it is not partial.
2671 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2677 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2678 ") ) goto end_s;\n";
2681 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// The S record passed: compute its join key(s) from the right-hand sides of
// the equi-join predicates.
2684 for(p=0;p<fs->hash_eq.size();++p)
2685 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2688 // First, generate the S scalar expressions in the hash_eq
2690 // Iterate over the bloom filters
2692 ret += "\t\tbucket=0;\n";
2693 for(p=0;p<fs->hash_eq.size();++p){
2695 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2696 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2697 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2699 // SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2701 " bucket &= "+int_to_string(bf_mask)+";\n"
2702 " SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
// Hash-table variant: two candidate slots (bucket, bucket1). Reuse the slot
// already holding this key; otherwise overwrite the slot with the older ts.
2707 ret += "// Add the S record to the hash table, choose a position\n";
2708 ret += "\t\tbucket=0;\n";
2709 for(p=0;p<fs->hash_eq.size();++p){
2711 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2712 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2713 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2716 " bucket &= "+int_to_string(bf_mask)+";\n"
2717 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2719 // Try the first bucket
2721 for(p=0;p<fs->hash_eq.size();++p){
2722 if(p>0) ret += " && ";
2723 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2724 // " == s_equijoin_"+int_to_string(p);
2725 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2726 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2727 string rhs_op = "s_equijoin_"+int_to_string(p);
2728 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2730 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2731 ret += "\t\t}else{\n\t\t\tif(";
2732 for(p=0;p<fs->hash_eq.size();++p){
2733 if(p>0) ret += " && ";
2734 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2735 // " == s_equijoin_"+int_to_string(p);
2736 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2737 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2738 string rhs_op = "s_equijoin_"+int_to_string(p);
2739 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2741 ret += "){\n\t\t\t\tthe_bucket = bucket1;\n";
2742 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2743 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2744 ret += "\t\t\t}\n\t\t}\n";
// Store the S key (buffer types via their assign-copy function) and the
// current timestamp in the chosen slot.
2745 for(p=0;p<fs->hash_eq.size();++p){
2746 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2747 if(hdt->is_buffer_type()){
2748 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2751 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2752 " = s_equijoin_"+int_to_string(p)+";\n";
2755 ret+="\t\tt->join_table[the_bucket].ts = curr_fj_ts;\n";
2757 ret += "\tend_s:\n";
2759 // ------------------------------------------------------------
2760 // Next, determine if the R record should be processed.
2764 "// R (main stream) cheap predicate\n"
2768 // Unpack r_filt fields
2769 vector<cnf_elem *> r_filt = fs->pred_t0;
2770 for(w=0;w<r_filt.size();++w){
2771 col_id_set this_pred_cids;
2772 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2773 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2774 if(unpacked_cids.count( (*csi) ) == 0){
2775 int tblref = (*csi).tblvar_ref;
2776 int schref = (*csi).schema_ref;
2777 string field = (*csi).field;
2778 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2779 unpacked_cids.insert( (*csi) );
2784 // Sort R preds based on cost.
// NOTE(review): tmp_wh is filled but not read in the visible lines.
2786 vector<cnf_elem *> tmp_wh;
2787 for(w=0;w<r_filt.size();++w){
2788 compute_cnf_cost(r_filt[w],Ext_fcns);
2789 tmp_wh.push_back(r_filt[w]);
2793 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2795 // WARNING! the constant 20 below is a wild-ass guess.
// cheap_rpos = number of leading (cost-sorted) predicates with cost <= 20.
2797 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
2799 // Test the cheap filters on R.
2802 // Now generate the predicates.
2803 for(w=0;w<cheap_rpos;++w){
2804 sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
2807 // Find partial fcns ref'd in this cnf element
2809 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2810 // Since set<..> is a "Sorted Associative Container",
2811 // we can walk through it in sorted order by walking from
2812 // begin() to end(). (and the partial fcns must be
2813 // evaluated in this order).
2814 set<int>::iterator si;
2815 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2816 if(fcn_ref_cnt[(*si)] > 1){
2817 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2819 if(is_partial_fcn[(*si)]){
2820 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2821 ret += "\t\tif(retval) goto end;\n";
2823 if(fcn_ref_cnt[(*si)] > 1){
2824 if(!is_partial_fcn[(*si)]){
2825 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2827 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2832 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2836 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// The R record's join key(s) come from the left-hand sides of the equi-join
// predicates.
2839 ret += "\n// Do the join\n\n";
2840 for(p=0;p<fs->hash_eq.size();++p)
2841 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2844 // Passed the cheap pred, now test the join with S.
2847 ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2848 for(p=0;p<fs->hash_eq.size();++p){
2850 " bucket"+int_to_string(i)+
2851 " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2852 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2853 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2856 " bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
// NOTE(review): in the visible lines the loop variable b is not used by the
// IS_BF_SET tests (they all index t->last_bloom_pos). Confirm whether the
// filter at index b was intended.
2858 ret += "\tfound = 0;\n";
2859 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2861 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2862 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2863 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
// Hash-table variant: hash the R key to the same two candidate slots used
// when S records were stored.
2872 ret += "\tfound = 0;\n";
2873 ret += "\t\tbucket=0;\n";
2874 for(p=0;p<fs->hash_eq.size();++p){
2876 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2877 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2878 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2881 " bucket &= "+int_to_string(bf_mask)+";\n"
2882 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2884 // Try the first bucket
// Compare the stored S keys with THIS R record's key (r_equijoin_<p>, which
// also produced bucket/bucket1 above). The previous code compared against
// s_equijoin_<p>. That variable is assigned only on the S path and is stale
// whenever the S predicate failed (goto end_s).
2886 for(p=0;p<fs->hash_eq.size();++p){
2887 if(p>0) ret += " && ";
2888 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2889 // " == r_equijoin_"+int_to_string(p);
2890 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2891 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2892 string rhs_op = "r_equijoin_"+int_to_string(p);
2893 ret += generate_equality_test(lhs_op,rhs_op,hdt);
// NOTE(review): the generated window test `ts + temporal_range <= curr_fj_ts`
// is true for entries OLDER than the window. Confirm whether ">=" (entry
// still inside the window) was intended; the same applies to bucket1 below.
2895 if(p>0) ret += " && ";
2896 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2897 ret += "){\n\t\t\tfound = 1;\n";
2898 ret += "\t\t}else {if(";
2899 for(p=0;p<fs->hash_eq.size();++p){
2900 if(p>0) ret += " && ";
2901 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2902 // " == r_equijoin_"+int_to_string(p);
2903 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2904 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2905 string rhs_op = "r_equijoin_"+int_to_string(p);
2906 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2908 if(p>0) ret += " && ";
2909 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2910 ret += ")\n\t\t\tfound=1;\n";
2919 // Test the expensive filters on R.
2920 if(cheap_rpos < r_filt.size()){
2922 // Now generate the predicates.
2923 for(w=cheap_rpos;w<r_filt.size();++w){
2924 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2927 // Find partial fcns ref'd in this cnf element
2929 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2930 // Since set<..> is a "Sorted Associative Container",
2931 // we can walk through it in sorted order by walking from
2932 // begin() to end(). (and the partial fcns must be
2933 // evaluated in this order).
2934 set<int>::iterator si;
2935 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2936 if(fcn_ref_cnt[(*si)] > 1){
2937 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2939 if(is_partial_fcn[(*si)]){
2940 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2941 ret += "\t\tif(retval) goto end;\n";
2943 if(fcn_ref_cnt[(*si)] > 1){
2944 if(!is_partial_fcn[(*si)]){
2945 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2947 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2952 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2956 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2961 /////////////// post the tuple
2963 // test passed : create the tuple, then assign to it.
2964 ret += "/*\t\tCreate and post the tuple\t*/\n";
2966 // Unpack the fields referenced by the select list
2967 for(s=0;s<sl_list.size();++s){
2968 col_id_set this_se_cids;
2969 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2970 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2971 if(unpacked_cids.count( (*csi) ) == 0){
2972 int tblref = (*csi).tblvar_ref;
2973 int schref = (*csi).schema_ref;
2974 string field = (*csi).field;
2975 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2976 unpacked_cids.insert( (*csi) );
2982 // Unpack partial fcns ref'd by the select clause.
2983 // Its a kind of a WHERE clause ...
2984 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2985 if(fcn_ref_cnt[p] > 1){
2986 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2988 if(is_partial_fcn[p]){
2989 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2990 ret += "\tif(retval) goto end;\n";
2992 if(fcn_ref_cnt[p] > 1){
2993 if(!is_partial_fcn[p]){
2994 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2996 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3001 // increment the counter of accepted tuples
3002 ret += "\n\t#ifdef LFTA_STATS\n";
3003 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3004 ret += "\t#endif\n\n";
3006 // First, compute the size of the tuple.
3008 // Unpack any BUFFER type selections into temporaries
3009 // so that I can compute their size and not have
3010 // to recompute their value during tuple packing.
3011 // I can use regular assignment here because
3012 // these temporaries are non-persistent.
3014 for(s=0;s<sl_list.size();s++){
3015 data_type *sdt = sl_list[s]->get_data_type();
3016 if(sdt->is_buffer_type()){
3017 sprintf(tmpstr,"\tselvar_%d = ",s);
3019 ret += generate_se_code(sl_list[s],schema);
3025 // The size of the tuple is the size of the tuple struct plus the
3026 // size of the buffers to be copied in.
3028 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3029 for(s=0;s<sl_list.size();s++){
3030 data_type *sdt = sl_list[s]->get_data_type();
3031 if(sdt->is_buffer_type()){
3032 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// Allocation failure drops the record: the generated code jumps to `end`.
3039 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3040 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3042 // Test passed, make assignments to the tuple.
3044 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3046 // Mark tuple as REGULAR_TUPLE
3047 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3050 for(s=0;s<sl_list.size();s++){
3051 data_type *sdt = sl_list[s]->get_data_type();
3052 if(sdt->is_buffer_type()){
3053 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3055 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3058 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3060 // if(sdt->needs_hn_translation())
3061 // ret += sdt->hton_translation() +"( ";
3062 ret += generate_se_code(sl_list[s],schema);
3063 // if(sdt->needs_hn_translation())
3071 ret += "\tpost_tuple(tuple);\n";
3073 // Increment the counter of posted tuples
3074 ret += "\n\t#ifdef LFTA_STATS\n";
3075 ret += "\n\tt->out_tuple_cnt++;\n\n";
3076 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3077 ret += "\t#endif\n\n";
3084 string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
3090 string wl_schema = fs->from[1]->get_schema_name();
3091 string wl_elem_str = generate_watchlist_element_name(wl_schema);
3092 string wl_node_str = generate_watchlist_struct_name(wl_schema);
3093 string tgt = generate_watchlist_name(wl_schema);
3095 ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
3101 // ------------------------------------------------------------
3102 // Determine if the R record should be processed.
3106 "// R (main stream) cheap predicate\n"
3110 // Unpack r_filt fields
3111 vector<cnf_elem *> r_filt = fs->pred_t0;
3112 for(w=0;w<r_filt.size();++w){
3113 col_id_set this_pred_cids;
3114 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
3115 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3116 if(unpacked_cids.count( (*csi) ) == 0){
3117 int tblref = (*csi).tblvar_ref;
3118 int schref = (*csi).schema_ref;
3119 string field = (*csi).field;
3120 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3121 unpacked_cids.insert( (*csi) );
3126 // Sort R preds based on cost.
3128 vector<cnf_elem *> tmp_wh;
3129 for(w=0;w<r_filt.size();++w){
3130 compute_cnf_cost(r_filt[w],Ext_fcns);
3131 tmp_wh.push_back(r_filt[w]);
3135 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
3137 // WARNING! the constant 20 below is a wild-ass guess.
3139 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
3141 // Test the cheap filters on R.
3144 // Now generate the predicates.
3145 for(w=0;w<cheap_rpos;++w){
3146 sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3149 // Find partial fcns ref'd in this cnf element
3151 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3152 // Since set<..> is a "Sorted Associative Container",
3153 // we can walk through it in sorted order by walking from
3154 // begin() to end(). (and the partial fcns must be
3155 // evaluated in this order).
3156 set<int>::iterator si;
3157 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3158 if(fcn_ref_cnt[(*si)] > 1){
3159 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3161 if(is_partial_fcn[(*si)]){
3162 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3163 ret += "\t\tif(retval) goto end;\n";
3165 if(fcn_ref_cnt[(*si)] > 1){
3166 if(!is_partial_fcn[(*si)]){
3167 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3169 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3174 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3178 ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
3181 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3182 map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
3183 vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
3184 for(w=0;w<kflds.size();++w){
3185 string kfld = kflds[w];
3186 col_id_set this_pred_cids;
3187 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
3188 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3189 if(unpacked_cids.count( (*csi) ) == 0){
3190 int tblref = (*csi).tblvar_ref;
3191 int schref = (*csi).schema_ref;
3192 string field = (*csi).field;
3193 if(tblref==0) // LHS from packet, don't unpack the RHS
3194 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3195 unpacked_cids.insert( (*csi) );
3201 ret += "\n// Do the join\n\n";
3202 ret += "\n// (ensure that the watchtable is fresh)\n";
3203 ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
3204 ret += "\t\treload_watchlist__"+wl_schema+"();\n";
3205 ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
3209 for(p=0;p<fs->key_flds.size();++p){
3210 string kfld = fs->key_flds[p];
3211 ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
3215 // Passed the cheap pred, now test the join with S.
3216 ret += "\tbucket=0;\n";
3217 ret += "\thash=0;\n";
3218 for(p=0;p<fs->key_flds.size();++p){
3219 string kfld = fs->key_flds[p];
3221 " hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
3222 fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
3223 +"_to_hash(r_equijoin_"+kfld+")));\n";
3225 ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
3227 ret += "\t\trec = "+tgt+".ht[bucket];\n";
3228 ret += "\t\twhile(rec!=NULL){\n";
3229 ret += "\t\t\tif(hash==rec->hashval){\n";
3230 ret += "\t\t\t\tif(";
3231 for(p=0;p<fs->key_flds.size();++p){
3232 string kfld = fs->key_flds[p];
3233 if(p>0) ret += " && ";
3234 data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
3235 string lhs_op = "r_equijoin_"+kfld;
3236 string rhs_op = "rec->"+kfld;
3237 ret += generate_equality_test(lhs_op,rhs_op,hdt);
3240 ret += "\t\t\t\t\tbreak;\n";
3242 ret += "\t\t\trec=rec->next;\n";
3244 ret += "\t\tif(rec==NULL)\n";
3245 ret += "\t\t\tgoto end;\n";
3247 ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
3248 for(w=0;w<where.size();++w){
3249 col_id_set this_pred_cids;
3250 gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
3251 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3252 if(unpacked_cids.count( (*csi) ) == 0){
3253 int tblref = (*csi).tblvar_ref;
3254 int schref = (*csi).schema_ref;
3255 string field = (*csi).field;
3256 if(tblref==0) // LHS from packet
3257 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3258 else // RHS from hash bucket
3259 ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3260 unpacked_cids.insert( (*csi) );
3266 // Test the expensive filters on R.
3267 // TODO Should merge this with other predicates and eval in order
3268 // of cost - see the fj code.
3269 // TODO join and postfilter predicates haven't been costed yet.
3270 if(cheap_rpos < r_filt.size()){
3272 // Now generate the predicates.
3273 for(w=cheap_rpos;w<r_filt.size();++w){
3274 sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3277 // Find partial fcns ref'd in this cnf element
3279 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3280 // Since set<..> is a "Sorted Associative Container",
3281 // we can walk through it in sorted order by walking from
3282 // begin() to end(). (and the partial fcns must be
3283 // evaluated in this order).
3284 set<int>::iterator si;
3285 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3286 if(fcn_ref_cnt[(*si)] > 1){
3287 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3289 if(is_partial_fcn[(*si)]){
3290 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3291 ret += "\t\tif(retval) goto end;\n";
3293 if(fcn_ref_cnt[(*si)] > 1){
3294 if(!is_partial_fcn[(*si)]){
3295 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3297 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3302 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3306 ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
3309 // TODO sort the additional predicates by cost
3312 for(w=0;w<fs->pred_t1.size();++w){
3313 sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
3316 // Find partial fcns ref'd in this cnf element
3318 collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
3319 // Since set<..> is a "Sorted Associative Container",
3320 // we can walk through it in sorted order by walking from
3321 // begin() to end(). (and the partial fcns must be
3322 // evaluated in this order).
3323 set<int>::iterator si;
3324 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3325 if(fcn_ref_cnt[(*si)] > 1){
3326 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3328 if(is_partial_fcn[(*si)]){
3329 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3330 ret += "\t\tif(retval) goto end;\n";
3332 if(fcn_ref_cnt[(*si)] > 1){
3333 if(!is_partial_fcn[(*si)]){
3334 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3336 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3341 ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
3346 for(w=0;w<fs->join_filter.size();++w){
3347 sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
3350 // Find partial fcns ref'd in this cnf element
3352 collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
3353 // Since set<..> is a "Sorted Associative Container",
3354 // we can walk through it in sorted order by walking from
3355 // begin() to end(). (and the partial fcns must be
3356 // evaluated in this order).
3357 set<int>::iterator si;
3358 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3359 if(fcn_ref_cnt[(*si)] > 1){
3360 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3362 if(is_partial_fcn[(*si)]){
3363 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3364 ret += "\t\tif(retval) goto end;\n";
3366 if(fcn_ref_cnt[(*si)] > 1){
3367 if(!is_partial_fcn[(*si)]){
3368 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3370 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3375 ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
3380 for(w=0;w<fs->postfilter.size();++w){
3381 sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
3384 // Find partial fcns ref'd in this cnf element
3386 collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
3387 // Since set<..> is a "Sorted Associative Container",
3388 // we can walk through it in sorted order by walking from
3389 // begin() to end(). (and the partial fcns must be
3390 // evaluated in this order).
3391 set<int>::iterator si;
3392 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3393 if(fcn_ref_cnt[(*si)] > 1){
3394 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3396 if(is_partial_fcn[(*si)]){
3397 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3398 ret += "\t\tif(retval) goto end;\n";
3400 if(fcn_ref_cnt[(*si)] > 1){
3401 if(!is_partial_fcn[(*si)]){
3402 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3404 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3409 ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
3415 /////////////// post the tuple
3417 // test passed : create the tuple, then assign to it.
3418 ret += "/*\t\tCreate and post the tuple\t*/\n";
3421 for(s=0;s<sl_list.size();++s){
3422 col_id_set this_se_cids;
3423 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
3424 for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
3425 if(unpacked_cids.count( (*csi) ) == 0){
3426 int tblref = (*csi).tblvar_ref;
3427 int schref = (*csi).schema_ref;
3428 string field = (*csi).field;
3429 if(tblref==0) // LHS from packet
3430 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3431 else // RHS from hash bucket
3432 ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3433 unpacked_cids.insert( (*csi) );
3439 // Unpack partial fcns ref'd by the select clause.
3440 // Its a kind of a WHERE clause ...
3441 for(p=sl_fcns_start;p<sl_fcns_end;p++){
3442 if(fcn_ref_cnt[p] > 1){
3443 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3445 if(is_partial_fcn[p]){
3446 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3447 ret += "\tif(retval) goto end;\n";
3449 if(fcn_ref_cnt[p] > 1){
3450 if(!is_partial_fcn[p]){
3451 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3453 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3458 // increment the counter of accepted tuples
3459 ret += "\n\t#ifdef LFTA_STATS\n";
3460 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3461 ret += "\t#endif\n\n";
3463 // First, compute the size of the tuple.
3465 // Unpack any BUFFER type selections into temporaries
3466 // so that I can compute their size and not have
3467 // to recompute their value during tuple packing.
3468 // I can use regular assignment here because
3469 // these temporaries are non-persistent.
3471 for(s=0;s<sl_list.size();s++){
3472 data_type *sdt = sl_list[s]->get_data_type();
3473 if(sdt->is_buffer_type()){
3474 sprintf(tmpstr,"\tselvar_%d = ",s);
3476 ret += generate_se_code(sl_list[s],schema);
3482 // The size of the tuple is the size of the tuple struct plus the
3483 // size of the buffers to be copied in.
3485 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3486 for(s=0;s<sl_list.size();s++){
3487 data_type *sdt = sl_list[s]->get_data_type();
3488 if(sdt->is_buffer_type()){
3489 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3496 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3497 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3499 // Test passed, make assignments to the tuple.
3501 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3503 // Mark tuple as REGULAR_TUPLE
3504 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3507 for(s=0;s<sl_list.size();s++){
3508 data_type *sdt = sl_list[s]->get_data_type();
3509 if(sdt->is_buffer_type()){
3510 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3512 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3515 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3517 // if(sdt->needs_hn_translation())
3518 // ret += sdt->hton_translation() +"( ";
3519 ret += generate_se_code(sl_list[s],schema);
3520 // if(sdt->needs_hn_translation())
3528 ret += "\tpost_tuple(tuple);\n";
3530 // Increment the counter of posted tuples
3531 ret += "\n\t#ifdef LFTA_STATS\n";
3532 ret += "\n\tt->out_tuple_cnt++;\n\n";
3533 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3534 ret += "\t#endif\n\n";
// generate_aggr_accept_body: emits the C source text for the aggregation
// part of an LFTA accept function for query node `fs`.
// The generated code, as far as the visible lines show:
//   - evaluates the partial functions referenced by the group-by and
//     aggregate definitions, emitting `if(retval) goto end;` after each;
//   - increments t->accepted_tuple_cnt when LFTA_STATS is defined;
//   - computes the group-by attributes (temporal ones are skipped when
//     temporal_flush is non-empty, since the flush code computes them);
//   - splices in temporal_flush when the query has no "real_time" define;
//   - hashes the group-by attributes into hashval/hash2/probe and scans up
//     to 5 slots of t->aggr_table for a matching group or a best slot;
//   - on a match, updates the group in best_slot; otherwise, if best_slot is
//     occupied, emits code that may flush old groups and posts the evicted
//     group as a tuple, then initializes a new group in best_slot.
// Parameters:
//   fs             query plan node; read for the "real_time" define and,
//                  via a cast to sgah_qpn, for lfta_disorder.
//   node_name      used in generated names (e.g. fta_aggr_flush_old_<node_name>).
//   schema         passed through to the expression/code generators.
//   temporal_flush previously generated flush code; may be empty.
// Returns the generated code as a string (the return statement itself is not
// visible in this chunk).
// NOTE(review): gb_tbl, aggr_tbl, partial_fcns, is_partial_fcn, the
// gb_/ag_fcns_start/_end bounds, p/g/a, tmpstr and ret are not declared in
// the visible lines; presumably file-scope state or locals on lines missing
// from this chunk -- confirm.
3540 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
3544 ////////////// Processing for aggregtion query
3546 // First, search for a match. Start by unpacking the group-by attributes.
3548 // One complication : if a real-time aggregate flush occurs,
3549 // the GB attr has already been calculated. So don't compute
3550 // it again if 1) its temporal and 2) it will be computed in the
3551 // agggregate flush code.
3553 // Unpack the partial fcns ref'd by the gb's and the aggr defs.
3554 for(p=gb_fcns_start;p<gb_fcns_end;p++){
3555 if(is_partial_fcn[p]){
3556 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3557 ret += "\tif(retval) goto end;\n";
3560 for(p=ag_fcns_start;p<ag_fcns_end;p++){
3561 if(is_partial_fcn[p]){
3562 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3563 ret += "\tif(retval) goto end;\n";
3567 // increment the counter of accepted tuples
3568 ret += "\n\t#ifdef LFTA_STATS\n";
3569 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3570 ret += "\t#endif\n\n";
3572 ret += "/*\t\tTest if the group is in the hash table \t*/\n";
3573 // Compute the values of the group-by variables.
3574 for(g=0;g<gb_tbl->size();g++){
3575 data_type *gdt = gb_tbl->get_data_type(g);
3576 if((! gdt->is_temporal()) || temporal_flush == ""){
3578 if(gdt->is_buffer_type()){
3579 // NOTE : if the SE defining the gb is anything
3580 // other than a ref to a variable, this will generate
3581 // illegal code. To be resolved with Spatch.
3582 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
3583 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
3585 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
3586 gdt->get_buffer_assign_copy().c_str(), g, g);
3588 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
3595 // A quick aside : if any of the GB attrs are temporal,
3596 // test for change and flush if any change occurred.
3597 // We've already computed the flush code,
3598 // Put it here if this is not a real time query.
3599 // We've already unpacked all column refs, so no need to
3600 // do it again here.
3602 string rt_level = fs->get_val_of_def("real_time");
3603 if(rt_level == "" && temporal_flush != ""){
3604 ret += temporal_flush;
3607 // Compute the hash bucket
3608 if(gb_tbl->size() > 0){
// NOTE(review): the next statement ends with a stray trailing backslash,
// which is a line splice; it only joins that line with the following `for`,
// so it is harmless but should be removed.
3609 ret += "\thashval = ";\
3610 for(g=0;g<gb_tbl->size();g++){
3611 if(g>0) ret += " ^ ";
3612 data_type *gdt = gb_tbl->get_data_type(g);
// NOTE(review): the buffer and non-buffer branches below emit identical
// hash-term text, so the is_buffer_type() test has no effect here.
3613 if(gdt->is_buffer_type()){
3614 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3615 gdt->get_type_str().c_str(), g);
3617 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3618 gdt->get_type_str().c_str(), g);
// hash2 is keyed with hash_nums[g%NRANDS] where g == gb_tbl->size() after
// the loop above, i.e. a multiplier not used by any group-by term.
3623 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
3624 ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
3626 ret+="\tprobe = 0;\n";
3627 ret+="\thash2 = 0;\n\n";
3630 // Does the lfta reference a udaf?
3631 bool has_udaf = false;
3632 for(a=0;a<aggr_tbl->size();a++){
3633 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
3636 // Scan for a match, or alternatively the best slot.
3637 // Currently, hardcode 5 tests.
3639 " gen_val = t->generation & SLOT_GEN_BITS;\n"
3640 " match_found = 0;\n"
3641 " best_slot = probe;\n"
3642 " for(i=0;i<5 && match_found == 0;i++){\n"
3643 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
3645 if(gb_tbl->size()>0){
3646 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
3648 string rhs_op, lhs_op;
3649 for(g=0;g<gb_tbl->size();g++){
3650 if(g>0) ret += " && ";
3652 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
3653 sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
3654 ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
3659 " match_found = 1;\n"
3660 " best_slot = probe;\n"
3663 "// Rate slots in case no match found: prefer empty, then full but old slots\n"
3664 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3665 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3666 " best_slot = probe;\n"
3668 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3669 " best_slot = probe;\n"
3673 " if(probe >= t->max_aggrs)\n"
3676 " if(match_found){\n"
3678 ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3681 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
// NOTE(review): this printf writes debug output to the generator's stdout
// on every call; consider removing it or guarding it with a debug flag.
3683 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3684 if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3686 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3688 bool first_g = true;
3689 for(int g=0;g<gb_tbl->size();g++){
3690 data_type *gdt = gb_tbl->get_data_type(g);
3691 if(gdt->is_temporal()){
3692 if(first_g) first_g = false; else ret+=" + ";
3693 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3696 ret += ") == 0 ){\n";
3699 " fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3705 ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3707 "\t\t\t#ifdef LFTA_STATS\n"
3708 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3709 "\t\t\t\tt->collision_cnt++;\n\n"
3713 ret += generate_init_group(schema,"best_slot");
// generate_fta_accept: emits the C source text of
// `static gs_retval_t accept_packet_<node_name>(struct FTA *f, FTAID * ftaid,
// void *pkt, gs_int32_t sz)` for query node `fs`.
// Outline of the generated function, as far as the visible lines show:
//   1. declarations: unpack variables for every referenced column, the
//      always-needed locals, and extra locals for aggregation, filter join,
//      watch join, BUFFER-typed select temporaries and partial functions;
//   2. LFTA_STATS bookkeeping (start_cycle = rdtsc(), t->in_tuple_cnt++);
//   3. `goto end` when the packet's __lfta_id_fm_nic__ != global_id;
//   4. aggregation prelim code, or unpacking of the temporal select
//      attributes (plus the filter-join window variable) with t->last_*
//      updates;
//   5. filter predicates in cost order (not for watch joins), each preceded
//      by unpacking of newly referenced columns and partial functions;
//   6. unpacking of join hash fields and of the remaining referenced columns;
//   7. the query-type-specific body (selection, aggregation, filter join or
//      watch join generator);
//   8. the `end:` label, LFTA_STATS cycle accounting, and `return 1;`.
// Parameters:
//   fs, node_name, schema  query node, name used in generated identifiers,
//                          and schema passed to the code generators.
//   Ext_fcns               used for predicate costing and UDAF return types.
//   is_aggr_query, is_fj, is_wj  select which of the sections above apply.
//   s_pids                 indices of WHERE predicates covered by the
//                          prefilter; their columns are not gathered (unless
//                          fj/wj) and they are left out of tmp_wh.
// NOTE(review): where, sl_list, gb_tbl, aggr_tbl, partial_fcns, cid_set,
// global_id, tmpstr, w/s/g/a/p, etc. are not declared in the visible lines;
// presumably file-scope state or locals on lines missing from this chunk.
3723 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
3725 string ret="static gs_retval_t accept_packet_"+node_name+
3726 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3727 ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3731 // Define all of the variables needed by this
3735 // Gather all column references, need to define unpacking variables.
3738 col_id_set::iterator csi;
3740 // If its a filter join, rebind all colrefs
3741 // to the first range var, to avoid double unpacking.
3744 for(w=0;w<where.size();++w)
3745 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3746 for(s=0;s<sl_list.size();s++)
3747 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3750 for(w=0;w<where.size();++w){
3751 if(is_wj || is_fj || s_pids.count(w) == 0)
3752 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3754 for(s=0;s<sl_list.size();s++){
3755 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3760 for(g=0;g<gb_tbl->size();g++)
3761 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3764 // Variables for unpacking attributes.
3765 ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3766 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3767 int schref = (*csi).schema_ref;
3768 int tblref = (*csi).tblvar_ref;
3769 string field = (*csi).field;
3770 data_type dt(schema->get_type_name(schref,field));
3771 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3772 field.c_str(), tblref);
3778 // Variables that are always needed
3779 ret += "/*\t\tVariables which are always needed\t*/\n";
3780 ret += "\tgs_retval_t retval;\n";
3781 ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3782 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3784 ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3787 // Variables needed for aggregation queries.
3789 ret += "\n/*\t\tVariables for aggregation\t*/\n";
3790 ret+="\tunsigned int i, probe;\n";
3791 ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3792 ret+="\tgs_uint64_t hashval, hash2;\n";
3793 // Variables for storing group-by attribute values.
3794 if(gb_tbl->size() > 0)
3795 ret += "/*\t\tGroup-by attributes\t*/\n";
3796 for(g=0;g<gb_tbl->size();g++){
3797 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3799 data_type *gdt = gb_tbl->get_data_type(g);
3800 if(gdt->is_buffer_type()){
3801 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3806 // Temporaries for min/max
// NOTE(review): only MIN/MAX aggregates get a temporary here, although the
// emitted header comment below calls them "BUFFER aggregates".
3807 string aggr_tmp_str = "";
3808 for(a=0;a<aggr_tbl->size();a++){
3809 string aggr_op = aggr_tbl->get_op(a);
3810 if(aggr_op == "MIN" || aggr_op == "MAX"){
3811 sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3812 aggr_tmp_str.append(tmpstr);
3815 if(aggr_tmp_str != ""){
3816 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3817 ret += aggr_tmp_str;
3820 // Variables for udaf output temporaries
3821 bool no_udaf = true;
3822 for(a=0;a<aggr_tbl->size();a++){
3823 if(! aggr_tbl->is_builtin(a)){
3825 ret+="/*\t\tUDAF output vars.\t*/\n";
3828 int afcn_id = aggr_tbl->get_fcn_id(a);
3829 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3830 sprintf(tmpstr,"udaf_ret%d", a);
3831 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3836 // Variables needed for a filter join query
3837 if(fs->node_type() == "filter_join"){
3838 filter_join_qpn *fjq = (filter_join_qpn *)fs;
3839 bool uses_bloom = fjq->use_bloom;
3840 ret += "/*\t\tJoin fields\t*/\n";
3841 for(g=0;g<fjq->hash_eq.size();g++){
3842 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3844 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3849 " /* Variables for fj bloom filter */ \n"
3850 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3851 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3852 "\tlong long int curr_fj_ts;\n"
3853 "\tlong long int curr_bin, the_bin;\n"
3858 " /* Variables for fj join table */ \n"
3859 "\tunsigned int i, bucket, found; \n"
3860 "\tunsigned int bucket1, the_bucket;\n"
3861 " long long int curr_fj_ts;\n"
3868 if(fs->node_type() == "watch_join"){
3869 watch_join_qpn *wlq = (watch_join_qpn *)fs;
3870 ret += "/*\t\tJoin fields\t*/\n";
3871 for(int k=0;k<wlq->key_flds.size(); ++k){
3872 string kfld = wlq->key_flds[k];
3873 ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
3874 ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
3877 " /* Variables for wl join table */ \n"
3878 "\tunsigned int i, bucket;\n"
3879 "\tunsigned long long int hash; \n";
3880 string wl_schema = wlq->from[1]->get_schema_name();
3881 string wl_elem_str = generate_watchlist_element_name(wl_schema);
3882 ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n";
3888 // Variables needed to store selected attributes of BUFFER type
3889 // temporarily, in order to compute their size for storage
3890 // in an output tuple.
3892 string select_var_defs = "";
3893 for(int s=0;s<sl_list.size();s++){
3894 data_type *sdt = sl_list[s]->get_data_type();
3895 if(sdt->is_buffer_type()){
3896 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3897 select_var_defs.append(tmpstr);
3900 if(select_var_defs != ""){
3901 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3902 ret += select_var_defs;
3905 // Variables to store results of partial functions.
3907 if(partial_fcns.size()>0){
3908 ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3909 for(p=0;p<partial_fcns.size();++p){
3910 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3911 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3912 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3914 if(!is_aggr_query && fcn_ref_cnt[p] >1){
3915 ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3920 if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3924 // variable to hold packet struct //
3926 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3930 ret += "\t#ifdef LFTA_STATS\n";
3931 // variable to store counter of cpu cycles spend in accept_tuple
3932 ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3933 // increment counter of received tuples
3934 ret += "\tt->in_tuple_cnt++;\n";
3935 ret += "\t#endif\n";
3938 // -------------------------------------------------
3939 // If the packet is "packet", test if its for this lfta,
3940 // and if so load it into its struct
// The generated code casts pkt to <node_name>_input_struct and jumps to
// `end` when its __lfta_id_fm_nic__ does not equal this LFTA's global_id.
3943 ret+="\n/* packed tuple : test and load. \t*/\n";
3944 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3945 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3946 ret+="\t\tgoto end;\n\n";
3951 col_id_set unpacked_cids; // Keep track of the cols that have been unpacked.
3953 string temporal_flush;
3955 ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3956 else { // non-aggregation operators
3958 // Unpack all the temporal attributes referenced in select clause
3959 // and update the last value of the attribute
3960 col_id_set temp_cids; // col ids of temp attributes in select clause
3962 for(s=0;s<sl_list.size();s++){
3963 data_type *sdt = sl_list[s]->get_data_type();
3964 if (sdt->is_temporal()) {
3965 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3968 // If this is a filter join,
3969 // ensure that the temporal range field is unpacked.
3971 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3972 if(temp_cids.count(window_var_cid)==0)
3973 temp_cids.insert(window_var_cid);
3976 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3977 if(unpacked_cids.count((*csi)) == 0){
3978 int tblref = (*csi).tblvar_ref;
3979 int schref = (*csi).schema_ref;
3980 string field = (*csi).field;
3981 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3982 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3985 unpacked_cids.insert( (*csi) );
3991 vector<cnf_elem *> filter = fs->get_filter_clause();
3992 // Test the filter predicate (some query types have additional preds).
3993 if(filter.size() > 0 && !is_wj){ // watchlist join does specialized processing
3995 // Sort by evaluation cost.
3996 // First, estimate evaluation costs
3997 // Eliminate predicates covered by the prefilter (those in s_pids).
3998 // I need to do it before the sort becuase the indices refer
3999 // to the position in the unsorted list./
4000 vector<cnf_elem *> tmp_wh;
4001 for(w=0;w<filter.size();++w){
4002 if(s_pids.count(w) == 0){
4003 compute_cnf_cost(filter[w],Ext_fcns);
4004 tmp_wh.push_back(filter[w]);
// NOTE(review): tmp_wh holds the predicates not covered by s_pids, but the
// sort below is applied to `filter`. Confirm that `filter` is replaced by
// tmp_wh on a line not visible in this chunk; otherwise covered predicates
// are evaluated again, and they have no computed cost.
4009 sort(filter.begin(), filter.end(), compare_cnf_cost());
4011 // Now generate the predicates.
4012 for(w=0;w<filter.size();++w){
4013 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
4015 // Find the set of variables accessed in this CNF elem,
4016 // but in no previous element.
4017 col_id_set this_pred_cids;
4018 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
4019 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4020 if(unpacked_cids.count( (*csi) ) == 0){
4021 int tblref = (*csi).tblvar_ref;
4022 int schref = (*csi).schema_ref;
4023 string field = (*csi).field;
4024 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4025 unpacked_cids.insert( (*csi) );
4028 // Find partial fcns ref'd in this cnf element
4030 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
4031 // Since set<..> is a "Sorted Associative Container",
4032 // we can walk through it in sorted order by walking from
4033 // begin() to end(). (and the partial fcns must be
4034 // evaluated in this order).
4035 set<int>::iterator si;
4036 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
4037 if(fcn_ref_cnt[(*si)] > 1){
4038 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
4040 if(is_partial_fcn[(*si)]){
4041 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
4042 ret += "\t\tif(retval) goto end;\n";
4044 if(fcn_ref_cnt[(*si)] > 1){
4045 if(!is_partial_fcn[(*si)]){
4046 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
4048 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
4053 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
4057 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
4061 // We've passed the WHERE clause,
4062 // unpack the remainder of the accessed fields.
4064 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
4065 vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
4066 for(w=0;w<h_eq.size();++w){
4067 col_id_set this_pred_cids;
4068 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
4069 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4070 if(unpacked_cids.count( (*csi) ) == 0){
4071 int tblref = (*csi).tblvar_ref;
4072 int schref = (*csi).schema_ref;
4073 string field = (*csi).field;
4074 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4075 unpacked_cids.insert( (*csi) );
4079 }else if(is_wj){ // STOPPED HERE move this to wj main body
4081 ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
4082 map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
4083 vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
4084 for(w=0;w<kflds.size();++w){
4085 string kfld = kflds[w];
4086 col_id_set this_pred_cids;
4087 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
4088 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4089 if(unpacked_cids.count( (*csi) ) == 0){
4090 int tblref = (*csi).tblvar_ref;
4091 int schref = (*csi).schema_ref;
4092 string field = (*csi).field;
4093 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4094 unpacked_cids.insert( (*csi) );
4100 ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
4102 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4103 if(unpacked_cids.count( (*csi) ) == 0){
4104 int schref = (*csi).schema_ref;
4105 int tblref = (*csi).tblvar_ref;
4106 string field = (*csi).field;
4107 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4108 unpacked_cids.insert( (*csi) );
4115 ////////////////// After this, the query types
4116 ////////////////// are processed differently.
// NOTE(review): `!is_fj & !is_wj` uses bitwise `&`. For bool operands the
// result equals `&&` (and & binds tighter than &&), but `&&` was presumably
// intended.
4118 if(!is_aggr_query && !is_fj & !is_wj)
4119 ret += generate_sel_accept_body(fs, node_name, schema);
4120 else if(is_aggr_query)
4121 ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
4124 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4126 ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4133 ret += "\n\tend:\n";
4134 ret += "\t#ifdef LFTA_STATS\n";
4135 ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
4136 ret += "\t#endif\n";
4137 ret += "\n\treturn 1;\n}\n\n";
4143 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
4146 string ret = "struct FTA * "+generate_alloc_name(node_name) +
4147 "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void *value){\n";
4149 ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
4152 ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
4154 // assign a streamid to fta instance
4155 ret+="\t/* assign a streamid */\n";
4156 ret+="\tf->f.ftaid = ftaid;\n";
4157 ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
4158 ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
4161 ret += "\tf->n_aggrs = 0;\n";
4163 ret += "\tf->max_aggrs = ";
4165 // Computing the number of aggregate blocks is a little
4166 // tricky. If there are no GB attrs, or if all GB attrs
4167 // are temporal, then use a single aggregate block, else
4168 // use a default value (10). A user specification overrides
4170 bool single_group = true;
4171 for(g=0;g<gb_tbl->size();g++){
4172 data_type *gdt = gb_tbl->get_data_type(g);
4173 if(! gdt->is_temporal() ){
4174 single_group = false;
4177 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
4178 int max_aggr_i = atoi(max_aggr_str.c_str());
4179 if(max_aggr_i <= 0){
4183 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
4185 unsigned int naggrs = 1; // make it power of 2
4186 unsigned int nones = 0;
4190 naggrs = naggrs << 1;
4191 max_aggr_i = max_aggr_i >> 1;
4193 if(nones==1) // in case it was already a power of 2.
4195 ret += int_to_string(naggrs);
4199 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
4200 ret+="\t\treturn(0);\n";
4202 // ret+="/* compute how many integers we need to store the hashmap */\n";
4203 // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
4204 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
4205 ret+="\t\treturn(0);\n";
4207 ret+="/*\t\tfill bitmap with zero \t*/\n";
4208 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
4209 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
4210 ret+="\tf->generation=0;\n";
4211 ret+="\tf->flush_pos = f->max_aggrs;\n";
4213 ret += "\tf->flush_ctr = 0;\n";
4219 ret+="\tf->first_exec = 1;\n";
4220 unsigned int n_bloom = 11;
4221 string n_bloom_str = fs->get_val_of_def("num_bloom");
4222 int tmp_n_bloom = atoi(n_bloom_str.c_str());
4224 n_bloom = tmp_n_bloom+1;
4226 unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
4227 if(window_len < n_bloom){
4228 n_bloom = window_len+1;
4231 int bf_exp_size = 12; // base-2 log of number of bits
4232 string bloom_len_str = fs->get_val_of_def("bloom_size");
4233 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
4234 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
4235 bf_exp_size = tmp_bf_exp_size;
4237 int bf_bit_size = 1 << 12;
4238 int bf_byte_size = bf_bit_size / (8*sizeof(char));
4240 int bf_tot = n_bloom*bf_byte_size;
4241 ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
4242 ret+="\t\treturn(0);\n";
4245 " for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
4246 " f->bf_table[i] = 0;\n"
4249 unsigned int ht_size = 4096;
4250 string ht_size_s = fs->get_val_of_def("aggregate_slots");
4251 int tmp_ht_size = atoi(ht_size_s.c_str());
4252 if(tmp_ht_size > 1024){
4253 unsigned int hs = 1; // make it power of 2
4256 tmp_ht_size = tmp_ht_size >> 1;
4260 ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
4261 ret+="\t\treturn(0);\n";
4264 " for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
4265 " f->join_table[i].ts = 0;\n"
4270 // Initialize the complex literals (which might be handles).
4272 for(cl=0;cl<complex_literals->size();cl++){
4273 literal_t *l = complex_literals->get_literal(cl);
4274 // sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
4275 // ret += tmpstr + l->to_C_code() + ";\n";
4276 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
4277 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4282 // Initialize the last seen values of temporal attributes to min(max) value of
4283 // their respective type
4284 // Create places to hold the last values of temporal attributes referenced in select clause
4287 col_id_set temp_cids; // col ids of temp attributes in select clause
4290 col_id_set::iterator csi;
4292 for(s=0;s<sl_list.size();s++){
4293 data_type *sdt = sl_list[s]->get_data_type();
4294 if (sdt->is_temporal()) {
4295 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
4299 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
4300 int tblref = (*csi).tblvar_ref;
4301 int schref = (*csi).schema_ref;
4302 string field = (*csi).field;
4303 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
4304 if (dt.is_increasing()) {
4305 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
4307 } else if (dt.is_decreasing()) {
4308 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
4313 // initialize last seen values of temporal groubpy variables
4315 for(g=0;g<gb_tbl->size();g++){
4316 data_type *dt = gb_tbl->get_data_type(g);
4317 if(dt->is_temporal()){
4319 fprintf(stderr,"group by attribute %s is temporal, ",
4320 gb_tbl->get_name(g).c_str());
4322 if(dt->is_increasing()){
4323 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
4325 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
4332 ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
4333 ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
4334 ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
4335 ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
4336 ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
4338 // Initialize runtime stats
4339 ret+="\tf->in_tuple_cnt = 0;\n";
4340 ret+="\tf->out_tuple_cnt = 0;\n";
4341 ret+="\tf->out_tuple_sz = 0;\n";
4342 ret+="\tf->accepted_tuple_cnt = 0;\n";
4343 ret+="\tf->cycle_cnt = 0;\n";
4344 ret+="\tf->collision_cnt = 0;\n";
4345 ret+="\tf->eviction_cnt = 0;\n";
4346 ret+="\tf->sampling_rate = 1.0;\n";
4348 ret+="\tf->trace_id = 0;\n\n";
4349 if(param_tbl->size() > 0){
4351 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
4352 "#ifndef LFTA_IN_NIC\n"
4353 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
4361 // Register the pass-by-handle parameters
4363 for(ph=0;ph<param_handle_table.size();++ph){
4364 data_type pdt(param_handle_table[ph]->type_name);
4365 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
4366 switch(param_handle_table[ph]->val_type){
4369 if(pdt.is_buffer_type()) ret += "&(";
4370 sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
4372 if(pdt.is_buffer_type()) ret += ")";
4376 // not complex, no constructor
4378 ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
4381 // query parameter handles are regstered/deregistered in the
4382 // load_params function.
4383 // ret += "t->param_"+param_handle_table[ph]->param_name;
4386 fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
4391 ret += "\treturn (struct FTA *) f;\n";
4400 //////////////////////////////////////////////////////////////////
// generate_lfta_block: emit the C source for one LFTA query node.
// Parameters (as used in the visible lines):
//   fs               - query plan node; node_type() selects how the select
//                      list, where clause and group-by/aggregate tables are
//                      extracted.
//   schema           - schema table handed to the code generators.
//   gid              - NOTE(review): not referenced in the visible lines
//                      (global_id is set to -1 instead) -- confirm intent.
//   Ext_fcns         - external function table (partial fcns, literals).
//   schema_embed_str - passed through to generate_preamble.
//   ifdb             - interface database, queried for Time_Correlation.
//   nicp             - optional NIC properties; "Return=Packed" enables the
//                      packed-input unpack struct.
//   s_pids           - passed through to generate_fta_accept.
// The generated code is accumulated in retval (its declaration and the
// return statement are not visible in this view).
// Side effects: overwrites the file-level code-generation globals
// (gb_tbl, aggr_tbl, sl_list, where, partial_fcns, pred_class, ...).
4402 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
4403 // map<string,string> &int_fcn_defs,
4404 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
4409 /////////////////////////////////////////////////////////////
4410 /// Do operator-generic processing, such as
4411 /// gathering the set of referenced columns,
4412 /// generating structures, etc.
4414 // Initialize globals to empty.
4415 gb_tbl = NULL; aggr_tbl = NULL;
4416 global_id = -1; nicprop = NULL;
4417 param_tbl = fs->get_param_tbl();
4418 sl_list.clear(); where.clear();
4419 partial_fcns.clear();
4420 fcn_ref_cnt.clear(); is_partial_fcn.clear();
4421 pred_class.clear(); pred_pos.clear();
4422 sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
4423 gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
4426 // Does the lfta read packed results from the NIC?
4427 nicprop = nicp; // load into global
4429 packed_return = false;
4430 if(nicp && nicp->option_exists("Return")){
4431 if(nicp->option_value("Return") == "Packed"){
4432 packed_return = true;
// Any Return value other than "Packed" is reported and ignored;
// packed_return stays false.
4434 fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
4439 // Extract data which defines the query.
4440 // complex literals gathered now.
4441 complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
4442 param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
4443 string node_name = fs->get_node_name();
4444 bool is_fj = false, uses_bloom = false;
4446 bool is_watch_tbl = false;
// Dispatch on the plan-node type to pull out the select list and where
// clause, plus the group-by and aggregate tables for sgah_qpn nodes.
4449 if(fs->node_type() == "spx_qpn"){
4450 is_aggr_query = false;
4451 spx_qpn *spx_node = (spx_qpn *)fs;
4452 sl_list = spx_node->get_select_se_list();
4453 where = spx_node->get_where_clause();
4457 if(fs->node_type() == "sgah_qpn"){
4458 is_aggr_query = true;
4459 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4460 sl_list = sgah_node->get_select_se_list();
4461 where = sgah_node->get_where_clause();
4462 gb_tbl = sgah_node->get_gb_tbl();
4463 aggr_tbl = sgah_node->get_aggr_tbl();
// The HAVING clause is not compiled into the LFTA; only a warning is issued.
4465 if((sgah_node->get_having_clause()).size() > 0){
4466 fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
4469 if(fs->node_type() == "filter_join"){
4470 is_aggr_query = false;
4472 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4473 sl_list = fj_node->get_select_se_list();
4474 where = fj_node->get_where_clause();
4475 uses_bloom = fj_node->use_bloom;
4479 if(fs->node_type() == "watch_join"){
4480 is_aggr_query = false;
4482 watch_join_qpn *wl_node = (watch_join_qpn *)fs;
4483 sl_list = wl_node->get_select_se_list();
4484 where = wl_node->get_where_clause();
// Watch-table nodes get an empty select list and an empty where clause.
4488 if(fs->node_type() == "watch_tbl_qpn"){
4489 is_aggr_query = false;
4490 is_watch_tbl = true;
4491 vector<scalarexp_t *> empty_sl_list;
4492 vector<cnf_elem *> empty_where;
4493 sl_list = empty_sl_list;
4494 where = empty_where;
4498 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
4502 // Build list of "partial functions", by clause.
4503 // NOTE : partial fcns are not handled well.
4504 // The act of searching for them associates the fcn call
4505 // in the SE with an index to an array. Refs to the
4506 // fcn value are replaced with refs to the variable they are
4507 // unpacked into. A more general tagging mechanism would be better.
4510 vector<bool> *pfunc_ptr = NULL;
4511 vector<int> *ref_cnt_ptr = NULL;
4512 if(!is_aggr_query){ // don't collect cacheable fcns on aggr query.
4513 ref_cnt_ptr = &fcn_ref_cnt;
4514 pfunc_ptr = &is_partial_fcn;
// Each clause's partial fcns occupy the index range [*_fcns_start, *_fcns_end)
// of partial_fcns: select list, then where clause, then group-by, then aggregates.
4518 for(i=0;i<sl_list.size();i++){
4519 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4521 wh_fcns_start = sl_fcns_end = partial_fcns.size();
4522 for(i=0;i<where.size();i++){
4523 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4525 gb_fcns_start = wh_fcns_end = partial_fcns.size();
// NOTE(review): gb_tbl is NULL for every node type except sgah_qpn; a NULL
// guard for this loop is presumably on a line not visible here -- confirm.
4527 for(i=0;i<gb_tbl->size();i++){
4528 find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
4531 ag_fcns_start = gb_fcns_end = partial_fcns.size();
4532 if(aggr_tbl != NULL){
4533 for(i=0;i<aggr_tbl->size();i++){
// The partial_fcns list argument is NULL here, so presumably aggregate SEs add
// no entries and ag_fcns_end == ag_fcns_start -- confirm in find_partial_fcns.
4534 find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
4537 ag_fcns_end = partial_fcns.size();
4539 // Fill up the is_partial_fcn and fcn_ref_cnt arrays.
// NOTE(review): the guard around this loop is not visible; as shown it appends
// one entry per partial fcn -- confirm it cannot duplicate entries already
// pushed through ref_cnt_ptr/pfunc_ptr for non-aggregate queries.
4541 for(i=0; i<partial_fcns.size();i++){
4542 fcn_ref_cnt.push_back(1);
4543 is_partial_fcn.push_back(true);
4547 // Unmark non-partial expensive functions referenced only once.
// (set_partial_ref(-1) presumably clears the array index assigned during the search.)
4548 for(i=0; i<partial_fcns.size();i++){
4549 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
4550 partial_fcns[i]->set_partial_ref(-1);
4554 node_name = normalize_name(node_name);
4556 retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
// With packed NIC returns, declare a struct whose unpack_var_<col> members
// mirror the NIC-safe columns referenced by the select list, where clause and
// group-by. The columns are sorted by name and typed from the first input table's schema.
4558 if(packed_return){ // generate unpack struct
4559 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
4560 int schref = input_tbls[0]->get_schema_ref();
4561 vector<string> refd_cols;
4562 for(s=0;s<sl_list.size();++s){
4563 gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
4565 for(p=0;p<where.size();++p){
4566 // I'm not disabling these preds ...
4567 gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
// NOTE(review): gb_tbl may be NULL here as well -- confirm a guard exists.
4570 for(g=0;g<gb_tbl->size();++g){
4571 gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
4574 sort(refd_cols.begin(), refd_cols.end());
4575 retval += "struct "+node_name+"_input_struct{\n";
4576 retval += "\tint __lfta_id_fm_nic__;\n";
4578 for(vsi=0;vsi<refd_cols.size();++vsi){
4579 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
4580 retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
4586 /////////////////////////////////////////////////////
4587 // Common stuff unpacked, do some generation
// Operator-specific state structs: aggregation table, filter-join table,
// or watch-table structs plus their loader.
4591 retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
4593 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
4595 retval += "\n\n// watchtable code here \n\n";
4596 watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
4597 retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
4598 retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
// The FTA instance struct, output tuple struct, and the FTA callback functions.
4602 retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
4603 retval += generate_tuple_struct(node_name, sl_list) ;
4606 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
4607 if(param_tbl->size() > 0)
4608 retval += generate_fta_load_params(node_name) ;
4609 retval += generate_fta_free(node_name, is_aggr_query) ;
4610 retval += generate_fta_control(node_name, schema, is_aggr_query) ;
4611 retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;
4613 /* extract the value of Time_Correlation from interface definition */
// Only the first input table's interface is consulted; DEFAULT_TIME_CORR is
// used when Time_Correlation is not defined.
4617 vector<tablevar_t *> tvec = fs->get_input_tbls();
4618 vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
4619 if (time_corr_vec.empty())
4620 time_corr = DEFAULT_TIME_CORR;
4622 time_corr = atoi(time_corr_vec[0].c_str());
4624 retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
4625 retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
// compute_snap_len: work out the packet snap length needed by LFTA node fs.
// The input is the "snap_len" modifier of every schema field the node
// references, in its where clause, select list and group-by definitions.
// Side effects: overwrites the file-level sl_list and where globals, and
// assigns gb_tbl for sgah_qpn nodes.
// NOTE(review): the final return statement and the watch_tbl_qpn branch body
// are on lines not visible here. The visible logic tracks the largest numeric
// per-field snap_len and compares n_snap with the number of referenced
// columns. Confirm what is returned when some fields lack snap_len.
4633 int compute_snap_len(qp_node *fs, table_list *schema){
4635 // Initialize global vars
4637 sl_list.clear(); where.clear();
// Watch-table nodes are handled separately (branch body not visible here).
4640 if(fs->node_type() == "watch_tbl_qpn"){
4644 if(fs->node_type() == "spx_qpn"){
4645 spx_qpn *spx_node = (spx_qpn *)fs;
4646 sl_list = spx_node->get_select_se_list();
4647 where = spx_node->get_where_clause();
4649 else if(fs->node_type() == "sgah_qpn"){
4650 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4651 sl_list = sgah_node->get_select_se_list();
4652 where = sgah_node->get_where_clause();
4653 gb_tbl = sgah_node->get_gb_tbl();
4655 else if(fs->node_type() == "filter_join"){
4656 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4657 sl_list = fj_node->get_select_se_list();
4658 where = fj_node->get_where_clause();
4660 else if(fs->node_type() == "watch_join"){
4661 watch_join_qpn *fj_node = (watch_join_qpn *)fs;
4662 sl_list = fj_node->get_select_se_list();
4663 where = fj_node->get_where_clause();
4665 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
4669 // Gather all column references, need to define unpacking variables.
4672 col_id_set::iterator csi;
4674 for(w=0;w<where.size();++w)
4675 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
4676 for(s=0;s<sl_list.size();s++){
4677 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
// NOTE(review): gb_tbl is only assigned for sgah_qpn in the visible lines;
// confirm it is NULL-checked (or reset) before this loop for other node types.
4682 for(g=0;g<gb_tbl->size();g++)
4683 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
4686 // compute snap length
4689 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4690 int schref = (*csi).schema_ref;
4691 int tblref = (*csi).tblvar_ref;
4692 string field = (*csi).field;
4694 param_list *field_params = schema->get_modifier_list(schref, field);
4695 if(field_params->contains_key("snap_len")){
4696 string fld_snap_str = field_params->val_of("snap_len");
// Keep the maximum numeric snap_len; non-numeric values are reported and ignored.
4698 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
4699 if(fld_snap > snap_len) snap_len = fld_snap;
4702 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
4707 if(n_snap == cid_set.size()){
4716 // Function which computes a (greedy, not necessarily optimal)
4717 // set of unpacking functions.
// For every column in upref_cids it picks one unpack function and records it
// in ucol_fcn_map. Each round counts, for every unpack function, how many
// still-unassigned columns it can unpack. It then chooses the function with
// the lowest cost per such column (get_ufcn_cost / count) and assigns it to
// all unassigned columns that list it. Rounds repeat until every column in
// upref_cids has an entry.
// Callers in this file only insert columns with at least one unpack function.
4719 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
4720 map<string, int> pfcn_count;
4721 map<string, int>::iterator msii;
4722 col_id_set::iterator cisi;
4723 set<string>::iterator ssi;
4726 while(ucol_fcn_map.size() < upref_cids.size()){
4728 // Gather unpack functions referenced by unaccounted-for
4729 // columns, and increment their reference count.
// NOTE(review): pfcn_count is declared outside this loop; confirm it is
// cleared at the start of each round (line not visible here). Otherwise
// counts from earlier rounds skew the cost-per-column choice.
4731 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4732 if(ucol_fcn_map.count((*cisi)) == 0){
4733 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4734 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
4735 pfcn_count[(*ssi)]++;
4739 // Get the lowest cost per field function.
4740 float min_cost = 0.0;
4741 string best_fcn = "";
4742 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
4743 int fcost = Schema->get_ufcn_cost((*msii).first);
4745 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
4748 float this_cost = (1.0*fcost)/(*msii).second;
// The first candidate seeds min_cost; later ones replace it only if strictly cheaper.
4749 if(msii == pfcn_count.begin() || this_cost < min_cost){
4750 min_cost = this_cost;
4751 best_fcn = (*msii).first;
4755 fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4759 // Assign this function to the unassigned fcns which use it.
4760 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4761 if(ucol_fcn_map.count((*cisi)) == 0){
4762 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4763 if(ufcns.count(best_fcn)>0)
4764 ucol_fcn_map[(*cisi)] = best_fcn;
4772 // Generate the shared prefilter code for the lftas of one interface.
4773 // Assume that the predicate references no external functions,
4774 // and especially no partial functions,
4775 // aggregates, internal functions.
//
// Parameters:
//   pred_list      - predicates evaluated by the prefilter. Bit p of the
//                    generated bitmask is set when pred_list[p] is true.
//   temp_cids      - temporal columns unpacked into prefilter_temp_vars.
//   Schema, Ext_fcns - schema and external-function tables.
//   lfta_cols      - per-lfta sets of referenced columns.
//   lfta_sigs      - per-lfta bitmasks. An lfta's remaining fields are only
//                    unpacked when (retval & sig) == sig.
//   lfta_snap_lens - per-lfta snap lengths; -1 is handled specially.
//   iface          - suffix appended to the generated global names.
// Emits:
//   - common-predicate handle globals and the complex-literal struct;
//   - init_lfta_prefilter_<iface>();
//   - lfta_prefilter_<iface>(void *pkt), returning a gs_uint64_t bitmask;
//   - lfta_pkt_snaplen(void *pkt).
// NOTE(review): the bitmask is 64 bits wide, so at most 64 predicates in
// pred_list can be represented.
4776 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4777 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4778 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4779 vector<int> &lfta_snap_lens, string iface){
4780 col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4781 col_id_set::iterator csi;
4785 // Gather complex literals in the prefilter.
// NOTE(review): this heap-allocated table (and the data_type objects allocated
// below) are not freed in the visible lines.
4786 cplx_lit_table *complex_literals = new cplx_lit_table();
4787 for(p=0;p<pred_list.size();++p){
4788 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4792 // Find the combinable predicates
4793 vector<predicate_t *> pr_list;
4794 for(p=0;p<pred_list.size();++p){
4795 find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4798 // Analyze the combinable predicates to find the predicate classes.
// pr_list[p] falls into class pred_class[p]. Within that class it holds slot
// pred_pos[p]. The slot picks handle/value word pred_pos/32 and bit pred_pos%32.
4799 pred_class.clear(); // idx to equiv pred in equiv_list
4800 pred_pos.clear(); // idx to returned bitmask.
4801 vector<predicate_t *> equiv_list;
4802 vector<int> num_equiv;
4805 for(p=0;p<pr_list.size();++p){
4806 for(q=0;q<equiv_list.size();++q){
4807 if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4810 if(q == equiv_list.size()){ // no equiv : create new
4811 pred_class.push_back(equiv_list.size());
4812 equiv_list.push_back(pr_list[p]);
4813 pred_pos.push_back(0);
4814 num_equiv.push_back(1);
4816 }else{ // pr_list[p] is equivalent to pred q
4817 pred_class.push_back(q);
4818 pred_pos.push_back(num_equiv[q]);
4823 // Generate the variables which hold the common pred handles
// One handle per 32 predicates of each class. The <= bound declares one
// spare variable when num_equiv[q] is a multiple of 32.
4824 ret += "/*\t\tprefilter global vars.\t*/\n";
4825 for(q=0;q<equiv_list.size();++q){
4826 for(p=0;p<=(num_equiv[q]/32);++p){
4827 ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4831 // Struct to hold prefilter complex literals
4832 ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4833 if(complex_literals->size() == 0)
4834 ret += "\tint no_variable;\n";
4836 for(cl=0;cl<complex_literals->size();cl++){
4837 literal_t *l = complex_literals->get_literal(cl);
4838 data_type *dtl = new data_type( l->get_type() );
4839 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4842 ret += "} prefilter_complex_lits_"+iface+";\n\n";
4845 // Generate the prefilter initialziation code
4846 ret += "void init_lfta_prefilter_"+iface+"(){\n";
4848 // First initialize complex literals, if any.
4849 ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4850 for(cl=0;cl<complex_literals->size();cl++){
4851 literal_t *l = complex_literals->get_literal(cl);
4852 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4853 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
// Register each combinable predicate with its class handle.
// - First predicate of a class: the generated call passes NULL, and the
//   result is stored in the handle variable.
// - Later predicates: the generated call passes the existing handle and
//   ignores the result.
// NOTE(review): epred_seen is keyed by class q only, not by 32-wide chunk.
// The first predicate with pred_pos >= 32 therefore takes the "seen" branch
// with a handle variable that was never assigned. Confirm that is intended.
4857 set<int> epred_seen;
4858 for(p=0;p<pr_list.size();++p){
4859 int q = pred_class[p];
4860 //printf("\tq=%d\n",q);
4861 if(epred_seen.count(q)>0){
4862 ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4863 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4864 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4865 for(o=0;o<op_list.size();++o){
4867 ret += generate_se_code(op_list[o],Schema)+", ";
4870 ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4871 epred_seen.insert(q);
4873 ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4874 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4875 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4876 for(o=0;o<op_list.size();++o){
4878 ret += generate_se_code(op_list[o],Schema)+", ";
4881 ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4882 epred_seen.insert(q);
4889 // Start on main body code generation
4890 ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4893 ///--------------------------------------------------------------
4894 /// Generate and store the prefilter body,
4895 /// reuse it for the snap length calculator
4896 ///-------------------------------------------------------------
4899 body += "\tstruct packet *p = (struct packet *)pkt;\n";
4903 // Gather the colids to store unpacked variables.
4904 for(p=0;p<pred_list.size();++p){
4905 gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4908 // make the col_ids refer to the base tables, and
4909 // grab the col_ids with at least one unpacking function.
4910 for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4911 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4913 tmp_col_id.field = (*csi).field;
4914 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4915 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4916 cid_set.insert(tmp_col_id);
4917 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4918 if(fe->get_unpack_fcns().size()>0)
4919 upref_cids.insert(tmp_col_id);
4924 // Find the set of unpacking programs needed for the
4925 // prefilter fields.
4926 map<col_id, string,lt_col_id> ucol_fcn_map;
4927 find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4928 set<string> pref_ufcns;
4929 map<col_id, string,lt_col_id>::iterator mcis;
4930 for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4931 pref_ufcns.insert((*mcis).second);
4936 // Variables for unpacking attributes.
// Each column gets an unpack_var_<field>_<tblref> value plus a
// ret_<field>_<tblref> flag recording whether its unpack succeeded.
4937 body += "/*\t\tVariables for unpacking attributes\t*/\n";
4938 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4939 int schref = (*csi).schema_ref;
4940 int tblref = (*csi).tblvar_ref;
4941 string field = (*csi).field;
4942 data_type dt(Schema->get_type_name(schref,field));
4943 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4944 field.c_str(), tblref);
4946 sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4949 // Variables for unpacking temporal attributes.
// Only temporal columns not already declared above for the predicates.
4950 body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4951 for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4952 if (cid_set.count(*csi) == 0) {
4953 int schref = (*csi).schema_ref;
4954 int tblref = (*csi).tblvar_ref;
4955 string field = (*csi).field;
4956 data_type dt(Schema->get_type_name(schref,field));
4957 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4958 field.c_str(), tblref);
4965 // Variables for combinable predicate evaluation
4966 body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4967 for(q=0;q<equiv_list.size();++q){
4968 for(p=0;p<=(num_equiv[q]/32);++p){
4969 body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4974 // Variables that are always needed
4975 body += "/*\t\tVariables which are always needed\t*/\n";
4976 body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4977 body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4979 // Call the unpacking functions for the prefilter fields
4980 if(pref_ufcns.size() > 0)
4981 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4982 set<string>::iterator ssi;
4983 for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4984 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4988 // Unpack the accessed attributes
// The generated ret_<field>_<tblref> is 1 when the field accessor returned 0.
4989 body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4990 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4991 int tblref = (*csi).tblvar_ref;
4992 int schref = (*csi).schema_ref;
4993 string field = (*csi).field;
4994 sprintf(tmpstr,"\tret_%s_%d = (%s(p, &unpack_var_%s_%d) == 0);\n",
4995 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4999 // next unpack the temporal attributes and ignore the errors
5000 // We are assuming here that failed unpack of temporal attributes
5001 // is not going to overwrite the last stored value
5002 // Failed upacks are ignored
5003 for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
5004 int tblref = (*csi).tblvar_ref;
5005 int schref = (*csi).schema_ref;
5006 string field = (*csi).field;
5007 sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
5008 Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5012 // Evaluate the combinable predicates
// Each class's eval_commonpred_* call is generated once per 32-bit word. It is
// gated by the ret_* flags of the columns the class representative references.
5013 if(equiv_list.size()>0)
5014 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
5015 for(q=0;q<equiv_list.size();++q){
5016 for(p=0;p<=(num_equiv[q]/32);++p){
5018 // Only call the common eval fcn if all ref'd fields present.
5019 col_id_set pred_cids;
5020 col_id_set::iterator cpi;
5021 gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
5022 if(pred_cids.size()>0){
5024 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5025 if(cpi != pred_cids.begin())
5027 string field = (*cpi).field;
5028 int tblref = (*cpi).tblvar_ref;
5029 body += "ret_"+field+"_"+int_to_string(tblref);
5034 body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
5035 vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
5036 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
5037 for(o=0;o<op_list.size();++o){
5039 body += ","+generate_se_code(op_list[o],Schema);
// Evaluate each prefilter predicate, gated by the ret_* flags of the columns
// it references. A true predicate sets the current bit of retval; bitpos then
// shifts left once per predicate.
5047 for(p=0;p<pred_list.size();++p){
5048 col_id_set pred_cids;
5049 col_id_set::iterator cpi;
5050 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
5051 if(pred_cids.size()>0){
5053 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5054 if(cpi != pred_cids.begin())
5056 string field = (*cpi).field;
5057 int tblref = (*cpi).tblvar_ref;
5058 body += "ret_"+field+"_"+int_to_string(tblref);
5062 body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
5063 body+="\tbitpos = bitpos << 1;\n";
5066 // ---------------------------------------------------------------
5067 // Finished with the body of the prefilter
5068 // --------------------------------------------------------------
5072 // Collect fields referenced by an lfta but not
5073 // already unpacked for the prefilter.
5075 //printf("upref_cids is:\n");
5076 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
5077 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5078 //printf("pref_ufcns is:\n");
5079 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
5080 //printf("\t%s\n",(*ssi).c_str());
5083 for(l=0;l<lfta_cols.size();++l){
5084 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
5085 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
5087 tmp_col_id.field = (*csi).field;
5088 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
5089 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
5090 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
5091 set<string> fld_ufcns = fe->get_unpack_fcns();
5092 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(), upref_cids.count(tmp_col_id));
5093 if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
5094 // Ensure that this field not already unpacked.
// A field is already unpacked if any of its unpack fcns is in pref_ufcns.
5096 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
5097 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
5098 if(pref_ufcns.count((*ssi))){
5099 //printf("Field already unpacked.\n");
5104 //printf("\tadding to unpack list\n");
5105 upall_cids.insert(tmp_col_id);
5111 //printf("upall_cids is:\n");
5112 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
5113 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5115 // Get the set of unpacking programs for these.
5116 map<col_id, string,lt_col_id> uall_fcn_map;
5117 find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
5118 set<string> pall_ufcns;
5119 for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
5120 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
5121 pall_ufcns.insert((*mcis).second);
5124 // Iterate through the remaining set of unpacking function
// Each remaining unpack fcn is called only when retval matches the full
// signature of at least one lfta that reads a column unpacked by that fcn.
5125 if(pall_ufcns.size() > 0)
5126 ret += "//\t\tcall all remaining field unpacking functions.\n";
5127 for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
5128 // gather the set of columns unpacked by this ufcn
5129 col_id_set fcol_set;
5130 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
5131 if(uall_fcn_map[(*csi)] == (*ssi))
5132 fcol_set.insert((*csi));
5135 // gather the set of lftas which access a field unpacked by the fcn
5136 set<long long int> clfta;
5137 for(l=0;l<lfta_cols.size();l++){
5138 for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
5139 if(lfta_cols[l].count((*csi)) > 0)
5142 if(csi != fcol_set.end())
5143 clfta.insert(lfta_sigs[l]);
5146 // generate the unpacking code
5148 set<long long int>::iterator sii;
5149 for(sii=clfta.begin();sii!=clfta.end();++sii){
5150 if(sii!=clfta.begin())
5152 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
5155 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5159 ret += "\treturn(retval);\n\n";
5163 // --------------------------------------------------------
5164 // reuse prefilter body for snaplen calculator
5166 // This is dummy code, so I'm commenting it out.
5169 ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
// lfta_pkt_snaplen first returns -1 when retval matches the signature of any
// lfta whose snap length is -1. It then emits one test per distinct snap length.
5174 vector<int> s_snaps = lfta_snap_lens;
5175 sort(s_snaps.begin(), s_snaps.end());
5177 if(s_snaps[0] == -1){
5178 set<unsigned long long int> sigset;
5179 for(i=0;i<lfta_snap_lens.size();++i){
5180 if(lfta_snap_lens[i] == -1){
5181 sigset.insert(lfta_sigs[i]);
5185 set<unsigned long long int>::iterator sulli;
5186 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5187 if(sulli!=sigset.begin())
5189 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5192 ret += ") return -1;\n";
// NOTE(review): the walk below indexes the unsorted lfta_snap_lens instead of
// the sorted copy s_snaps. Skipping runs of equal values only yields distinct
// lengths in descending order when the vector is sorted. The skip loop also
// stops at nextpos>0 and never examines index 0. Confirm both against the
// intended behavior.
5195 int nextpos = lfta_snap_lens.size()-1;
5196 int nextval = lfta_snap_lens[nextpos];
5197 while(nextval >= 0){
5198 set<unsigned long long int> sigset;
5199 for(i=0;i<lfta_snap_lens.size();++i){
5200 if(lfta_snap_lens[i] == nextval){
5201 sigset.insert(lfta_sigs[i]);
5205 set<unsigned long long int>::iterator sulli;
5206 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5207 if(sulli!=sigset.begin())
5209 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5212 ret += ") return "+int_to_string(nextval)+";\n";
5214 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
5216 nextval = lfta_snap_lens[nextpos];
5220 ret += "\treturn 0;\n";
5231 // Generate the struct which will store the values of
5232 // temporal attributes unpacked by the prefilter.
// Emits "struct prefilter_unpacked_temp_vars", with one member
// unpack_var_<field>_<tblref> per column in cid_set. It also emits a global
// instance prefilter_temp_vars, whose brace initializer is built in init_code.
// Each member starts at its type's min literal when is_increasing(), and
// presumably its max literal otherwise (the else line is not visible here).
5233 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
5235 col_id_set::iterator csi;
5237 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
5239 string ret="struct prefilter_unpacked_temp_vars {\n";
5240 ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
5244 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
5245 int schref = (*csi).schema_ref;
5246 int tblref = (*csi).tblvar_ref;
5247 string field = (*csi).field;
// The modifier list is passed so that is_increasing() can be tested below.
5248 data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
5249 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
5250 field.c_str(), tblref);
// A separator is presumably appended here for every initializer after the first.
5253 if (init_code != "")
5255 if (dt.is_increasing())
5256 init_code += dt.get_min_literal();
5258 init_code += dt.get_max_literal();
5263 ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";