See the License for the specific language governing permissions and limitations under the License. ------------------------------------------- */ #include #include #include //#include #include #include "parse_fta.h" #include "parse_schema.h" #include "analyze_fta.h" #include "generate_utils.h" #include "query_plan.h" #include "generate_lfta_code.h" #include "generate_nic_code.h" using namespace std; extern int DEFAULT_LFTA_HASH_TABLE_SIZE; // default value for correlation between the interface card and // the system clock #define DEFAULT_TIME_CORR 16 // For fast hashing //#define NRANDS 100 extern string hash_nums[NRANDS]; /* = { "12916008961267169387ull", "13447227858232756685ull", "15651770379918602919ull", "1154671861688431608ull", "6777078091984849858ull", "14217205709582564356ull", "4955408621820609982ull", "15813680319165523695ull", "9897969721407807129ull", "5799700135519793083ull", "3446529189623437397ull", "2766403683465910630ull", "3759321430908793328ull", "6569396511892890354ull", "11124853911180290924ull", "17425412145238035549ull", "6879931585355039943ull", "16598635011539670441ull", "9615975578494811651ull", "4378135509538422740ull", "741282195344332574ull", "17368612862906255584ull", "17294299200556814618ull", "518343398779663051ull", "3861893449302272757ull", "8951107288843549591ull", "15785139392894559409ull", "5917810836789601602ull", "16169988133001117004ull", "9792861259254509262ull", "5089058010244872136ull", "2130075224835397689ull", "844136788226150435ull", "1303298091153875333ull", "3579898206894361183ull", "7529542662845336496ull", "13151949992653382522ull", "2145333536541545660ull", "11258221828939586934ull", "3741808146124570279ull", "16272841626371307089ull", "12174572036188391283ull", "9749343496254107661ull", "9141275584134508830ull", "10134192232065698216ull", "12944268412561423018ull", "17499725811865666340ull", "5281482378159088661ull", "13254803486023572607ull", "4526762838498717025ull", "15990846379668494011ull", "10680949816169027468ull", "7116154096012931030ull", "5296740689865236632ull", "5222427027515795922ull", "6893215299448261251ull", "10164707755932877485ull", "15325979189512082255ull", "3713267224148573289ull", "12292682741753167354ull", "4098115959960163588ull", "16095675565885113990ull", "11391590846210510720ull", "8432889531466002673ull", "7146668520368482523ull", "7678169991822407997ull", "9882712513525031447ull", "13904414563513869160ull", "1080076724395768626ull", "8448147843172150388ull", "17633093729608185134ull", "10044622457050142303ull", "4128911859292425737ull", "30642269109444395ull", "16124215396922640581ull", "15444089895060081110ull", "16437006538696302944ull", "800338649777443426ull", "5355794945275091932ull", "11656354278827687117ull", "1110873718944691255ull", "10829576045617693977ull", "3846916616884579955ull", "17055821716837625668ull", "13418968402643535758ull", "11671612594828802128ull", "11597298928184328586ull", "13196028510862205499ull", "16539578557089782373ull", "3182048322921507591ull", "10016080431267550241ull", "148751875162592690ull", "10400930266590768572ull", "4023803397139127870ull", "17766462746879108920ull", "14807761432134600873ull", "13521540421053792403ull", "13980983198941385205ull", "16257584414193564367ull", "1760484796451765024ull" }; */ // ---------------------------------------------- // Data extracted from the query plan node // for use by code generation. static cplx_lit_table *complex_literals; //Table of literals with constructors. static vector param_handle_table; static param_table *param_tbl; // Table of all referenced parameters. static vector sl_list; static vector where; static gb_table *gb_tbl; // Table of all group-by attributes. static aggregate_table *aggr_tbl; // Table of all referenced aggregates. static bool packed_return; // unpack using structyure, not fcns static nic_property *nicprop; // nic properties for this interface. static int global_id; // The partial_fcns vector can now refer to // partial functions, or expensive functions // which can be cached (if there are multiple refs). A couple // of int vectors distinguish the cases. static vector partial_fcns; static vector fcn_ref_cnt; static vector is_partial_fcn; int sl_fcns_start = 0, sl_fcns_end = 0; int wh_fcns_start = 0, wh_fcns_end = 0; int gb_fcns_start = 0, gb_fcns_end = 0; int ag_fcns_start = 0, ag_fcns_end = 0; // These vectors are for combinable predicates. static vector pred_class; // identifies the group static vector pred_pos; // position in the group. static char tmpstr[1000]; ////////////////////////////////////////////////////////////////////// /// Various utilities string generate_fta_name(string node_name){ string ret = normalize_name(node_name); if(ret == ""){ ret = "default"; } ret += "_fta"; return(ret); } string generate_aggr_struct_name(string node_name){ string ret = normalize_name(node_name); if(ret == ""){ ret = "default"; } ret += "_aggr_struct"; return(ret); } string generate_fj_struct_name(string node_name){ string ret = normalize_name(node_name); if(ret == ""){ ret = "default"; } ret += "_fj_struct"; return(ret); } string generate_watchlist_element_name(string node_name){ string ret = normalize_name(node_name); if(ret == ""){ ret = "default"; } ret += "__wl_elem"; return(ret); } string generate_watchlist_struct_name(string node_name){ string ret = normalize_name(node_name); if(ret == ""){ ret = "default"; } ret += "__wl_struct"; return(ret); } string generate_watchlist_name(string node_name){ string ret = normalize_name(node_name); if(ret == ""){ ret = "default"; } ret += "__wl"; return(ret); } string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){ string ret; if(! packed_return){ sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n", schema->get_fcn(schref,field).c_str(), field.c_str(), tblref); ret += tmpstr; if(!schema->get_modifier_list(schref,field)->contains_key("required")) ret += "\tif(retval) goto "+end_goto+";\n"; }else{ // TODO: ntoh xforms (aug 2010 : removing ntoh, hton) data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field)); if(dt.is_buffer_type()){ if(dt.get_type() != v_str_t){ ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n"; ret += "\t\tgoto "+end_goto+";\n"; ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n"; ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+ " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n"; }else{ fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n"); exit(1); } }else{ ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+ " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n"; } } return ret; } string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){ string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n"; int g; for(g=0;gsize();g++){ sprintf(tmpstr,"gb_var%d",g); ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n"; } int a; for(a=0;asize();a++){ ret += "\t"; sprintf(tmpstr,"aggr_var%d",a); if(aggr_tbl->is_builtin(a)) ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n"; else ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n"; } /* ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n"; */ ret += "};\n\n"; return(ret); } string generate_fj_struct(filter_join_qpn *fs, string node_name ){ string ret; if(fs->use_bloom == false){ // uses hash table instead ret = "struct " + generate_fj_struct_name(node_name) + "{\n"; int k; for(k=0;khash_eq.size();++k){ sprintf(tmpstr,"key_var%d",k); ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n"; } ret += "\tlong long int ts;\n"; ret += "};\n\n"; } return(ret); } string generate_watchlist_structs(string node_name, table_def *tbl, std::string filename, int refresh_interval){ string ret; ret += "struct "+generate_watchlist_element_name(node_name)+"{\n"; vector fields = tbl->get_fields(); for(int f=0;fget_type()); ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n"; } ret += "\tgs_uint64_t hashval;\n"; ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n"; ret += "};\n\n"; ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n"; ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n"; ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n"; ret += "\tgs_uint32_t ht_size;\n"; ret += "\tgs_uint32_t n_elem;\n"; ret += "\tgs_uint32_t refresh_interval;\n"; ret += "\ttime_t next_refresh;\n"; ret += "\ttime_t last_mtime;\n"; ret += "\tchar *filename;\n"; ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n"; return ret; } string generate_watchlist_load(string node_name, table_def *tbl, vector keys){ string ret; string tgt = generate_watchlist_name(node_name); vector fields = tbl->get_fields(); ret += "void reload_watchlist__"+node_name+"(){\n"; ret += "\tgs_uint32_t i;\n"; ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n"; ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n"; ret += "\tFILE *fl;\n"; ret += "\tchar buf[10000];\n"; ret += "\tgs_uint32_t buflen = 10000;\n"; ret += "\tchar *flds["+std::to_string(fields.size())+"];\n"; ret += "\tgs_uint32_t pos, f, linelen, malformed;\n"; ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n"; ret += "\tgs_uint64_t hash, bucket;\n"; ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n"; ret += "// make sure watchlist file has changed since the last time we loaded it\n"; ret += "\tstruct stat file_stat;\n"; ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n"; ret += "\tif (err) {\n"; ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n"; ret += "\t\treturn;\n"; ret += "\t}\n"; ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime) // watchlist file hasn't changed since last time\n"; ret += "\t\treturn;\n"; ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n"; ret += "// Delete old entries.\n"; ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n"; ret += "\t\tptr="+tgt+".ht[i];\n"; ret += "\t\twhile(ptr!=NULL){\n"; for(int f=0;fget_type()); if(dt.is_buffer_type()){ ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n"; } } ret += "\t\t\tnext = ptr->next;\n"; ret += "\t\t\tfree(ptr);\n"; ret += "\t\t\tptr = next;\n"; ret += "\t\t}\n"; ret += "\t}\n"; ret += "\n// prepare new table. \n"; ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n"; ret += "\t\tif("+tgt+".ht)\n"; ret += "\t\t\tfree("+tgt+".ht);\n"; ret += "\t\tif("+tgt+".ht_size == 0)\n"; ret += "\t\t\t"+tgt+".ht_size = 100000;\n"; ret += "\t\telse\n"; ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n"; ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n"; ret += "\t}\n"; ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n"; ret += "\t\t"+tgt+".ht[i] = NULL;\n"; ret += "\t}\n"; ret += "\n// load new table\n"; ret += "\t"+tgt+".n_elem = 0;\n"; ret += "\tfl = fopen("+tgt+".filename, \"r\");\n"; ret += "\tif(fl==NULL){\n"; ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n"; ret += "\t\treturn;\n"; ret += "\t}\n"; ret += "\tmalformed = 0;\n"; ret += "\tshort_lines = 0;\n"; ret += "\ttoolong_lines = 0;\n"; ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n"; ret += "\t\tlinelen = strlen(buf);\n"; ret += "\t\tbuf[linelen-1]='\\0'; // strip off trailing newline\n"; ret += "\t\tlinelen--;\n"; ret += "\t\tpos=0;\n"; ret += "\t\tmalformed=0;\n"; ret += "\t\tok=1;\n"; ret += "\t\tflds[0] = buf;\n"; ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n"; ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n"; ret += "\t\t\tif(pos >= linelen){\n"; ret += "\t\t\t\tmalformed = 1;\n"; ret += "\t\t\t\tbreak;\n"; ret += "\t\t\t}\n"; ret += "\t\t\tbuf[pos]='\\0';\n"; ret += "\t\t\tpos++;\n"; ret += "\t\t\tflds[f]=buf+pos;\n"; ret += "\t\t}\n"; ret += "\t\tif(malformed){\n"; ret += "\t\t\tok=0;\n"; ret += "\t\t\tn_malformed++;\n"; ret += "\t\t}\n"; ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n"; ret += "\t\t\tok=0;\n"; ret += "\t\t\tshort_lines++;\n"; ret += "\t\t}\n"; ret += "\t\tif(pos && (posget_type()); ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n"; } // Compute the hash value ret += "\t\t\thash=0;\n"; for(int k=0;kget_name() == key_fld) break; } data_type dt(fields[f]->get_type()); ret += "\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+ dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n"; } ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n"; ret += "\t\t\trec->hashval = hash;\n"; ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n"; ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n"; ret += "\t\t\t"+tgt+".n_elem++;\n"; ret += "\t\t}\n"; ret += "\t}\n"; ret += "\tif(n_malformed+toolong_lines > 0){\n"; ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n"; ret += "\t}\n"; ret += "}\n\n"; return ret; } string generate_fta_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl, param_table *param_tbl, cplx_lit_table *complex_literals, vector ¶m_handle_table, bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom, table_list *schema){ string ret = "struct " + generate_fta_name(node_name) + "{\n"; ret += "\tstruct FTA f;\n"; //------------------------------------------------------------- // Aggregate-specific fields if(is_aggr_query){ /* ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n"; */ ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n"; ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n"; // ret+="\tint bitmap_size;\n"; ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n"; ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n"; ret += "\tint max_windows; // max number of open windows.\n"; ret += "\tunsigned int generation; // initially zero, increment on\n"; ret += "\t // every hash table flush - whether regular or induced.\n"; ret += "\t // Old groups are identified by a generation mismatch.\n"; ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n"; ret += "\tunsigned int flush_ctr; // control slow flushing\n"; int g; bool uses_temporal_flush = false; for(g=0;gsize();g++){ data_type *dt = gb_tbl->get_data_type(g); if(dt->is_temporal()){ /* fprintf(stderr,"group by attribute %s is temporal, ", gb_tbl->get_name(g).c_str()); if(dt->is_increasing()){ fprintf(stderr,"increasing.\n"); }else{ fprintf(stderr,"decreasing.\n"); } */ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_buffer_type()){ fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n"); }else{ sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g); ret += tmpstr; sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g); ret += tmpstr; sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g); ret += tmpstr; uses_temporal_flush = true; } } } if(! uses_temporal_flush){ fprintf(stderr,"Warning: no temporal flush.\n"); } } // --------------------------------------------------------- // Filter-join specific fields if(is_fj){ if(uses_bloom){ ret += "\tunsigned char * bf_table; //array of bloom filters with layout \n" "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n" "\tint first_exec;\n" "\tlong long int last_bin;\n" "\tint last_bloom_pos;\n" "\n" ; }else{ // limited hash table ret += " struct "+generate_fj_struct_name(node_name)+" *join_table;\n" "\n" ; } } // -------------------------------------------- // watchlist-join specific if(is_wj){ ret += "\ttime_t ux_time;\n"; } //-------------------------------------------------------- // Common fields // Create places to hold the parameters. int p; vector param_vec = param_tbl->get_param_names(); for(p=0;pget_data_type(param_vec[p]); sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(), param_vec[p].c_str()); ret += tmpstr; if(param_tbl->handle_access(param_vec[p])){ ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n"; } } // Create places to hold complex literals. int cl; for(cl=0;clsize();cl++){ literal_t *l = complex_literals->get_literal(cl); data_type *dtl = new data_type( l->get_type() ); sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl); ret += tmpstr; } // Create places to hold the pass-by-handle parameters. for(p=0;pget_data_type(); if (sdt->is_temporal()) { gather_se_col_ids(sl_list[s],temp_cids, gb_tbl); } } for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field)); sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref); ret += tmpstr; } ret += "\tgs_uint64_t trace_id;\n\n"; // Fields to store the runtime stats ret += "\tgs_uint32_t in_tuple_cnt;\n"; ret += "\tgs_uint32_t out_tuple_cnt;\n"; ret += "\tgs_uint32_t out_tuple_sz;\n"; ret += "\tgs_uint32_t accepted_tuple_cnt;\n"; ret += "\tgs_uint64_t cycle_cnt;\n"; ret += "\tgs_uint32_t collision_cnt;\n"; ret += "\tgs_uint32_t eviction_cnt;\n"; ret += "\tgs_float_t sampling_rate;\n"; ret += "};\n\n"; return(ret); } //------------------------------------------------------------ // Set colref tblvars to 0.. // (special processing for join-like operators in an lfta). void reset_se_col_ids_tblvars(scalarexp_t *se, gb_table *gtbl){ vector operands; int o; if(! se) return; switch(se->get_operator_type()){ case SE_LITERAL: case SE_PARAM: case SE_IFACE_PARAM: return; case SE_UNARY_OP: reset_se_col_ids_tblvars(se->get_left_se(),gtbl); return; case SE_BINARY_OP: reset_se_col_ids_tblvars(se->get_left_se(),gtbl); reset_se_col_ids_tblvars(se->get_right_se(),gtbl); return; case SE_COLREF: if(! se->is_gb() ){ se->get_colref()->set_tablevar_ref(0); }else{ if(gtbl==NULL){ fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n"); exit(1); } reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl); } return; case SE_AGGR_STAR: return; case SE_AGGR_SE: reset_se_col_ids_tblvars(se->get_left_se(),gtbl); return; case SE_FUNC: operands = se->get_operands(); for(o=0;oget_lineno(), se->get_charno(),se->get_operator_type()); exit(1); } } // reset column tblvars accessed in this pr. void reset_pr_col_ids_tblvars(predicate_t *pr, gb_table *gtbl){ vector op_list; int o; switch(pr->get_operator_type()){ case PRED_IN: reset_se_col_ids_tblvars(pr->get_left_se(), gtbl); return; case PRED_COMPARE: reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ; reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ; return; case PRED_UNARY_OP: reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ; return; case PRED_BINARY_OP: reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ; reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ; return; case PRED_FUNC: op_list = pr->get_op_list(); for(o=0;oget_lineno(), pr->get_charno(), pr->get_operator_type() ); } } // Generate code that makes reference // to the tuple, and not to any aggregates. static string generate_se_code(scalarexp_t *se,table_list *schema){ string ret; data_type *ldt, *rdt; int o; vector operands; switch(se->get_operator_type()){ case SE_LITERAL: if(se->is_handle_ref()){ sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() ); ret = tmpstr; return(ret); } if(se->get_literal()->is_cpx_lit()){ sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() ); ret = tmpstr; return(ret); } return(se->get_literal()->to_C_code("")); // not complex, no constructor case SE_PARAM: if(se->is_handle_ref()){ sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() ); ret = tmpstr; return(ret); } ret += "t->param_"; ret += se->get_param_name(); return(ret); case SE_UNARY_OP: ldt = se->get_left_se()->get_data_type(); if(ldt->complex_operator(se->get_op()) ){ ret += ldt->get_complex_operator(se->get_op()); ret += "("; ret += generate_se_code(se->get_left_se(),schema); ret += ")"; }else{ ret += "("; ret += se->get_op(); ret += generate_se_code(se->get_left_se(),schema); ret += ")"; } return(ret); case SE_BINARY_OP: ldt = se->get_left_se()->get_data_type(); rdt = se->get_right_se()->get_data_type(); if(ldt->complex_operator(rdt, se->get_op()) ){ ret += ldt->get_complex_operator(rdt, se->get_op()); ret += "("; ret += generate_se_code(se->get_left_se(),schema); ret += ", "; ret += generate_se_code(se->get_right_se(),schema); ret += ")"; }else{ ret += "("; ret += generate_se_code(se->get_left_se(),schema); ret += se->get_op(); ret += generate_se_code(se->get_right_se(),schema); ret += ")"; } return(ret); case SE_COLREF: if(se->is_gb()){ // OK to ref gb attrs, but they're not yet unpacked ... // so return the defining code. ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema ); }else{ sprintf(tmpstr,"unpack_var_%s_%d", se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() ); ret = tmpstr; } return(ret); case SE_FUNC: // Should not be ref'ing any aggr here. if(se->get_aggr_ref() >= 0){ fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n"); return("ERROR in generate_se_code"); } if(se->is_partial()){ sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref()); ret = tmpstr; }else{ ret += se->op + "("; operands = se->get_operands(); for(o=0;o0) ret += ", "; if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) ) ret += "&"; ret += generate_se_code(operands[o], schema); } ret += ")"; } return(ret); default: fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n", se->get_lineno(), se->get_charno(),se->get_operator_type()); return("ERROR in generate_se_code"); } } // generate code that refers only to aggregate data and constants. static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){ string ret; data_type *ldt, *rdt; int o; vector operands; switch(se->get_operator_type()){ case SE_LITERAL: if(se->is_handle_ref()){ sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() ); ret = tmpstr; return(ret); } if(se->get_literal()->is_cpx_lit()){ sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() ); ret = tmpstr; return(ret); } return(se->get_literal()->to_C_code("")); // not complex no constructor case SE_PARAM: if(se->is_handle_ref()){ sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() ); ret = tmpstr; return(ret); } ret += "t->param_"; ret += se->get_param_name(); return(ret); case SE_UNARY_OP: ldt = se->get_left_se()->get_data_type(); if(ldt->complex_operator(se->get_op()) ){ ret += ldt->get_complex_operator(se->get_op()); ret += "("; ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema); ret += ")"; }else{ ret += "("; ret += se->get_op(); ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema); ret += ")"; } return(ret); case SE_BINARY_OP: ldt = se->get_left_se()->get_data_type(); rdt = se->get_right_se()->get_data_type(); if(ldt->complex_operator(rdt, se->get_op()) ){ ret += ldt->get_complex_operator(rdt, se->get_op()); ret += "("; ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema); ret += ", "; ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema); ret += ")"; }else{ ret += "("; ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema); ret += se->get_op(); ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema); ret += ")"; } return(ret); case SE_COLREF: if(se->is_gb()){ // OK to ref gb attrs, but they're not yet // unpacked ... so return the defining code. sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref()); ret = tmpstr; }else{ fprintf(stderr,"ERROR reference to non-GB column ref not permitted here," "error in generate_se_code_fm_aggr, line %d, character %d.\n", se->get_lineno(), se->get_charno()); ret = tmpstr; } return(ret); case SE_AGGR_STAR: case SE_AGGR_SE: sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref()); ret = tmpstr; return(ret); case SE_FUNC: // Is it a UDAF? if(se->get_aggr_ref() >= 0){ sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref()); ret = tmpstr; return(ret); } if(se->is_partial()){ sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref()); ret = tmpstr; }else{ ret += se->op + "("; operands = se->get_operands(); for(o=0;o0) ret += ", "; if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) ) ret += "&"; ret += generate_se_code_fm_aggr(operands[o], var, schema); } ret += ")"; } return(ret); default: fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n", se->get_lineno(), se->get_charno(),se->get_operator_type()); return("ERROR in generate_se_code"); } } static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){ string ret; int o; vector operands; if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){ fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n", se->get_lineno(), se->get_charno()); return("ERROR in generate_se_code"); } ret = "\tretval = " + se->get_op() + "( "; sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id); ret += tmpstr; operands = se->get_operands(); for(o=0;oget_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) ) ret += "&"; ret += generate_se_code_fm_aggr(operands[o], var, schema); } ret += ");\n"; return(ret); } static string generate_cached_fcn(scalarexp_t *se, table_list *schema){ string ret; int o; vector operands; if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){ fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n", se->get_lineno(), se->get_charno()); return("ERROR in generate_se_code"); } ret = se->get_op() + "( "; operands = se->get_operands(); for(o=0;oget_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) ) ret += "&"; ret += generate_se_code(operands[o], schema); } ret += ");\n"; return(ret); } static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){ string ret; int o; vector operands; if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){ fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n", se->get_lineno(), se->get_charno()); return("ERROR in generate_se_code"); } ret = "\tretval = " + se->get_op() + "( ", sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id); ret += tmpstr; operands = se->get_operands(); for(o=0;oget_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) ) ret += "&"; ret += generate_se_code(operands[o], schema); } ret += ");\n"; return(ret); } static string generate_C_comparison_op(string op){ if(op == "=") return("=="); if(op == "<>") return("!="); return(op); } static string generate_C_boolean_op(string op){ if( (op == "AND") || (op == "And") || (op == "and") ){ return("&&"); } if( (op == "OR") || (op == "Or") || (op == "or") ){ return("||"); } if( (op == "NOT") || (op == "Not") || (op == "not") ){ return("!"); } fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str()); return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op); } static string generate_predicate_code(predicate_t *pr,table_list *schema){ string ret; vector litv; int i; data_type *ldt, *rdt; vector op_list; int o,cref,ppos; unsigned int bitmask; switch(pr->get_operator_type()){ case PRED_IN: ldt = pr->get_left_se()->get_data_type(); ret += "( "; litv = pr->get_lit_vec(); for(i=0;i0) ret += " || "; ret += "( "; if(ldt->complex_comparison(ldt) ){ ret += ldt->get_equals_fcn(ldt) ; ret += "( "; if(ldt->is_buffer_type() ) ret += "&"; ret += generate_se_code(pr->get_left_se(), schema); ret += ", "; if(ldt->is_buffer_type() ) ret += "&"; if(litv[i]->is_cpx_lit()){ sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() ); ret += tmpstr; }else{ ret += litv[i]->to_C_code(""); } ret += ") == 0"; }else{ ret += generate_se_code(pr->get_left_se(), schema); ret += " == "; ret += litv[i]->to_C_code(""); } ret += " )"; } ret += " )"; return(ret); case PRED_COMPARE: ldt = pr->get_left_se()->get_data_type(); rdt = pr->get_right_se()->get_data_type(); ret += "( "; if(ldt->complex_comparison(rdt) ){ // TODO can use get_equals_fcn if op is "=" ? ret += ldt->get_comparison_fcn(rdt); ret += "("; if(ldt->is_buffer_type() ) ret += "&"; ret += generate_se_code(pr->get_left_se(),schema); ret += ", "; if(rdt->is_buffer_type() ) ret += "&"; ret += generate_se_code(pr->get_right_se(),schema); ret += ") "; ret += generate_C_comparison_op(pr->get_op()); ret += "0"; }else{ ret += generate_se_code(pr->get_left_se(),schema); ret += generate_C_comparison_op(pr->get_op()); ret += generate_se_code(pr->get_right_se(),schema); } ret += " )"; return(ret); case PRED_UNARY_OP: ret += "( "; ret += generate_C_boolean_op(pr->get_op()); ret += generate_predicate_code(pr->get_left_pr(),schema); ret += " )"; return(ret); case PRED_BINARY_OP: ret += "( "; ret += generate_predicate_code(pr->get_left_pr(),schema); ret += generate_C_boolean_op(pr->get_op()); ret += generate_predicate_code(pr->get_right_pr(),schema); ret += " )"; return(ret); case PRED_FUNC: op_list = pr->get_op_list(); cref = pr->get_combinable_ref(); if(cref >= 0){ // predicate is a combinable pred reference // Trust, but verify if(pred_class.size() >= cref && pred_class[cref] >= 0){ ppos = pred_pos[cref]; bitmask = 1 << ppos % 32; sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask); ret = tmpstr; return ret; } } ret = pr->get_op() + "("; if (pr->is_sampling_fcn) { ret += "t->sampling_rate"; if (!op_list.empty()) ret += ", "; } for(o=0;o0) ret += ", "; if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) ) ret += "&"; ret += generate_se_code(op_list[o],schema); } ret += " )"; return(ret); default: fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n", pr->get_lineno(), pr->get_charno(), pr->get_operator_type() ); return("ERROR in generate_predicate_code"); } } static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){ string ret; if(dt->complex_comparison(dt) ){ ret += dt->get_equals_fcn(dt); ret += "("; if(dt->is_buffer_type() ) ret += "&"; ret += lhs_op; ret += ", "; if(dt->is_buffer_type() ) ret += "&"; ret += rhs_op; ret += ") == 0"; }else{ ret += lhs_op; ret += " == "; ret += rhs_op; } return(ret); } //static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ // string ret; // // if(dt->complex_comparison(dt) ){ // ret += dt->get_equals_fcn(dt); // ret += "("; // if(dt->is_buffer_type() ) ret += "&"; // ret += lhs_op; // ret += ", "; // if(dt->is_buffer_type() ) ret += "&"; // ret += rhs_op; // ret += ") == 0"; // }else{ // ret += lhs_op; // ret += " == "; // ret += rhs_op; // } // // return(ret); //} // Here I assume that only MIN and MAX aggregates can be computed // over BUFFER data types. static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){ string retval = "\t\t"; string op = atbl->get_op(aidx); // Is it a UDAF if(! atbl->is_builtin(aidx)) { int o; retval += op+"_LFTA_AGGR_UPDATE_("; if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&"; retval+="("+var+")"; vector opl = atbl->get_operand_list(aidx); for(o=0;oget_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) ) retval.append("&"); retval += generate_se_code(opl[o], schema); } retval += ");\n"; return retval; } // Built-in aggregate processing. data_type *dt = atbl->get_data_type(aidx); if(op == "COUNT"){ retval.append(var); retval.append("++;\n"); return(retval); } if(op == "SUM"){ retval.append(var); retval.append(" += "); retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) ); retval.append(";\n"); return(retval); } if(op == "MIN"){ sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() ); retval.append(tmpstr); if(dt->complex_comparison(dt)){ if(dt->is_buffer_type()) sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str()); else sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str()); }else{ sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str()); } retval.append(tmpstr); if(dt->is_buffer_type()){ sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx); }else{ sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx); } retval.append(tmpstr); return(retval); } if(op == "MAX"){ sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() ); retval.append(tmpstr); if(dt->complex_comparison(dt)){ if(dt->is_buffer_type()) sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str()); else sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str()); }else{ sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str()); } retval.append(tmpstr); if(dt->is_buffer_type()){ sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx); }else{ sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx); } retval.append(tmpstr); return(retval); } if(op == "AND_AGGR"){ retval.append(var); retval.append(" &= "); retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) ); retval.append(";\n"); return(retval); } if(op == "OR_AGGR"){ retval.append(var); retval.append(" |= "); retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) ); retval.append(";\n"); return(retval); } if(op == "XOR_AGGR"){ retval.append(var); retval.append(" ^= "); retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) ); retval.append(";\n"); return(retval); } fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str()); return("ERROR: aggregate not recognized: "+op); } static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){ string retval; string op = atbl->get_op(aidx); // Is it a UDAF if(! atbl->is_builtin(aidx)) { int o; retval += "\t\t"+op+"_LFTA_AGGR_INIT_("; if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&"; retval+="("+var+"));\n"; // Add 1st tupl retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_("; if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&"; retval+="("+var+")"; vector opl = atbl->get_operand_list(aidx); for(o=0;oget_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) ) retval.append("&"); retval += generate_se_code(opl[o],schema); } retval += ");\n"; return(retval); } // Built-in aggregate processing. data_type *dt = atbl->get_data_type(aidx); if(op == "COUNT"){ retval = "\t\t"+var; retval.append(" = 1;\n"); return(retval); } if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" || op == "OR_AGGR" || op == "XOR_AGGR"){ if(dt->is_buffer_type()){ sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() ); retval.append(tmpstr); sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx); retval.append(tmpstr); }else{ retval = "\t\t"+var; retval += " = "; retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema)); retval.append(";\n"); } return(retval); } fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str()); return("ERROR: aggregate not recognized: "+op); } //////////////////////////////////////////////////////////// string generate_preamble(table_list *schema, //map &int_fcn_defs, std::string &node_name, std::string &schema_embed_str){ // Include these only once, not once per lfta // string ret = "#include \"rts.h\"\n"; // ret += "#include \"fta.h\"\n\n"); string ret = "#ifndef LFTA_IN_NIC\n"; ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n"; ret += "#include\n"; ret += "#include \n"; ret += "#include \n"; ret += "#include \n"; ret += "#include \"rdtsc.h\"\n"; ret += "#endif\n"; return(ret); } string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){ int a,p,s; // need to create and output the tuple. string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n"; // Check for any UDAFs with LFTA_BAILOUT ret += "\tlfta_bailout = 0;\n"; for(a=0;asize();a++){ if(aggr_tbl->has_bailout(a)){ ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_("; if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&"; ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n"; } } ret += "\tif(! lfta_bailout){\n"; // First, compute the size of the tuple. // Unpack UDAF return values for(a=0;asize();a++){ if(! aggr_tbl->is_builtin(a)){ ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),"; if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&"; ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n"; } } // Unpack partial fcns ref'd by the select clause. if(sl_fcns_start != sl_fcns_end){ ret += "\t\tunpack_failed = 0;\n"; for(p=sl_fcns_start;paggr_table["+idx+"].",schema); ret += "\t\tif(retval) unpack_failed = 1;\n"; } } // BEGIN don't allocate tuple if ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed. } // Unpack any BUFFER type selections into temporaries // so that I can compute their size and not have // to recompute their value during tuple packing. // I can use regular assignment here because // these temporaries are non-persistent. for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\t\t\tselvar_%d = ",s); ret += tmpstr; ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema); ret += ";\n"; } } // The size of the tuple is the size of the tuple struct plus the // size of the buffers to be copied in. ret += "\t\t\ttuple_size = sizeof( struct "; ret += generate_tuple_name(node_name); ret += ")"; for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s); ret += tmpstr; } } ret += ";\n"; ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n"; ret += "\t\t\tif( tuple != NULL){\n"; // Test passed, make assignments to the tuple. ret += "\t\t\t\ttuple_pos = sizeof( struct "; ret += generate_tuple_name(node_name) ; ret += ");\n"; // Mark tuple as REGULAR_TUPLE ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n"; for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s); ret += tmpstr; sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s); ret += tmpstr; }else{ sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s); ret += tmpstr; // if(sdt->needs_hn_translation()) // ret += sdt->hton_translation() +"( "; ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema); // if(sdt->needs_hn_translation()) // ret += ") "; ret += ";\n"; } } // Generate output. ret += "\t\t\t\tpost_tuple(tuple);\n"; ret += "\t\t\t\t#ifdef LFTA_STATS\n"; ret+="\t\t\t\tt->out_tuple_cnt++;\n"; ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n"; ret += "\t\t\t\t#endif\n\n"; ret += "\t\t\t}\n"; if(sl_fcns_start != sl_fcns_end) // END don't allocate tuple if ret += "\t\t}\n"; // unpack failed. ret += "\t}\n"; // Need to release memory held by BUFFER types. int g; for(g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_buffer_type()){ sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g); ret += tmpstr; } } for(a=0;asize();a++){ if(aggr_tbl->is_builtin(a)){ data_type *adt = aggr_tbl->get_data_type(a); if(adt->is_buffer_type()){ sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a); ret += tmpstr; } }else{ ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_("; if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&"; ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n"; } } ret += "\t\tt->n_aggrs--;\n"; return(ret); } string generate_gb_match_test(string idx){ int g; string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") && IS_NEW(t->aggr_table_bitmap,"+idx+")"; if(gb_tbl->size()>0){ ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n"; ret+="\t\t"; // Next, scan list for a match on the group-by attributes. string rhs_op, lhs_op; for(g=0;gsize();g++){ ret += " && "; ret += "("; sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr; sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr; ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) ); ret += ")"; } } ret += "){\n"; return ret; } string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){ int g; string ret; ret += "/*\t\tMatch found : update in place.\t*/\n"; int a; has_udaf = false; for(a=0;asize();a++){ sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a); ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema); if(! aggr_tbl->is_builtin(a)) has_udaf = true; } // garbage collect copied buffer type gb attrs. for(g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_buffer_type()){ sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g); ret+=tmpstr; } } bool first_udaf = true; if(has_udaf){ ret += "\t\tif("; for(a=0;asize();a++){ if(! aggr_tbl->is_builtin(a)){ if(! first_udaf)ret += " || "; else first_udaf = false; ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_("; if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&"; ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))"; } } ret+="){\n"; ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n"; ret += generate_tuple_from_aggr(node_name,schema,idx); ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n"; ret+="\t\t}\n"; } return ret; } string generate_init_group( table_list *schema, string idx){ int g,a; string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n"; // Fill up the aggregate block. for(g=0;gsize();g++){ sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g); ret += tmpstr; } for(a=0;asize();a++){ sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a); ret += generate_aggr_init(tmpstr, aggr_tbl,a, schema); } ret+="\t\tt->n_aggrs++;\n"; return ret; } string generate_fta_flush(string node_name, table_list *schema, ext_fcn_list *Ext_fcns){ string ret; string select_var_defs ; int s, p; // Flush from previous epoch ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n"; ret += "\tgs_int32_t tuple_size, tuple_pos;\n"; ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n"; ret += "\tint i, lfta_bailout;\n"; ret += "\tunsigned int gen_val;\n"; ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct "; ret += generate_fta_name(node_name)+" *) f;\n"; ret += "\n"; // Variables needed to store selected attributes of BUFFER type // temporarily, in order to compute their size for storage // in an output tuple. select_var_defs = ""; for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s); select_var_defs.append(tmpstr); } } if(select_var_defs != ""){ ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n"; ret += select_var_defs; } // Variables to store results of partial functions. if(sl_fcns_start != sl_fcns_end){ ret += "/*\t\tVariables to store the results of partial functions.\t*/\n"; for(p=sl_fcns_start;pget_data_type()->get_cvar_type().c_str(), p); ret += tmpstr; } ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;"; } // Variables for udaf output temporaries bool no_udaf = true; int a; for(a=0;asize();a++){ if(! aggr_tbl->is_builtin(a)){ if(no_udaf){ ret+="/*\t\tUDAF output vars.\t*/\n"; no_udaf = false; } int afcn_id = aggr_tbl->get_fcn_id(a); data_type *adt = Ext_fcns->get_fcn_dt(afcn_id); sprintf(tmpstr,"udaf_ret%d", a); ret+="\t"+adt->make_cvar(tmpstr)+";\n"; } } // ret+="\tt->flush_finished=1; /* flush will be completed */\n"; ret+="\n"; ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n"; ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n"; ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || ("; bool first_g=true; int g; for(g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_temporal()){ if(first_g) first_g=false; else ret+=" || "; ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" "; } } ret += "))) {\n"; ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n"; ret+= "#ifdef LFTA_STATS\n" "\t\t\tt->eviction_cnt++;\n" "#endif\n" ; ret+=generate_tuple_from_aggr(node_name,schema,"i"); // ret+="\t\t\tt->n_aggrs--;\n"; // done in generate_tuple_from_aggr ret+="\t\t\tnflush--;\n"; ret+="\t\t}\n"; ret+="\t}\n"; ret+="\tt->flush_pos=i;\n"; ret+="\tif(t->n_aggrs == 0) {\n"; ret+="\t\tt->flush_pos = t->max_aggrs;\n"; ret += "\t}\n\n"; ret+="\tif(t->flush_pos == t->max_aggrs) {\n"; for(int g=0;gsize();g++){ data_type *dt = gb_tbl->get_data_type(g); if(dt->is_temporal()){ data_type *gdt = gb_tbl->get_data_type(g); if(!gdt->is_buffer_type()){ sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g); ret += tmpstr; } } } ret += "\t}\n}\n\n"; return(ret); } // TODO Remove sprintf to perform string catenation string generate_fta_load_params(string node_name){ int p; vector param_names = param_tbl->get_param_names(); string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name); ret += " *t, int sz, void *value, int initial_call){\n"; ret += "\tint pos=0;\n"; ret += "\tint data_pos;\n"; for(p=0;pget_data_type(param_names[p]); if(dt->is_buffer_type()){ sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() ); ret += tmpstr; sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() ); ret += tmpstr; } } ret += "\n\tdata_pos = "; for(p=0;p0) ret += " + "; data_type *dt = param_tbl->get_data_type(param_names[p]); ret += "sizeof( "; ret += dt->get_tuple_cvar_type(); ret += " )"; } ret += ";\n"; ret += "\tif(data_pos > sz) return 1;\n\n"; for(p=0;pget_data_type(param_names[p]); if(dt->is_buffer_type()){ sprintf(tmpstr,"\taccess_var_%s = *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() ); ret += tmpstr; switch( dt->get_type() ){ case v_str_t: // ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n"; // ntoh conversion // ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n"; // ntoh conversion sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() ); ret += tmpstr; sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() ); ret += tmpstr; sprintf(tmpstr,"\ttmp_var_%s.length = access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() ); ret += tmpstr; break; default: fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() ); exit(1); break; } // First, destroy the old ret += "\tif(! initial_call)\n"; sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str()); ret += tmpstr; // Next, create the new. sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() ); ret += tmpstr; }else{ // if(dt->needs_hn_translation()){ // sprintf(tmpstr,"\tt->param_%s = %s( *( (%s *)( (char *)value+pos) ) );\n", // param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() ); // }else{ sprintf(tmpstr,"\tt->param_%s = *( (%s *)( (char *)value+pos) );\n", param_names[p].c_str(), dt->get_cvar_type().c_str() ); // } ret += tmpstr; } sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() ); ret += tmpstr; } // Register the pass-by-handle parameters ret += "/* register and de-register the pass-by-handle parameters */\n"; int ph; for(ph=0;phtype_name); switch(param_handle_table[ph]->val_type){ case cplx_lit_e: break; case litval_e: break; case param_e: ret += "\tif(! initial_call)\n"; sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n", param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph); ret += tmpstr; sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str()); ret += tmpstr; if(pdt.is_buffer_type()) ret += "&("; ret += "t->param_"+param_handle_table[ph]->param_name; if(pdt.is_buffer_type()) ret += ")"; ret += ");\n"; break; default: sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type); fprintf(stderr,"%s\n",tmpstr); ret+=tmpstr; } } ret+="\treturn 0;\n"; ret += "}\n\n"; return(ret); } string generate_fta_free(string node_name, bool is_aggr_query){ string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n"; ret+= "\tstruct "+generate_fta_name(node_name)+ " * t = (struct "+generate_fta_name(node_name)+" *) f;\n"; ret += "\tint i;\n"; if(is_aggr_query){ ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n"; ret+="\t/* \t\tmark all groups as old */\n"; ret+="\tt->generation++;\n"; ret+="\tt->flush_pos = 0;\n"; ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n"; } // Deregister the pass-by-handle parameters ret += "/* de-register the pass-by-handle parameters */\n"; int ph; for(ph=0;phhandle_param_%d);\n", param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph); ret += tmpstr; } ret += "\treturn 0;\n}\n\n"; return(ret); } string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){ string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value){\n"; ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct "; ret += generate_fta_name(node_name)+" *) f;\n\n"; ret+="\tint i;\n"; ret += "\t/* temp status tuple */\n"; ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n"; ret += "\tgs_int32_t tuple_size;\n"; if(is_aggr_query){ ret+="\tif(command == FTA_COMMAND_FLUSH){\n"; ret+="\t\tif (!t->n_aggrs) {\n"; ret+="\t\t\ttuple = allocate_tuple(f, 0);\n"; ret+="\t\t\tif( tuple != NULL)\n"; ret+="\t\t\t\tpost_tuple(tuple);\n"; ret+="\t\t}else{\n"; ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n"; ret+="\t\t\t/* \t\tmark all groups as old */\n"; ret +="\t\tt->generation++;\n"; ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n"; ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n"; ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n"; ret+="\t\t\tt->flush_pos = 0;\n"; ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n"; ret+="\t\t}\n"; ret+="\t}\n"; } if(param_tbl->size() > 0){ ret+= "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n" "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n" "#ifndef LFTA_IN_NIC\n" "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n" "#else\n" "\t\t{}\n" "#endif\n" "\t}\n"; } ret+= "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n" "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n" "\t}\n\n"; ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n"; if(is_aggr_query){ ret+="\t\tif (t->n_aggrs) {\n"; ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n"; ret+="\t\t\t/* \t\tmark all groups as old */\n"; ret +="\t\tt->generation++;\n"; ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n"; ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n"; ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n"; ret+="\t\t\tt->flush_pos = 0;\n"; ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n"; ret+="\t\t}\n"; } ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n"; ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n"; ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n"; /* mark tuple as EOF_TUPLE */ ret += "\n\t\t/* Mark tuple as eof_tuple */\n"; ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n"; ret += "\t\tpost_tuple(tuple);\n"; ret += "\t}\n"; ret += "\treturn 0;\n}\n\n"; return(ret); } string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){ string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n"; ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct "; ret += generate_fta_name(node_name)+" *) f;\n\n"; ret += "\t/* Create a temp status tuple */\n"; ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n"; ret += "\tgs_int32_t tuple_size;\n"; ret += "\tunsigned int i;\n"; ret += "\ttime_t cur_time;\n"; ret += "\tint time_advanced;\n"; ret += "\tstruct fta_stat stats;\n"; /* copy the last seen values of temporal attributes */ col_id_set temp_cids; // col ids of temp attributes in select clause /* HACK: in order to reuse the SE generation code, we need to copy * the last values of the temp attributes into new variables * which have names unpack_var_XXX_XXX */ int s, g; col_id_set::iterator csi; for(s=0;sget_data_type(); if (sdt->is_temporal()) { gather_se_col_ids(sl_list[s],temp_cids, gb_tbl); } } for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field)); sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref); ret += tmpstr; } if (is_aggr_query) { for(g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_temporal()){ sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g); ret += tmpstr; data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_buffer_type()){ sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g); ret += tmpstr; } } } } ret += "\n"; ret += "\ttime_advanced = 0;\n"; for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field)); // update last seen value with the value seen ret += "\t#ifdef PREFILTER_DEFINED\n"; sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref); ret += tmpstr; ret += "\t\ttime_advanced = 1;\n\t}\n"; ret += "\t#endif\n"; // we need to pay special attention to time fields if (field == "time" || field == "timestamp" || field == "timestamp_ms"){ ret += "\tcur_time = time(&cur_time);\n"; if (field == "time") { sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n", tblref, time_corr); ret += tmpstr; sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n", field.c_str(), tblref, field.c_str(), tblref, time_corr); } else if (field == "timestamp_ms") { sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n", tblref, time_corr); ret += tmpstr; sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n", field.c_str(), tblref, field.c_str(), tblref, time_corr); }else{ sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n", field.c_str(), tblref, time_corr); ret += tmpstr; sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n", field.c_str(), tblref, field.c_str(), tblref, time_corr); } ret += tmpstr; ret += "\t\ttime_advanced = 1;\n"; ret += "\t}\n"; sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref); ret += tmpstr; } else { sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref); ret += tmpstr; } } if(advance_uxtime){ ret += "\tt->ux_time = time(&(t->ux_time));\n"; } // for aggregation lftas we need to check if the time was advanced beyond the current epoch if (is_aggr_query) { string change_test; bool first_one = true; for(g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_temporal()){ // To perform the test, first need to compute the value // of the temporal gb attrs. if(gdt->is_buffer_type()){ // NOTE : if the SE defining the gb is anything // other than a ref to a variable, this will generate // illegal code. To be resolved with Spatch. sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n", g, generate_se_code(gb_tbl->get_def(g),schema).c_str() ); ret+=tmpstr; sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n", gdt->get_buffer_assign_copy().c_str(), g, g); }else{ sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str()); } ret += tmpstr; sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr; sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr; if(first_one){first_one = false;} else {change_test.append(") && (");} change_test.append(generate_equality_test(lhs_op, rhs_op, gdt)); } } ret += "\n\tif( time_advanced && !( ("; ret += change_test; ret += ") ) ){\n"; ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n"; ret += "\t\tif(t->flush_posmax_aggrs) \n"; ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n"; ret += "\t\t/* \t\tmark all groups as old */\n"; ret +="\t\tt->generation++;\n"; ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n"; ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n"; ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n"; ret += "\t\tt->flush_pos = 0;\n"; for(g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_temporal()){ sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr; sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr; } } ret += "\t}\n\n"; } ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n"; ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\treturn 1;\n"; for(s=0;sget_data_type(); if(sdt->is_temporal()){ if (sl_list[s]->is_gb()) { sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str()); ret += tmpstr; } sprintf(tmpstr,"\ttuple->tuple_var%d = ",s); ret += tmpstr; // if(sdt->needs_hn_translation()) // ret += sdt->hton_translation() +"( "; if (sl_list[s]->is_gb()) { sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref()); ret += tmpstr; } else{ ret += generate_se_code(sl_list[s],schema); } // if(sdt->needs_hn_translation()) // ret += " )"; ret += ";\n"; } } /* mark tuple as temporal */ ret += "\n\t/* Mark tuple as temporal */\n"; ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n"; ret += "\n\t/* Copy trace id */\n"; ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n"; ret += "\n\t/* Populate runtime stats */\n"; ret += "\tstats.ftaid = f->ftaid;\n"; ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n"; ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n"; ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n"; ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n"; ret += "\tstats.cycle_cnt = t->cycle_cnt;\n"; ret += "\tstats.collision_cnt = t->collision_cnt;\n"; ret += "\tstats.eviction_cnt = t->eviction_cnt;\n"; ret += "\tstats.sampling_rate = t->sampling_rate;\n"; ret += "\n#ifdef LFTA_PROFILE\n"; ret += "\n\t/* Print stats */\n"; ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n"; ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n"; ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n"; ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n"; ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n"; ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n"; ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n"; ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n"; ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n"; ret += "\n#endif\n"; ret += "\n\t/* Copy stats */\n"; ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n"; ret+="\tpost_tuple(tuple);\n"; ret += "\n\t/* Send a heartbeat message to clearinghouse */\n"; ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n"; ret += "\n\t/* Reset runtime stats */\n"; ret += "\tt->in_tuple_cnt = 0;\n"; ret += "\tt->out_tuple_cnt = 0;\n"; ret += "\tt->out_tuple_sz = 0;\n"; ret += "\tt->accepted_tuple_cnt = 0;\n"; ret += "\tt->cycle_cnt = 0;\n"; ret += "\tt->collision_cnt = 0;\n"; ret += "\tt->eviction_cnt = 0;\n"; ret += "\treturn 0;\n}\n\n"; return(ret); } // accept processing before the where clause, // do flush processwing. string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema, col_id_set &unpacked_cids, string &temporal_flush){ int s; // Slow flush string ret="\n/*\tslow flush\t*/\n"; string slow_flush_str = fs->get_val_of_def("slow_flush"); int n_slow_flush = atoi(slow_flush_str.c_str()); if(n_slow_flush <= 0) n_slow_flush = 2; if(n_slow_flush > 1){ ret += "\tt->flush_ctr++;\n"; ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n"; ret += "\t\tt->flush_ctr = 0;\n"; ret+="\t\tif(t->flush_posmax_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n"; ret += "\t}\n\n"; }else{ ret+="\tif(t->flush_posmax_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n"; } string change_test; bool first_one = true; int g; col_id_set flush_cids; // col ids accessed when computing flush variables. // unpack them at temporal flush test time. temporal_flush = ""; for(g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_temporal()){ gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl); // To perform the test, first need to compute the value // of the temporal gb attrs. if(gdt->is_buffer_type()){ // NOTE : if the SE defining the gb is anything // other than a ref to a variable, this will generate // illegal code. To be resolved with Spatch. sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n", g, generate_se_code(gb_tbl->get_def(g),schema).c_str() ); temporal_flush += tmpstr; sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n", gdt->get_buffer_assign_copy().c_str(), g, g); }else{ sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str()); } temporal_flush += tmpstr; // END computing the value of the temporal GB attr. sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr; sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr; if(first_one){first_one = false;} else {change_test.append(") && (");} change_test += generate_equality_test(lhs_op, rhs_op, gdt); } } if(!first_one){ // will be false iff. there is a temporal GB attribute temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n"; temporal_flush += "\tif( !( ("; temporal_flush += change_test; temporal_flush += ") ) ){\n"; // temporal_flush+="\t\tif(t->flush_posmax_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n"; temporal_flush+="\t\tif(t->flush_posmax_aggrs){ \n"; temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n"; temporal_flush+="\t\t}\n"; temporal_flush+="\t\t/* \t\tmark all groups as old */\n"; temporal_flush+="\t\tt->generation++;\n"; temporal_flush+="\t\tt->flush_pos = 0;\n"; // Now set the saved temporal value of the gb to the // current value of the gb. Only for simple types, // not for buffer types -- but the strings are not // temporal in any case. for(g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_temporal()){ if(gdt->is_buffer_type()){ fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n"); }else{ sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g); temporal_flush += tmpstr; sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); temporal_flush += tmpstr; } } } temporal_flush += "\t}\n\n"; } // Unpack all the temporal attributes referenced in select clause // and update the last value of the attribute col_id_set temp_cids; // col ids of temp attributes in select clause col_id_set::iterator csi; for(s=0;sget_data_type(); if (sdt->is_temporal()) { gather_se_col_ids(sl_list[s],temp_cids, gb_tbl); } } for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){ if(unpacked_cids.count((*csi)) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name); /* data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field)); sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n", schema->get_fcn(schref,field).c_str(), field.c_str(), tblref); ret += tmpstr; ret += "\tif(retval) return 1;\n"; */ sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref); ret += tmpstr; unpacked_cids.insert( (*csi) ); } } // Do the flush here if this is a real_time query string rt_level = fs->get_val_of_def("real_time"); if(rt_level != "" && temporal_flush != ""){ for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){ if(unpacked_cids.count((*csi)) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name); /* sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n", schema->get_fcn(schref,field).c_str(), field.c_str(), tblref); ret += tmpstr; ret += "\tif(retval) return 1;\n"; */ unpacked_cids.insert( (*csi) ); } } ret += temporal_flush; } return ret; } string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){ int p,s; string ret; /////////////// Processing for filter-only query // test passed : create the tuple, then assign to it. ret += "/*\t\tCreate and post the tuple\t*/\n"; // Unpack partial fcns ref'd by the select clause. // Its a kind of a WHERE clause ... for(p=sl_fcns_start;p 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n"; } if(is_partial_fcn[p]){ ret += unpack_partial_fcn(partial_fcns[p], p, schema); ret += "\tif(retval) goto end;\n"; } if(fcn_ref_cnt[p] > 1){ if(!is_partial_fcn[p]){ ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n"; ret += "\t}\n"; } } // increment the counter of accepted tuples ret += "\n\t#ifdef LFTA_STATS\n"; ret += "\n\tt->accepted_tuple_cnt++;\n\n"; ret += "\t#endif\n\n"; // First, compute the size of the tuple. // Unpack any BUFFER type selections into temporaries // so that I can compute their size and not have // to recompute their value during tuple packing. // I can use regular assignment here because // these temporaries are non-persistent. for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\tselvar_%d = ",s); ret += tmpstr; ret += generate_se_code(sl_list[s],schema); ret += ";\n"; } } // The size of the tuple is the size of the tuple struct plus the // size of the buffers to be copied in. ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")"; for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s); ret += tmpstr; } } ret += ";\n"; ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; // Test passed, make assignments to the tuple. ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n"; // Mark tuple as REGULAR_TUPLE ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n"; for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s); ret += tmpstr; sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s); ret += tmpstr; }else{ sprintf(tmpstr,"\ttuple->tuple_var%d = ",s); ret += tmpstr; // if(sdt->needs_hn_translation()) // ret += sdt->hton_translation() +"( "; ret += generate_se_code(sl_list[s],schema); // if(sdt->needs_hn_translation()) // ret += ") "; ret += ";\n"; } } // Generate output. ret += "\tpost_tuple(tuple);\n"; // Increment the counter of posted tuples ret += "\n\t#ifdef LFTA_STATS\n"; ret += "\tt->out_tuple_cnt++;\n"; ret+="\tt->out_tuple_sz+=tuple_size;\n"; ret += "\t#endif\n\n"; return ret; } // TODO Ensure that postfilter predicates are being generated string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){ int p,s,w; string ret; // Get parameters unsigned int window_len = fs->temporal_range; unsigned int n_bloom = 11; string n_bloom_str = fs->get_val_of_def("num_bloom"); int tmp_n_bloom = atoi(n_bloom_str.c_str()); if(tmp_n_bloom>0) n_bloom = tmp_n_bloom+1; float bloom_width = (window_len+1.0)/(1.0*n_bloom-1); sprintf(tmpstr,"%f",bloom_width); string bloom_width_str = tmpstr; if(window_len < n_bloom){ n_bloom = window_len+1; bloom_width_str = "1"; } // Grab the current window time scalarexp_t winvar(fs->temporal_var); ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n"; int bf_exp_size = 12; // base-2 log of number of bits string bloom_len_str = fs->get_val_of_def("bloom_size"); int tmp_bf_exp_size = atoi(bloom_len_str.c_str()); if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){ bf_exp_size = tmp_bf_exp_size; } int bf_bit_size = 1 << bf_exp_size; int bf_byte_size = bf_bit_size / (8*sizeof(char)); unsigned int ht_size = 4096; string ht_size_s = fs->get_val_of_def("aggregate_slots"); int tmp_ht_size = atoi(ht_size_s.c_str()); if(tmp_ht_size > 1024){ unsigned int hs = 1; // make it power of 2 while(tmp_ht_size){ hs =hs << 1; tmp_ht_size = tmp_ht_size >> 1; } ht_size = hs; } int i, bf_mask = 0; if(fs->use_bloom){ for(i=0;i1;i=i>>1) bf_mask = (bf_mask << 1) | 1; } /* printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n", n_bloom, window_len, bloom_width_str.c_str(), bf_exp_size, bf_bit_size, bf_byte_size, ht_size, ht_size_s.c_str(), bf_mask); */ // If this is a bloom-filter fj, first test if the // bloom filter needs to be advanced. // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index) // t->bf_size : number of bits in bloom filter // TODO: vectorize? // TODO: Don't iterate more than n_bloom times! // As written, its possible to wrap around many times. if(fs->use_bloom){ ret += "// Clean out old bloom filters if needed.\n" "// TODO vectorize this ? \n" " if(t->first_exec){\n" " t->first_exec = 0;\n" " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n" " t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n" " }else{\n" " curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n" " if(curr_bin != t->last_bin){\n" " for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n" " t->last_bloom_pos++;\n" " if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n" " t->last_bloom_pos = 0;\n" " tmp_i = t->last_bloom_pos;\n" " for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n" " SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n" " }\n" " }\n" " }\n" " t->last_bin = curr_bin;\n" " }\n" ; } //----------------------------------------------------------------- // First, determine whether to do S (filter stream) processing. ret += "// S (filtering stream) predicate, should it be processed?\n" "\n" ; // Sort S preds based on cost. vector s_filt = fs->pred_t1; col_id_set::iterator csi; if(s_filt.size() > 0){ // Unpack fields ref'd in the S pred for(w=0;wpr, this_pred_cids, gb_tbl); for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ if(unpacked_cids.count( (*csi) ) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s"); unpacked_cids.insert( (*csi) ); } } } // Sort by evaluation cost. // First, estimate evaluation costs // Eliminate predicates covered by the prefilter (those in s_pids). // I need to do it before the sort becuase the indices refer // to the position in the unsorted list. vector tmp_wh; for(w=0;wcost); ret += tmpstr; // Find partial fcns ref'd in this cnf element set pfcn_refs; collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs); // Since set<..> is a "Sorted Associative Container", // we can walk through it in sorted order by walking from // begin() to end(). (and the partial fcns must be // evaluated in this order). set::iterator si; string pf_preds; for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ if(fcn_ref_cnt[(*si)] > 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; } if(is_partial_fcn[(*si)]){ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); ret += "\t\tif(retval) goto end_s;\n"; } if(fcn_ref_cnt[(*si)] > 1){ if(!is_partial_fcn[(*si)]){ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; // Testing for S is a side branch. // I don't want a cacheable partial function to be // marked as evaluated. Therefore I mark the function // as evalauted ONLY IF it is not partial. ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; } ret += "\t}\n"; } } ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+ ") ) goto end_s;\n"; } }else{ ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n"; } for(p=0;phash_eq.size();++p) ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n"; if(fs->use_bloom){ // First, generate the S scalar expressions in the hash_eq // Iterate over the bloom filters for(i=0;i<3;i++){ ret += "\t\tbucket=0;\n"; for(p=0;phash_eq.size();++p){ ret += " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+ fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+ +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n"; } // SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index) ret += " bucket &= "+int_to_string(bf_mask)+";\n" " SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n" "\n" ; } }else{ ret += "// Add the S record to the hash table, choose a position\n"; ret += "\t\tbucket=0;\n"; for(p=0;phash_eq.size();++p){ ret += " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+ fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+ +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n"; } ret += " bucket &= "+int_to_string(bf_mask)+";\n" " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n" ; // Try the first bucket ret += "\t\tif("; for(p=0;phash_eq.size();++p){ if(p>0) ret += " && "; // ret += "t->join_table[bucket].key_var"+int_to_string(p)+ // " == s_equijoin_"+int_to_string(p); data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p); string rhs_op = "s_equijoin_"+int_to_string(p); ret += generate_equality_test(lhs_op,rhs_op,hdt); } ret += "){\n\t\t\tthe_bucket = bucket;\n"; ret += "\t\t}else{\n\t\t\tif("; for(p=0;phash_eq.size();++p){ if(p>0) ret += " && "; // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+ // " == s_equijoin_"+int_to_string(p); data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p); string rhs_op = "s_equijoin_"+int_to_string(p); ret += generate_equality_test(lhs_op,rhs_op,hdt); } ret += "){\n\t\t\t\tthe_bucket = bucket1;\n"; ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n"; ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n"; ret += "\t\t\t}\n\t\t}\n"; for(p=0;phash_eq.size();++p){ data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); if(hdt->is_buffer_type()){ sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p); ret += tmpstr; }else{ ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+ " = s_equijoin_"+int_to_string(p)+";\n"; } } ret+="\t\tt->join_table[the_bucket].ts = curr_fj_ts;\n"; } ret += "\tend_s:\n"; // ------------------------------------------------------------ // Next, determine if the R record should be processed. ret += "// R (main stream) cheap predicate\n" "\n" ; // Unpack r_filt fields vector r_filt = fs->pred_t0; for(w=0;wpr, this_pred_cids, gb_tbl); for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ if(unpacked_cids.count( (*csi) ) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name); unpacked_cids.insert( (*csi) ); } } } // Sort R preds based on cost. vector tmp_wh; for(w=0;wcost <= 20;cheap_rpos++); // Test the cheap filters on R. if(cheap_rpos >0){ // Now generate the predicates. for(w=0;wcost); ret += tmpstr; // Find partial fcns ref'd in this cnf element set pfcn_refs; collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs); // Since set<..> is a "Sorted Associative Container", // we can walk through it in sorted order by walking from // begin() to end(). (and the partial fcns must be // evaluated in this order). set::iterator si; for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ if(fcn_ref_cnt[(*si)] > 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; } if(is_partial_fcn[(*si)]){ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); ret += "\t\tif(retval) goto end;\n"; } if(fcn_ref_cnt[(*si)] > 1){ if(!is_partial_fcn[(*si)]){ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; ret += "\t}\n"; } } ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+ ") ) goto end;\n"; } }else{ ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n"; } ret += "\n// Do the join\n\n"; for(p=0;phash_eq.size();++p) ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n"; // Passed the cheap pred, now test the join with S. if(fs->use_bloom){ for(i=0;i<3;i++){ ret += "\t\tbucket"+int_to_string(i)+"=0;\n"; for(p=0;phash_eq.size();++p){ ret += " bucket"+int_to_string(i)+ " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+ fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+ +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n"; } ret += " bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n"; } ret += "\tfound = 0;\n"; ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n"; ret += "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && " "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && " "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n " "\t\t\tfound=1;\n" "\t}\n" ; ret += " if(!found)\n" " goto end;\n" ; }else{ ret += "\tfound = 0;\n"; ret += "\t\tbucket=0;\n"; for(p=0;phash_eq.size();++p){ ret += " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+ fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+ +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n"; } ret += " bucket &= "+int_to_string(bf_mask)+";\n" " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n" ; // Try the first bucket ret += "\t\tif("; for(p=0;phash_eq.size();++p){ if(p>0) ret += " && "; // ret += "t->join_table[bucket].key_var"+int_to_string(p)+ // " == r_equijoin_"+int_to_string(p); data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p); string rhs_op = "s_equijoin_"+int_to_string(p); ret += generate_equality_test(lhs_op,rhs_op,hdt); } if(p>0) ret += " && "; ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts"; ret += "){\n\t\t\tfound = 1;\n"; ret += "\t\t}else {if("; for(p=0;phash_eq.size();++p){ if(p>0) ret += " && "; // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+ // " == r_equijoin_"+int_to_string(p); data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p); string rhs_op = "s_equijoin_"+int_to_string(p); ret += generate_equality_test(lhs_op,rhs_op,hdt); } if(p>0) ret += " && "; ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts"; ret += ")\n\t\t\tfound=1;\n"; ret+="\t\t}\n"; ret += " if(!found)\n" " goto end;\n" ; } // Test the expensive filters on R. if(cheap_rpos < r_filt.size()){ // Now generate the predicates. for(w=cheap_rpos;wcost); ret += tmpstr; // Find partial fcns ref'd in this cnf element set pfcn_refs; collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs); // Since set<..> is a "Sorted Associative Container", // we can walk through it in sorted order by walking from // begin() to end(). (and the partial fcns must be // evaluated in this order). set::iterator si; for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ if(fcn_ref_cnt[(*si)] > 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; } if(is_partial_fcn[(*si)]){ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); ret += "\t\tif(retval) goto end;\n"; } if(fcn_ref_cnt[(*si)] > 1){ if(!is_partial_fcn[(*si)]){ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; ret += "\t}\n"; } } ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+ ") ) goto end;\n"; } }else{ ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n"; } /////////////// post the tuple // test passed : create the tuple, then assign to it. ret += "/*\t\tCreate and post the tuple\t*/\n"; // Unpack r_filt fields for(s=0;s 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n"; } if(is_partial_fcn[p]){ ret += unpack_partial_fcn(partial_fcns[p], p, schema); ret += "\tif(retval) goto end;\n"; } if(fcn_ref_cnt[p] > 1){ if(!is_partial_fcn[p]){ ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n"; ret += "\t}\n"; } } // increment the counter of accepted tuples ret += "\n\t#ifdef LFTA_STATS\n"; ret += "\n\tt->accepted_tuple_cnt++;\n\n"; ret += "\t#endif\n\n"; // First, compute the size of the tuple. // Unpack any BUFFER type selections into temporaries // so that I can compute their size and not have // to recompute their value during tuple packing. // I can use regular assignment here because // these temporaries are non-persistent. for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\tselvar_%d = ",s); ret += tmpstr; ret += generate_se_code(sl_list[s],schema); ret += ";\n"; } } // The size of the tuple is the size of the tuple struct plus the // size of the buffers to be copied in. ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")"; for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s); ret += tmpstr; } } ret += ";\n"; ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; // Test passed, make assignments to the tuple. ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n"; // Mark tuple as REGULAR_TUPLE ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n"; for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s); ret += tmpstr; sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s); ret += tmpstr; }else{ sprintf(tmpstr,"\ttuple->tuple_var%d = ",s); ret += tmpstr; // if(sdt->needs_hn_translation()) // ret += sdt->hton_translation() +"( "; ret += generate_se_code(sl_list[s],schema); // if(sdt->needs_hn_translation()) // ret += ") "; ret += ";\n"; } } // Generate output. ret += "\tpost_tuple(tuple);\n"; // Increment the counter of posted tuples ret += "\n\t#ifdef LFTA_STATS\n"; ret += "\n\tt->out_tuple_cnt++;\n\n"; ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n"; ret += "\t#endif\n\n"; return ret; } string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){ int p,s,w; string ret; string wl_schema = fs->from[1]->get_schema_name(); string wl_elem_str = generate_watchlist_element_name(wl_schema); string wl_node_str = generate_watchlist_struct_name(wl_schema); string tgt = generate_watchlist_name(wl_schema); ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n"; // ------------------------------------------------------------ // Determine if the R record should be processed. ret += "// R (main stream) cheap predicate\n" "\n" ; // Unpack r_filt fields vector r_filt = fs->pred_t0; for(w=0;wpr, this_pred_cids, gb_tbl); for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ if(unpacked_cids.count( (*csi) ) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name); unpacked_cids.insert( (*csi) ); } } } // Sort R preds based on cost. vector tmp_wh; for(w=0;wcost <= 20;cheap_rpos++); // Test the cheap filters on R. if(cheap_rpos >0){ // Now generate the predicates. for(w=0;wcost); ret += tmpstr; // Find partial fcns ref'd in this cnf element set pfcn_refs; collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs); // Since set<..> is a "Sorted Associative Container", // we can walk through it in sorted order by walking from // begin() to end(). (and the partial fcns must be // evaluated in this order). set::iterator si; for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ if(fcn_ref_cnt[(*si)] > 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; } if(is_partial_fcn[(*si)]){ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); ret += "\t\tif(retval) goto end;\n"; } if(fcn_ref_cnt[(*si)] > 1){ if(!is_partial_fcn[(*si)]){ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; ret += "\t}\n"; } } ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+ ") ) goto end;\n"; } }else{ ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n"; } ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n"; map h_eq = ((watch_join_qpn *)fs)-> hash_eq; vector kflds = ((watch_join_qpn *)fs)->key_flds; for(w=0;wpr, this_pred_cids, gb_tbl); for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ if(unpacked_cids.count( (*csi) ) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; if(tblref==0) // LHS from packet, don't unpack the RHS ret += generate_unpack_code(tblref,schref,field,schema,node_name); unpacked_cids.insert( (*csi) ); } } } ret += "\n// Do the join\n\n"; ret += "\n// (ensure that the watchtable is fresh)\n"; ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n"; ret += "\t\treload_watchlist__"+wl_schema+"();\n"; ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n"; ret += "\t}\n\n"; for(p=0;pkey_flds.size();++p){ string kfld = fs->key_flds[p]; ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n"; } // Passed the cheap pred, now test the join with S. ret += "\tbucket=0;\n"; ret += "\thash=0;\n"; for(p=0;pkey_flds.size();++p){ string kfld = fs->key_flds[p]; ret += " hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+ fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+ +"_to_hash(r_equijoin_"+kfld+")));\n"; } ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n"; ret += "\t\trec = "+tgt+".ht[bucket];\n"; ret += "\t\twhile(rec!=NULL){\n"; ret += "\t\t\tif(hash==rec->hashval){\n"; ret += "\t\t\t\tif("; for(p=0;pkey_flds.size();++p){ string kfld = fs->key_flds[p]; if(p>0) ret += " && "; data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type(); string lhs_op = "r_equijoin_"+kfld; string rhs_op = "rec->"+kfld; ret += generate_equality_test(lhs_op,rhs_op,hdt); } ret += ")\n"; ret += "\t\t\t\t\tbreak;\n"; ret += "\t\t\t}\n"; ret += "\t\t\trec=rec->next;\n"; ret += "\t\t}\n"; ret += "\t\tif(rec==NULL)\n"; ret += "\t\t\tgoto end;\n"; ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n"; for(w=0;wpr, this_pred_cids, gb_tbl); for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ if(unpacked_cids.count( (*csi) ) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; if(tblref==0) // LHS from packet ret += generate_unpack_code(tblref,schref,field,schema,node_name); else // RHS from hash bucket ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n"; unpacked_cids.insert( (*csi) ); } } } // Test the expensive filters on R. // TODO Should merge this with other predicates and eval in order // of cost - see the fj code. // TODO join and postfilter predicates haven't been costed yet. if(cheap_rpos < r_filt.size()){ // Now generate the predicates. for(w=cheap_rpos;wcost); ret += tmpstr; // Find partial fcns ref'd in this cnf element set pfcn_refs; collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs); // Since set<..> is a "Sorted Associative Container", // we can walk through it in sorted order by walking from // begin() to end(). (and the partial fcns must be // evaluated in this order). set::iterator si; for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ if(fcn_ref_cnt[(*si)] > 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; } if(is_partial_fcn[(*si)]){ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); ret += "\t\tif(retval) goto end;\n"; } if(fcn_ref_cnt[(*si)] > 1){ if(!is_partial_fcn[(*si)]){ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; ret += "\t}\n"; } } ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+ ") ) goto end;\n"; } }else{ ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n"; } // TODO sort the additional predicates by cost // S-only for(w=0;wpred_t1.size();++w){ sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost); ret += tmpstr; // Find partial fcns ref'd in this cnf element set pfcn_refs; collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs); // Since set<..> is a "Sorted Associative Container", // we can walk through it in sorted order by walking from // begin() to end(). (and the partial fcns must be // evaluated in this order). set::iterator si; for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ if(fcn_ref_cnt[(*si)] > 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; } if(is_partial_fcn[(*si)]){ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); ret += "\t\tif(retval) goto end;\n"; } if(fcn_ref_cnt[(*si)] > 1){ if(!is_partial_fcn[(*si)]){ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; ret += "\t}\n"; } } ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+ ") ) goto end;\n"; } // non hash-eq join for(w=0;wjoin_filter.size();++w){ sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost); ret += tmpstr; // Find partial fcns ref'd in this cnf element set pfcn_refs; collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs); // Since set<..> is a "Sorted Associative Container", // we can walk through it in sorted order by walking from // begin() to end(). (and the partial fcns must be // evaluated in this order). set::iterator si; for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ if(fcn_ref_cnt[(*si)] > 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; } if(is_partial_fcn[(*si)]){ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); ret += "\t\tif(retval) goto end;\n"; } if(fcn_ref_cnt[(*si)] > 1){ if(!is_partial_fcn[(*si)]){ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; ret += "\t}\n"; } } ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+ ") ) goto end;\n"; } // postfilter for(w=0;wpostfilter.size();++w){ sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost); ret += tmpstr; // Find partial fcns ref'd in this cnf element set pfcn_refs; collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs); // Since set<..> is a "Sorted Associative Container", // we can walk through it in sorted order by walking from // begin() to end(). (and the partial fcns must be // evaluated in this order). set::iterator si; for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ if(fcn_ref_cnt[(*si)] > 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; } if(is_partial_fcn[(*si)]){ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); ret += "\t\tif(retval) goto end;\n"; } if(fcn_ref_cnt[(*si)] > 1){ if(!is_partial_fcn[(*si)]){ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; ret += "\t}\n"; } } ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+ ") ) goto end;\n"; } /////////////// post the tuple // test passed : create the tuple, then assign to it. ret += "/*\t\tCreate and post the tuple\t*/\n"; // Unpack R fields for(s=0;s"+field+";\n"; unpacked_cids.insert( (*csi) ); } } } // Unpack partial fcns ref'd by the select clause. // Its a kind of a WHERE clause ... for(p=sl_fcns_start;p 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n"; } if(is_partial_fcn[p]){ ret += unpack_partial_fcn(partial_fcns[p], p, schema); ret += "\tif(retval) goto end;\n"; } if(fcn_ref_cnt[p] > 1){ if(!is_partial_fcn[p]){ ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n"; ret += "\t}\n"; } } // increment the counter of accepted tuples ret += "\n\t#ifdef LFTA_STATS\n"; ret += "\n\tt->accepted_tuple_cnt++;\n\n"; ret += "\t#endif\n\n"; // First, compute the size of the tuple. // Unpack any BUFFER type selections into temporaries // so that I can compute their size and not have // to recompute their value during tuple packing. // I can use regular assignment here because // these temporaries are non-persistent. for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\tselvar_%d = ",s); ret += tmpstr; ret += generate_se_code(sl_list[s],schema); ret += ";\n"; } } // The size of the tuple is the size of the tuple struct plus the // size of the buffers to be copied in. ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")"; for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s); ret += tmpstr; } } ret += ";\n"; ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; // Test passed, make assignments to the tuple. ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n"; // Mark tuple as REGULAR_TUPLE ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n"; for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s); ret += tmpstr; sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s); ret += tmpstr; }else{ sprintf(tmpstr,"\ttuple->tuple_var%d = ",s); ret += tmpstr; // if(sdt->needs_hn_translation()) // ret += sdt->hton_translation() +"( "; ret += generate_se_code(sl_list[s],schema); // if(sdt->needs_hn_translation()) // ret += ") "; ret += ";\n"; } } // Generate output. ret += "\tpost_tuple(tuple);\n"; // Increment the counter of posted tuples ret += "\n\t#ifdef LFTA_STATS\n"; ret += "\n\tt->out_tuple_cnt++;\n\n"; ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n"; ret += "\t#endif\n\n"; return ret; } string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){ string ret; int a,p,g; ////////////// Processing for aggregtion query // First, search for a match. Start by unpacking the group-by attributes. // One complication : if a real-time aggregate flush occurs, // the GB attr has already been calculated. So don't compute // it again if 1) its temporal and 2) it will be computed in the // agggregate flush code. // Unpack the partial fcns ref'd by the gb's and the aggr defs. for(p=gb_fcns_start;psize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if((! gdt->is_temporal()) || temporal_flush == ""){ if(gdt->is_buffer_type()){ // NOTE : if the SE defining the gb is anything // other than a ref to a variable, this will generate // illegal code. To be resolved with Spatch. sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n", g, generate_se_code(gb_tbl->get_def(g),schema).c_str() ); ret += tmpstr; sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n", gdt->get_buffer_assign_copy().c_str(), g, g); }else{ sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str()); } ret += tmpstr; } } ret += "\n"; // A quick aside : if any of the GB attrs are temporal, // test for change and flush if any change occurred. // We've already computed the flush code, // Put it here if this is not a real time query. // We've already unpacked all column refs, so no need to // do it again here. string rt_level = fs->get_val_of_def("real_time"); if(rt_level == "" && temporal_flush != ""){ ret += temporal_flush; } // Compute the hash bucket if(gb_tbl->size() > 0){ ret += "\thashval = ";\ for(g=0;gsize();g++){ if(g>0) ret += " ^ "; data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_buffer_type()){ sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(), gdt->get_type_str().c_str(), g); }else{ sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(), gdt->get_type_str().c_str(), g); } ret += tmpstr; } ret += ";\n"; ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n"; ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n"; }else{ ret+="\tprobe = 0;\n"; ret+="\thash2 = 0;\n\n"; } // Does the lfta reference a udaf? bool has_udaf = false; for(a=0;asize();a++){ if(! aggr_tbl->is_builtin(a)) has_udaf = true; } // Scan for a match, or alternatively the best slot. // Currently, hardcode 5 tests. ret += " gen_val = t->generation & SLOT_GEN_BITS;\n" " match_found = 0;\n" " best_slot = probe;\n" " for(i=0;i<5 && match_found == 0;i++){\n" " if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n" ; if(gb_tbl->size()>0){ ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n"; ret+="\t\tif("; string rhs_op, lhs_op; for(g=0;gsize();g++){ if(g>0) ret += " && "; ret += "("; sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr; sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr; ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g)); ret += ")"; } } ret += "){\n" " match_found = 1;\n" " best_slot = probe;\n" " }\n" " }\n" "// Rate slots in case no match found: prefer empty, then full but old slots\n" " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n" " if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n" " best_slot = probe;\n" " }else{\n" " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n" " best_slot = probe;\n" " }\n" " }\n" " probe++;\n" " if(probe >= t->max_aggrs)\n" " probe=0;\n" " }\n" " if(match_found){\n" ; ret += generate_gb_update(node_name, schema, "best_slot",has_udaf); ret += " }else{\n" " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n" ; printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder); if(((sgah_qpn *)fs)->lfta_disorder <= 1){ ret += " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n" " if(("; bool first_g = true; for(int g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_temporal()){ if(first_g) first_g = false; else ret+=" + "; ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")"; } } ret += ") == 0 ){\n"; ret += " fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n" " }\n" " }\n" ; } ret += generate_tuple_from_aggr(node_name,schema,"best_slot"); ret += "\t\t\t#ifdef LFTA_STATS\n" "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n" "\t\t\t\tt->collision_cnt++;\n\n" "\t\t\t#endif\n\n" "\t\t}\n" ; ret += generate_init_group(schema,"best_slot"); ret += "\t}\n"; return ret; } string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set &s_pids){ string ret="static gs_retval_t accept_packet_"+node_name+ "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n"; ret += "\tstruct packet *p = (struct packet *)pkt;\n"; int a; // Define all of the variables needed by this // procedure. // Gather all column references, need to define unpacking variables. int w,s; col_id_set cid_set; col_id_set::iterator csi; // If its a filter join, rebind all colrefs // to the first range var, to avoid double unpacking. if(is_fj){ for(w=0;wpr, gb_tbl); for(s=0;spr,cid_set, gb_tbl); } for(s=0;ssize();g++) gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl); } // Variables for unpacking attributes. ret += "/*\t\tVariables for unpacking attributes\t*/\n"; for(csi=cid_set.begin(); csi!=cid_set.end();++csi){ int schref = (*csi).schema_ref; int tblref = (*csi).tblvar_ref; string field = (*csi).field; data_type dt(schema->get_type_name(schref,field)); sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(), field.c_str(), tblref); ret += tmpstr; } ret += "\n\n"; // Variables that are always needed ret += "/*\t\tVariables which are always needed\t*/\n"; ret += "\tgs_retval_t retval;\n"; ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n"; ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n"; ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n"; // Variables needed for aggregation queries. if(is_aggr_query){ ret += "\n/*\t\tVariables for aggregation\t*/\n"; ret+="\tunsigned int i, probe;\n"; ret+="\tunsigned int gen_val, match_found, best_slot;\n"; ret+="\tgs_uint64_t hashval, hash2;\n"; // Variables for storing group-by attribute values. if(gb_tbl->size() > 0) ret += "/*\t\tGroup-by attributes\t*/\n"; for(g=0;gsize();g++){ sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g); ret += tmpstr; data_type *gdt = gb_tbl->get_data_type(g); if(gdt->is_buffer_type()){ sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g); ret += tmpstr; } } ret += "\n"; // Temporaries for min/max string aggr_tmp_str = ""; for(a=0;asize();a++){ string aggr_op = aggr_tbl->get_op(a); if(aggr_op == "MIN" || aggr_op == "MAX"){ sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a); aggr_tmp_str.append(tmpstr); } } if(aggr_tmp_str != ""){ ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n"; ret += aggr_tmp_str; ret += "\n"; } // Variables for udaf output temporaries bool no_udaf = true; for(a=0;asize();a++){ if(! aggr_tbl->is_builtin(a)){ if(no_udaf){ ret+="/*\t\tUDAF output vars.\t*/\n"; no_udaf = false; } int afcn_id = aggr_tbl->get_fcn_id(a); data_type *adt = Ext_fcns->get_fcn_dt(afcn_id); sprintf(tmpstr,"udaf_ret%d", a); ret+="\t"+adt->make_cvar(tmpstr)+";\n"; } } } // Variables needed for a filter join query if(fs->node_type() == "filter_join"){ filter_join_qpn *fjq = (filter_join_qpn *)fs; bool uses_bloom = fjq->use_bloom; ret += "/*\t\tJoin fields\t*/\n"; for(g=0;ghash_eq.size();g++){ sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g); ret += tmpstr; sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g); ret += tmpstr; } if(uses_bloom){ ret += " /* Variables for fj bloom filter */ \n" "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n" "\tunsigned int bucket, bucket0, bucket1, bucket2;\n" "\tlong long int curr_fj_ts;\n" "\tlong long int curr_bin, the_bin;\n" "\n" ; }else{ ret += " /* Variables for fj join table */ \n" "\tunsigned int i, bucket, found; \n" "\tunsigned int bucket1, the_bucket;\n" " long long int curr_fj_ts;\n" "\n" ; } } if(fs->node_type() == "watch_join"){ watch_join_qpn *wlq = (watch_join_qpn *)fs; ret += "/*\t\tJoin fields\t*/\n"; for(int k=0;kkey_flds.size(); ++k){ string kfld = wlq->key_flds[k]; ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n"; ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n"; } ret += " /* Variables for wl join table */ \n" "\tunsigned int i, bucket;\n" "\tunsigned long long int hash; \n"; string wl_schema = wlq->from[1]->get_schema_name(); string wl_elem_str = generate_watchlist_element_name(wl_schema); ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n"; "\n" ; } // Variables needed to store selected attributes of BUFFER type // temporarily, in order to compute their size for storage // in an output tuple. string select_var_defs = ""; for(int s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s); select_var_defs.append(tmpstr); } } if(select_var_defs != ""){ ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n"; ret += select_var_defs; } // Variables to store results of partial functions. int p; if(partial_fcns.size()>0){ ret += "/*\t\tVariables for storing results of partial functions. \t*/\n"; for(p=0;p1)){ sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n", partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p); ret += tmpstr; if(!is_aggr_query && fcn_ref_cnt[p] >1){ ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n"; } } } if(is_aggr_query) ret += "\tint unpack_failed = 0;\n"; ret += "\n"; } // variable to hold packet struct // if(packed_return){ ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n"; } ret += "\t#ifdef LFTA_STATS\n"; // variable to store counter of cpu cycles spend in accept_tuple ret += "\tgs_uint64_t start_cycle = rdtsc();\n"; // increment counter of received tuples ret += "\tt->in_tuple_cnt++;\n"; ret += "\t#endif\n"; // ------------------------------------------------- // If the packet is "packet", test if its for this lfta, // and if so load it into its struct if(packed_return){ ret+="\n/* packed tuple : test and load. \t*/\n"; ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n"; ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n"; ret+="\t\tgoto end;\n\n"; } col_id_set unpacked_cids; // Keep track of the cols that have been unpacked. string temporal_flush; if(is_aggr_query) ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush); else { // non-aggregation operators // Unpack all the temporal attributes referenced in select clause // and update the last value of the attribute col_id_set temp_cids; // col ids of temp attributes in select clause for(s=0;sget_data_type(); if (sdt->is_temporal()) { gather_se_col_ids(sl_list[s],temp_cids, gb_tbl); } } // If this is a filter join, // ensure that the temporal range field is unpacked. if(is_fj){ col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var); if(temp_cids.count(window_var_cid)==0) temp_cids.insert(window_var_cid); } for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){ if(unpacked_cids.count((*csi)) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name); sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref); ret += tmpstr; unpacked_cids.insert( (*csi) ); } } } vector filter = fs->get_filter_clause(); // Test the filter predicate (some query types have additional preds). if(filter.size() > 0 && !is_wj){ // watchlist join does specialized processing // Sort by evaluation cost. // First, estimate evaluation costs // Eliminate predicates covered by the prefilter (those in s_pids). // I need to do it before the sort becuase the indices refer // to the position in the unsorted list./ vector tmp_wh; for(w=0;wcost); ret += tmpstr; // Find the set of variables accessed in this CNF elem, // but in no previous element. col_id_set this_pred_cids; gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl); for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ if(unpacked_cids.count( (*csi) ) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name); unpacked_cids.insert( (*csi) ); } } // Find partial fcns ref'd in this cnf element set pfcn_refs; collect_partial_fcns_pr(filter[w]->pr, pfcn_refs); // Since set<..> is a "Sorted Associative Container", // we can walk through it in sorted order by walking from // begin() to end(). (and the partial fcns must be // evaluated in this order). set::iterator si; for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ if(fcn_ref_cnt[(*si)] > 1){ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; } if(is_partial_fcn[(*si)]){ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); ret += "\t\tif(retval) goto end;\n"; } if(fcn_ref_cnt[(*si)] > 1){ if(!is_partial_fcn[(*si)]){ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; } ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; ret += "\t}\n"; } } ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+ ") ) goto end;\n"; } }else{ ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n"; } // We've passed the WHERE clause, // unpack the remainder of the accessed fields. if(is_fj){ ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n"; vector h_eq = ((filter_join_qpn *)fs)-> hash_eq; for(w=0;wpr, this_pred_cids, gb_tbl); for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ if(unpacked_cids.count( (*csi) ) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name); unpacked_cids.insert( (*csi) ); } } } }else if(is_wj){ // STOPPED HERE move this to wj main body /* ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n"; map h_eq = ((watch_join_qpn *)fs)-> hash_eq; vector kflds = ((watch_join_qpn *)fs)->key_flds; for(w=0;wpr, this_pred_cids, gb_tbl); for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ if(unpacked_cids.count( (*csi) ) == 0){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name); unpacked_cids.insert( (*csi) ); } } } */ }else{ ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n"; for(csi=cid_set.begin();csi!=cid_set.end();++csi){ if(unpacked_cids.count( (*csi) ) == 0){ int schref = (*csi).schema_ref; int tblref = (*csi).tblvar_ref; string field = (*csi).field; ret += generate_unpack_code(tblref,schref,field,schema,node_name); unpacked_cids.insert( (*csi) ); } } } ////////////////// ////////////////// After this, the query types ////////////////// are processed differently. if(!is_aggr_query && !is_fj & !is_wj) ret += generate_sel_accept_body(fs, node_name, schema); else if(is_aggr_query) ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush); else{ if(is_fj) ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema); else ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema); } // Finish up. ret += "\n\tend:\n"; ret += "\t#ifdef LFTA_STATS\n"; ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n"; ret += "\t#endif\n"; ret += "\n\treturn 1;\n}\n\n"; return(ret); } string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){ int g, cl; string ret = "struct FTA * "+generate_alloc_name(node_name) + "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void *value){\n"; ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n"; ret+="\tint i;\n"; ret += "\n"; ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n"; // assign a streamid to fta instance ret+="\t/* assign a streamid */\n"; ret+="\tf->f.ftaid = ftaid;\n"; ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n"; ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n"; if(is_aggr_query){ ret += "\tf->n_aggrs = 0;\n"; ret += "\tf->max_aggrs = "; // Computing the number of aggregate blocks is a little // tricky. If there are no GB attrs, or if all GB attrs // are temporal, then use a single aggregate block, else // use a default value (10). A user specification overrides // this logic. bool single_group = true; for(g=0;gsize();g++){ data_type *gdt = gb_tbl->get_data_type(g); if(! gdt->is_temporal() ){ single_group = false; } } string max_aggr_str = fs->get_val_of_def("aggregate_slots"); int max_aggr_i = atoi(max_aggr_str.c_str()); if(max_aggr_i <= 0){ if(single_group) ret += "2"; else ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE); }else{ unsigned int naggrs = 1; // make it power of 2 unsigned int nones = 0; while(max_aggr_i){ if(max_aggr_i&1) nones++; naggrs = naggrs << 1; max_aggr_i = max_aggr_i >> 1; } if(nones==1) // in case it was already a power of 2. naggrs/=2; ret += int_to_string(naggrs); } ret += ";\n"; ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n\n"; // ret+="/* compute how many integers we need to store the hashmap */\n"; // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n"; ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n"; ret+="/*\t\tfill bitmap with zero \t*/\n"; ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n"; ret+="\t\tf->aggr_table_hashmap[i] = 0;\n"; ret+="\tf->generation=0;\n"; ret+="\tf->flush_pos = f->max_aggrs;\n"; ret += "\tf->flush_ctr = 0;\n"; } if(is_fj){ if(uses_bloom){ ret+="\tf->first_exec = 1;\n"; unsigned int n_bloom = 11; string n_bloom_str = fs->get_val_of_def("num_bloom"); int tmp_n_bloom = atoi(n_bloom_str.c_str()); if(tmp_n_bloom>0) n_bloom = tmp_n_bloom+1; unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range; if(window_len < n_bloom){ n_bloom = window_len+1; } int bf_exp_size = 12; // base-2 log of number of bits string bloom_len_str = fs->get_val_of_def("bloom_size"); int tmp_bf_exp_size = atoi(bloom_len_str.c_str()); if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){ bf_exp_size = tmp_bf_exp_size; } int bf_bit_size = 1 << 12; int bf_byte_size = bf_bit_size / (8*sizeof(char)); int bf_tot = n_bloom*bf_byte_size; ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n"; ret += " for(i=0;i<"+int_to_string(bf_tot)+";i++)\n" " f->bf_table[i] = 0;\n" ; }else{ unsigned int ht_size = 4096; string ht_size_s = fs->get_val_of_def("aggregate_slots"); int tmp_ht_size = atoi(ht_size_s.c_str()); if(tmp_ht_size > 1024){ unsigned int hs = 1; // make it power of 2 while(tmp_ht_size){ hs =hs << 1; tmp_ht_size = tmp_ht_size >> 1; } ht_size = hs; } ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n\n"; ret += " for(i=0;i<"+int_to_string(ht_size)+";i++)\n" " f->join_table[i].ts = 0;\n" ; } } // Initialize the complex literals (which might be handles). for(cl=0;clsize();cl++){ literal_t *l = complex_literals->get_literal(cl); // sprintf(tmpstr,"\tf->complex_literal_%d = ",cl); // ret += tmpstr + l->to_C_code() + ";\n"; sprintf(tmpstr,"&(f->complex_literal_%d)",cl); ret += "\t" + l->to_C_code(tmpstr) + ";\n"; } ret += "\n"; // Initialize the last seen values of temporal attributes to min(max) value of // their respective type // Create places to hold the last values of temporal attributes referenced in select clause col_id_set temp_cids; // col ids of temp attributes in select clause int s; col_id_set::iterator csi; for(s=0;sget_data_type(); if (sdt->is_temporal()) { gather_se_col_ids(sl_list[s],temp_cids, gb_tbl); } } for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field)); if (dt.is_increasing()) { sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str()); ret += tmpstr; } else if (dt.is_decreasing()) { sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str()); ret += tmpstr; } } // initialize last seen values of temporal groubpy variables if(is_aggr_query){ for(g=0;gsize();g++){ data_type *dt = gb_tbl->get_data_type(g); if(dt->is_temporal()){ /* fprintf(stderr,"group by attribute %s is temporal, ", gb_tbl->get_name(g).c_str()); */ if(dt->is_increasing()){ sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str()); }else{ sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str()); } ret += tmpstr; } } } ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n"; ret+="\tf->f.free_fta=free_fta_"+node_name+";\n"; ret+="\tf->f.control_fta=control_fta_"+node_name+";\n"; ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n"; ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n"; // Initialize runtime stats ret+="\tf->in_tuple_cnt = 0;\n"; ret+="\tf->out_tuple_cnt = 0;\n"; ret+="\tf->out_tuple_sz = 0;\n"; ret+="\tf->accepted_tuple_cnt = 0;\n"; ret+="\tf->cycle_cnt = 0;\n"; ret+="\tf->collision_cnt = 0;\n"; ret+="\tf->eviction_cnt = 0;\n"; ret+="\tf->sampling_rate = 1.0;\n"; ret+="\tf->trace_id = 0;\n\n"; if(param_tbl->size() > 0){ ret+= "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n" "#ifndef LFTA_IN_NIC\n" "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n" "#else\n" "\t\t}\n" "#endif\n" "\t\t\treturn 0;\n" "\t\t}\n"; } // Register the pass-by-handle parameters int ph; for(ph=0;phtype_name); sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str()); switch(param_handle_table[ph]->val_type){ case cplx_lit_e: ret += tmpstr; if(pdt.is_buffer_type()) ret += "&("; sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx); ret += tmpstr ; if(pdt.is_buffer_type()) ret += ")"; ret += ");\n"; break; case litval_e: // not complex, no constructor ret += tmpstr; ret += param_handle_table[ph]->litval->to_C_code("") + ");\n"; break; case param_e: // query parameter handles are regstered/deregistered in the // load_params function. // ret += "t->param_"+param_handle_table[ph]->param_name; break; default: fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n"); exit(1); } } ret += "\treturn (struct FTA *) f;\n"; ret += "}\n\n"; return(ret); } ////////////////////////////////////////////////////////////////// string generate_lfta_block(qp_node *fs, table_list *schema, int gid, // map &int_fcn_defs, ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set &s_pids){ bool is_aggr_query; int s,p,g; string retval; ///////////////////////////////////////////////////////////// /// Do operator-generic processing, such as /// gathering the set of referenced columns, /// generating structures, etc. // Initialize globals to empty. gb_tbl = NULL; aggr_tbl = NULL; global_id = -1; nicprop = NULL; param_tbl = fs->get_param_tbl(); sl_list.clear(); where.clear(); partial_fcns.clear(); fcn_ref_cnt.clear(); is_partial_fcn.clear(); pred_class.clear(); pred_pos.clear(); sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0; gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0; // Does the lfta read packed results from the NIC? nicprop = nicp; // load into global global_id = gid; packed_return = false; if(nicp && nicp->option_exists("Return")){ if(nicp->option_value("Return") == "Packed"){ packed_return = true; }else{ fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str()); } } // Extract data which defines the query. // complex literals gathered now. complex_literals = fs->get_cplx_lit_tbl(Ext_fcns); param_handle_table = fs->get_handle_param_tbl(Ext_fcns); string node_name = fs->get_node_name(); bool is_fj = false, uses_bloom = false; bool is_wj = false; bool is_watch_tbl = false; if(fs->node_type() == "spx_qpn"){ is_aggr_query = false; spx_qpn *spx_node = (spx_qpn *)fs; sl_list = spx_node->get_select_se_list(); where = spx_node->get_where_clause(); gb_tbl = NULL; aggr_tbl = NULL; } else if(fs->node_type() == "sgah_qpn"){ is_aggr_query = true; sgah_qpn *sgah_node = (sgah_qpn *)fs; sl_list = sgah_node->get_select_se_list(); where = sgah_node->get_where_clause(); gb_tbl = sgah_node->get_gb_tbl(); aggr_tbl = sgah_node->get_aggr_tbl(); if((sgah_node->get_having_clause()).size() > 0){ fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str()); } } else if(fs->node_type() == "filter_join"){ is_aggr_query = false; is_fj = true; filter_join_qpn *fj_node = (filter_join_qpn *)fs; sl_list = fj_node->get_select_se_list(); where = fj_node->get_where_clause(); uses_bloom = fj_node->use_bloom; gb_tbl = NULL; aggr_tbl = NULL; }else if(fs->node_type() == "watch_join"){ is_aggr_query = false; is_wj = true; watch_join_qpn *wl_node = (watch_join_qpn *)fs; sl_list = wl_node->get_select_se_list(); where = wl_node->get_where_clause(); gb_tbl = NULL; aggr_tbl = NULL; }else if(fs->node_type() == "watch_tbl_qpn"){ is_aggr_query = false; is_watch_tbl = true; vector empty_sl_list; vector empty_where; sl_list = empty_sl_list; where = empty_where; gb_tbl = NULL; aggr_tbl = NULL; } else { fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str()); exit(1); } // Build list of "partial functions", by clause. // NOTE : partial fcns are not handles well. // The act of searching for them associates the fcn call // in the SE with an index to an array. Refs to the // fcn value are replaced with refs to the variable they are // unpacked into. A more general tagging mechanism would be better. int i; vector *pfunc_ptr = NULL; vector *ref_cnt_ptr = NULL; if(!is_aggr_query){ // don't collect cacheable fcns on aggr query. ref_cnt_ptr = &fcn_ref_cnt; pfunc_ptr = &is_partial_fcn; } sl_fcns_start = 0; for(i=0;ipr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns); } gb_fcns_start = wh_fcns_end = partial_fcns.size(); if(gb_tbl != NULL){ for(i=0;isize();i++){ find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns); } } ag_fcns_start = gb_fcns_end = partial_fcns.size(); if(aggr_tbl != NULL){ for(i=0;isize();i++){ find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns); } } ag_fcns_end = partial_fcns.size(); // Fill up the is_partial_fcn and fcn_ref_cnt arrays. if(is_aggr_query){ for(i=0; iset_partial_ref(-1); } } node_name = normalize_name(node_name); retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str); if(packed_return){ // generate unpack struct vector input_tbls = fs->get_input_tbls(); int schref = input_tbls[0]->get_schema_ref(); vector refd_cols; for(s=0;spr,refd_cols, nicp, gb_tbl); } if(gb_tbl){ for(g=0;gsize();++g){ gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl); } } sort(refd_cols.begin(), refd_cols.end()); retval += "struct "+node_name+"_input_struct{\n"; retval += "\tint __lfta_id_fm_nic__;\n"; int vsi; for(vsi=0;vsiget_type_name(schref,refd_cols[vsi])); retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n"; } retval+="};\n\n"; } ///////////////////////////////////////////////////// // Common stuff unpacked, do some generation if(is_aggr_query) retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl); if(is_fj) retval += generate_fj_struct((filter_join_qpn *)fs, node_name); if(is_watch_tbl){ retval += "\n\n// watchtable code here \n\n"; watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs; retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval); retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds); } if(! is_watch_tbl){ retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema); retval += generate_tuple_struct(node_name, sl_list) ; if(is_aggr_query) retval += generate_fta_flush(node_name, schema, Ext_fcns) ; if(param_tbl->size() > 0) retval += generate_fta_load_params(node_name) ; retval += generate_fta_free(node_name, is_aggr_query) ; retval += generate_fta_control(node_name, schema, is_aggr_query) ; retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ; /* extract the value of Time_Correlation from interface definition */ int e,v; string es; unsigned time_corr; vector tvec = fs->get_input_tbls(); vector time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es); if (time_corr_vec.empty()) time_corr = DEFAULT_TIME_CORR; else time_corr = atoi(time_corr_vec[0].c_str()); retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) ); retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) ); } return(retval); } int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){ // Initialize global vars gb_tbl = NULL; sl_list.clear(); where.clear(); if(fs->node_type() == "watch_tbl_qpn"){ return -1; } if(fs->node_type() == "spx_qpn"){ spx_qpn *spx_node = (spx_qpn *)fs; sl_list = spx_node->get_select_se_list(); where = spx_node->get_where_clause(); } else if(fs->node_type() == "sgah_qpn"){ sgah_qpn *sgah_node = (sgah_qpn *)fs; sl_list = sgah_node->get_select_se_list(); where = sgah_node->get_where_clause(); gb_tbl = sgah_node->get_gb_tbl(); } else if(fs->node_type() == "filter_join"){ filter_join_qpn *fj_node = (filter_join_qpn *)fs; sl_list = fj_node->get_select_se_list(); where = fj_node->get_where_clause(); } else if(fs->node_type() == "watch_join"){ watch_join_qpn *fj_node = (watch_join_qpn *)fs; sl_list = fj_node->get_select_se_list(); where = fj_node->get_where_clause(); } else{ fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str()); exit(1); } // Gather all column references, need to define unpacking variables. int w,s; col_id_set cid_set; col_id_set::iterator csi; for(w=0;wpr,cid_set, gb_tbl); for(s=0;ssize();g++) gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl); } // compute snap length int snap_len = -1; int n_snap=0; for(csi=cid_set.begin(); csi!=cid_set.end();++csi){ int schref = (*csi).schema_ref; int tblref = (*csi).tblvar_ref; string field = (*csi).field; if(snap_type == "index"){ int pos = schema->get_field_idx(schref, field); if(pos>snap_len) snap_len = pos; n_snap++; }else{ param_list *field_params = schema->get_modifier_list(schref, field); if(field_params->contains_key("snap_len")){ string fld_snap_str = field_params->val_of("snap_len"); int fld_snap; if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){ if(fld_snap > snap_len) snap_len = fld_snap; n_snap++; }else{ fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() ); } } } } if(n_snap == cid_set.size()){ return (snap_len); }else{ return -1; } } // Function which computes an optimal // set of unpacking functions. void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map &ucol_fcn_map){ map pfcn_count; map::iterator msii; col_id_set::iterator cisi; set::iterator ssi; string best_fcn; while(ucol_fcn_map.size() < upref_cids.size()){ // Gather unpack functions referenced by unaccounted-for // columns, and increment their reference count. pfcn_count.clear(); for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){ if(ucol_fcn_map.count((*cisi)) == 0){ set ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns(); for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi) pfcn_count[(*ssi)]++; } } // Get the lowest cost per field function. float min_cost = 0.0; string best_fcn = ""; for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){ int fcost = Schema->get_ufcn_cost((*msii).first); if(fcost < 0){ fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str()); exit(1); } float this_cost = (1.0*fcost)/(*msii).second; if(msii == pfcn_count.begin() || this_cost < min_cost){ min_cost = this_cost; best_fcn = (*msii).first; } } if(best_fcn == ""){ fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n"); exit(1); } // Assign this function to the unassigned fcns which use it. for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){ if(ucol_fcn_map.count((*cisi)) == 0){ set ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns(); if(ufcns.count(best_fcn)>0) ucol_fcn_map[(*cisi)] = best_fcn; } } } } // Generate an initial test test for the lfta // Assume that the predicate references no external functions, // and especially no partial functions, // aggregates, internal functions. string generate_lfta_prefilter(vector &pred_list, col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns, vector &lfta_cols, vector &lfta_sigs, vector &lfta_snap_lens, string iface){ col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids; col_id_set::iterator csi; int o,p,q; string ret; // Gather complex literals in the prefilter. cplx_lit_table *complex_literals = new cplx_lit_table(); for(p=0;ppr,Ext_fcns, complex_literals); } // Find the combinable predicates vector pr_list; for(p=0;ppr,&pr_list, Schema, Ext_fcns); } // Analyze the combinable predicates to find the predicate classes. pred_class.clear(); // idx to equiv pred in equiv_list pred_pos.clear(); // idx to returned bitmask. vector equiv_list; vector num_equiv; for(p=0;psize() == 0) ret += "\tint no_variable;\n"; int cl; for(cl=0;clsize();cl++){ literal_t *l = complex_literals->get_literal(cl); data_type *dtl = new data_type( l->get_type() ); sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl); ret += tmpstr; } ret += "} prefilter_complex_lits_"+iface+";\n\n"; // Generate the prefilter initialziation code ret += "void init_lfta_prefilter_"+iface+"(){\n"; // First initialize complex literals, if any. ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n"; for(cl=0;clsize();cl++){ literal_t *l = complex_literals->get_literal(cl); sprintf(tmpstr,"&(t->complex_literal_%d)",cl); ret += "\t" + l->to_C_code(tmpstr) + ";\n"; } set epred_seen; for(p=0;p0){ ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"("; vector cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id()); vector op_list = pr_list[p]->get_op_list(); for(o=0;oget_op()+"("; vector cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id()); vector op_list = pr_list[p]->get_op_list(); for(o=0;opr,tmp_cid_set, gb_tbl); } // make the col_ids refer to the base tables, and // grab the col_ids with at least one unpacking function. for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){ string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field); col_id tmp_col_id; tmp_col_id.field = (*csi).field; tmp_col_id.tblvar_ref = (*csi).tblvar_ref; tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl); cid_set.insert(tmp_col_id); field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field); if(fe->get_unpack_fcns().size()>0) upref_cids.insert(tmp_col_id); } // Find the set of unpacking programs needed for the // prefilter fields. map ucol_fcn_map; find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map); set pref_ufcns; map::iterator mcis; for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){ pref_ufcns.insert((*mcis).second); } // Variables for unpacking attributes. body += "/*\t\tVariables for unpacking attributes\t*/\n"; for(csi=cid_set.begin(); csi!=cid_set.end();++csi){ int schref = (*csi).schema_ref; int tblref = (*csi).tblvar_ref; string field = (*csi).field; data_type dt(Schema->get_type_name(schref,field)); sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(), field.c_str(), tblref); body += tmpstr; sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref); body += tmpstr; } // Variables for unpacking temporal attributes. body += "/*\t\tVariables for unpacking temporal attributes\t*/\n"; for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){ if (cid_set.count(*csi) == 0) { int schref = (*csi).schema_ref; int tblref = (*csi).tblvar_ref; string field = (*csi).field; data_type dt(Schema->get_type_name(schref,field)); sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(), field.c_str(), tblref); body += tmpstr; } } body += "\n\n"; // Variables for combinable predicate evaluation body += "/*\t\tVariables for common prdicate evaluation\t*/\n"; for(q=0;q 0) body += "\n/*\t\tcall field unpacking functions\t*/\n"; set::iterator ssi; for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){ body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n"; } // Unpack the accessed attributes body += "\n/*\t\tUnpack the accessed attributes.\t*/\n"; for(csi=cid_set.begin();csi!=cid_set.end();++csi){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; sprintf(tmpstr,"\tret_%s_%d = (%s(p, &unpack_var_%s_%d) == 0);\n", field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref); body += tmpstr; } // next unpack the temporal attributes and ignore the errors // We are assuming here that failed unpack of temporal attributes // is not going to overwrite the last stored value // Failed upacks are ignored for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){ int tblref = (*csi).tblvar_ref; int schref = (*csi).schema_ref; string field = (*csi).field; sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n", Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref); body += tmpstr; } // Evaluate the combinable predicates if(equiv_list.size()>0) body += "/*\t\tEvaluate the combinable predicates.\t*/\n"; for(q=0;q0){ body += "\tif("; for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){ if(cpi != pred_cids.begin()) body += " && "; string field = (*cpi).field; int tblref = (*cpi).tblvar_ref; body += "ret_"+field+"_"+int_to_string(tblref); } body+=")\n"; } body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface; vector op_list = equiv_list[q]->get_op_list(); vector cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id()); for(o=0;opr,pred_cids, gb_tbl); if(pred_cids.size()>0){ body += "\tif("; for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){ if(cpi != pred_cids.begin()) body += " && "; string field = (*cpi).field; int tblref = (*cpi).tblvar_ref; body += "ret_"+field+"_"+int_to_string(tblref); } body+=")\n"; } body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n"; body+="\tbitpos = bitpos << 1;\n"; } // --------------------------------------------------------------- // Finished with the body of the prefilter // -------------------------------------------------------------- ret += body; // Collect fields referenced by an lfta but not // already unpacked for the prefilter. //printf("upref_cids is:\n"); //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++) //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref); //printf("pref_ufcns is:\n"); //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi) //printf("\t%s\n",(*ssi).c_str()); int l; for(l=0;lget_basetbl_name((*csi).schema_ref,(*csi).field); col_id tmp_col_id; tmp_col_id.field = (*csi).field; tmp_col_id.tblvar_ref = (*csi).tblvar_ref; tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl); field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field); set fld_ufcns = fe->get_unpack_fcns(); //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(), upref_cids.count(tmp_col_id)); if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){ // Ensure that this field not already unpacked. bool found = false; for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){ //printf("\tField has unpacking fcn %s\n",(*ssi).c_str()); if(pref_ufcns.count((*ssi))){ //printf("Field already unpacked.\n"); found = true;; } } if(! found){ //printf("\tadding to unpack list\n"); upall_cids.insert(tmp_col_id); } } } } //printf("upall_cids is:\n"); //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++) //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref); // Get the set of unpacking programs for these. map uall_fcn_map; find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map); set pall_ufcns; for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){ //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str()); pall_ufcns.insert((*mcis).second); } // Iterate through the remaining set of unpacking function if(pall_ufcns.size() > 0) ret += "//\t\tcall all remaining field unpacking functions.\n"; for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){ // gather the set of columns unpacked by this ufcn col_id_set fcol_set; for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){ if(uall_fcn_map[(*csi)] == (*ssi)) fcol_set.insert((*csi)); } // gather the set of lftas which access a field unpacked by the fcn set clfta; for(l=0;l 0) break; } if(csi != fcol_set.end()) clfta.insert(lfta_sigs[l]); } // generate the unpacking code ret += "\tif("; set::iterator sii; for(sii=clfta.begin();sii!=clfta.end();++sii){ if(sii!=clfta.begin()) ret += " || "; sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii)); ret += tmpstr; } ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n"; } ret += "\treturn(retval);\n\n"; ret += "}\n\n"; // -------------------------------------------------------- // reuse prefilter body for snaplen calculator // // This is dummy code, so I'm commenting it out. /* ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n"; ret += body; int i; vector s_snaps = lfta_snap_lens; sort(s_snaps.begin(), s_snaps.end()); if(s_snaps[0] == -1){ set sigset; for(i=0;i::iterator sulli; for(sulli=sigset.begin();sulli!=sigset.end();++sulli){ if(sulli!=sigset.begin()) ret += " || "; sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli)); ret += tmpstr; } ret += ") return -1;\n"; } int nextpos = lfta_snap_lens.size()-1; int nextval = lfta_snap_lens[nextpos]; while(nextval >= 0){ set sigset; for(i=0;i::iterator sulli; for(sulli=sigset.begin();sulli!=sigset.end();++sulli){ if(sulli!=sigset.begin()) ret += " || "; sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli)); ret += tmpstr; } ret += ") return "+int_to_string(nextval)+";\n"; for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--); if(nextpos>0) nextval = lfta_snap_lens[nextpos]; else nextval = -1; } ret += "\treturn 0;\n"; ret += "}\n\n"; */ return(ret); } // Generate the struct which will store the the values of // temporal attributesunpacked by prefilter string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) { col_id_set::iterator csi; // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size()); string ret="struct prefilter_unpacked_temp_vars {\n"; ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n"; string init_code; for(csi=cid_set.begin(); csi!=cid_set.end();++csi){ int schref = (*csi).schema_ref; int tblref = (*csi).tblvar_ref; string field = (*csi).field; data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field)); sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(), field.c_str(), tblref); ret += tmpstr; if (init_code != "") init_code += ", "; if (dt.is_increasing()) init_code += dt.get_min_literal(); else init_code += dt.get_max_literal(); } ret += "};\n\n"; ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n"; return(ret); }