return(ret);
}
+// Name of the generated C struct for one watchlist hash-chain element.
+// Empty normalized node names fall back to "default".
+string generate_watchlist_element_name(string node_name){
+	string base = normalize_name(node_name);
+	if(base.empty())
+		base = "default";
+	return base + "__wl_elem";
+}
+
+// Name of the generated C control-struct type for a watchlist.
+// Empty normalized node names fall back to "default".
+string generate_watchlist_struct_name(string node_name){
+	string base = normalize_name(node_name);
+	if(base.empty())
+		base = "default";
+	return base + "__wl_struct";
+}
+
+// Name of the generated C global instance holding a node's watchlist.
+// Empty normalized node names fall back to "default".
+string generate_watchlist_name(string node_name){
+	string base = normalize_name(node_name);
+	if(base.empty())
+		base = "default";
+	return base + "__wl";
+}
+
string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
string ret;
if(! packed_return){
int k;
for(k=0;k<fs->hash_eq.size();++k){
sprintf(tmpstr,"key_var%d",k);
- ret += "\t"+fs->hash_eq[k]->pr->get_left_se()->get_data_type()->make_cvar(tmpstr)+";\n";
+ ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
}
ret += "\tlong long int ts;\n";
ret += "};\n\n";
return(ret);
}
+// Emit the per-watchlist C declarations for query node "node_name":
+//   - the element struct: one hash-chain entry holding every field of
+//     the watchlist table "tbl", the precomputed 64-bit hash value, and
+//     the chain pointer; and
+//   - the control struct plus its statically-initialized global
+//     instance: hash-table pointer/size, element count, the refresh
+//     interval (seconds, from "refresh_interval"), refresh/mtime
+//     bookkeeping, and the source file name.
+// "filename" is baked verbatim into the generated __fstr global.
+// NOTE(review): the instance's .filename member is initialized to NULL
+// here while the path goes into the separate __fstr global — presumably
+// .filename is assigned from __fstr during FTA setup; confirm, since
+// reload_watchlist__* stat()s/fopen()s .filename directly.
+string generate_watchlist_structs(string node_name, table_def *tbl,
+		std::string filename, int refresh_interval){
+	string ret;
+
+// Element struct: one member per schema field, then hashval and next.
+	ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
+	vector<field_entry *> fields = tbl->get_fields();
+	for(int f=0;f<fields.size();++f){
+		data_type dt(fields[f]->get_type());
+		ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
+	}
+	ret += "\tgs_uint64_t hashval;\n";
+	ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
+	ret += "};\n\n";
+
+// Control struct + global instance ({ht, ht_size, n_elem,
+// refresh_interval, next_refresh, last_mtime, filename}).
+	ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
+	ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
+	ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
+	ret += "\tgs_uint32_t ht_size;\n";
+	ret += "\tgs_uint32_t n_elem;\n";
+	ret += "\tgs_uint32_t refresh_interval;\n";
+	ret += "\ttime_t next_refresh;\n";
+	ret += "\ttime_t last_mtime;\n";
+	ret += "\tchar *filename;\n";
+	ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
+
+	return ret;
+}
+
+// Emit reload_watchlist__<node_name>(): a C function that (re)loads the
+// watchlist hash table for this node from its CSV source file.
+// Generated logic:
+//   1. stat() the file; return early on stat failure or if the file is
+//      unchanged since the last load (mtime/ctime vs. last_mtime).
+//   2. Free all existing elements, destroying buffer-typed fields.
+//   3. (Re)allocate and clear the bucket array if the table grew.
+//   4. Parse the file line by line, splitting on commas into one slot
+//      per schema field; well-formed records are hashed over the "keys"
+//      fields and inserted at the head of their bucket chain.
+// Malformed, short, and over-long lines are counted and logged.
+string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
+	string ret;
+	string tgt = generate_watchlist_name(node_name);
+	vector<field_entry *> fields = tbl->get_fields();
+
+	ret += "void reload_watchlist__"+node_name+"(){\n";
+	ret += "\tgs_uint32_t i;\n";
+	ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
+	ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
+	ret += "\tFILE *fl;\n";
+	ret += "\tchar buf[10000];\n";
+	ret += "\tgs_uint32_t buflen = 10000;\n";
+	ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
+	ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
+	ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
+	ret += "\tgs_uint64_t hash, bucket;\n";
+	ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
+
+	ret += "// make sure watchlist file has changed since the last time we loaded it\n";
+	ret += "\tstruct stat file_stat;\n";
+	ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
+	ret += "\tif (err) {\n";
+	ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
+	ret += "\t\treturn;\n";
+	ret += "\t}\n";
+	ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime) // watchlist file hasn't changed since last time\n";
+	ret += "\t\treturn;\n";
+	ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
+
+	ret += "// Delete old entries.\n";
+	ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
+	ret += "\t\tptr="+tgt+".ht[i];\n";
+	ret += "\t\twhile(ptr!=NULL){\n";
+	for(int f=0;f<fields.size();++f){
+		data_type dt(fields[f]->get_type());
+		if(dt.is_buffer_type()){
+			ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
+		}
+	}
+	ret += "\t\t\tnext = ptr->next;\n";
+	ret += "\t\t\tfree(ptr);\n";
+	ret += "\t\t\tptr = next;\n";
+	ret += "\t\t}\n";
+	ret += "\t}\n";
+	ret += "\n// prepare new table. \n";
+	ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
+	ret += "\t\tif("+tgt+".ht)\n";
+	ret += "\t\t\tfree("+tgt+".ht);\n";
+	ret += "\t\tif("+tgt+".ht_size == 0)\n";
+	ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
+	ret += "\t\telse\n";
+	ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
+	ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
+	ret += "\t}\n";
+	ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
+	ret += "\t\t"+tgt+".ht[i] = NULL;\n";
+	ret += "\t}\n";
+	ret += "\n// load new table\n";
+	ret += "\t"+tgt+".n_elem = 0;\n";
+	ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
+	ret += "\tif(fl==NULL){\n";
+	ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
+	ret += "\t\treturn;\n";
+	ret += "\t}\n";
+// BUG FIX: n_malformed was never initialized (the old code reset the
+// per-line "malformed" flag here instead of the counter).
+	ret += "\tn_malformed = 0;\n";
+	ret += "\tshort_lines = 0;\n";
+	ret += "\ttoolong_lines = 0;\n";
+	ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
+	ret += "\t\tlinelen = strlen(buf);\n";
+	ret += "\t\tbuf[linelen-1]='\\0'; // strip off trailing newline\n";
+	ret += "\t\tlinelen--;\n";
+
+	ret += "\t\tpos=0;\n";
+	ret += "\t\tmalformed=0;\n";
+	ret += "\t\tok=1;\n";
+	ret += "\t\tflds[0] = buf;\n";
+	ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
+	ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
+	ret += "\t\t\tif(pos >= linelen){\n";
+	ret += "\t\t\t\tmalformed = 1;\n";
+	ret += "\t\t\t\tbreak;\n";
+	ret += "\t\t\t}\n";
+	ret += "\t\t\tbuf[pos]='\\0';\n";
+	ret += "\t\t\tpos++;\n";
+	ret += "\t\t\tflds[f]=buf+pos;\n";
+	ret += "\t\t}\n";
+	ret += "\t\tif(malformed){\n";
+	ret += "\t\t\tok=0;\n";
+	ret += "\t\t\tn_malformed++;\n";
+	ret += "\t\t}\n";
+	ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
+	ret += "\t\t\tok=0;\n";
+	ret += "\t\t\tshort_lines++;\n";
+	ret += "\t\t}\n";
+	ret += "\t\tif(pos && (pos<linelen)){\n";
+	ret += "\t\t\tok=0;\n";
+	ret += "\t\t\ttoolong_lines++;\n";
+	ret += "\t\t}\n";
+// BUG FIX: gate insertion on ok as well, so over-long (ok==0) lines are
+// counted as errors but not inserted into the table.
+	ret += "\t\tif(ok && f>="+std::to_string(fields.size())+"){\n";
+	ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
+// Extract fields
+	for(int f=0;f<fields.size();++f){
+		data_type dt(fields[f]->get_type());
+		ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
+	}
+// Compute the hash value over the key fields.
+	ret += "\t\t\thash=0;\n";
+	for(int k=0;k<keys.size();++k){
+		string key_fld = keys[k];
+		int f;
+		for(f=0;f<fields.size();++f){
+			if(fields[f]->get_name() == key_fld)
+				break;
+		}
+		if(f>=fields.size())	// key field absent from schema: skip rather than index fields[] out of range
+			continue;
+		data_type dt(fields[f]->get_type());
+
+		ret +=
+"\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
+		dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
+	}
+	ret += "\t\t\tbucket = (hash>>32) % "+tgt+".ht_size;\n";
+	ret += "\t\t\trec->hashval = hash;\n";
+	ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
+	ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
+	ret += "\t\t\t"+tgt+".n_elem++;\n";
+
+	ret += "\t\t}\n";
+	ret += "\t}\n";
+// BUG FIX: close the file (was leaked on every reload).
+	ret += "\tfclose(fl);\n";
+// BUG FIX: report n_malformed (not the per-line flag) and include
+// short_lines in the trigger so truncated lines are reported too.
+	ret += "\tif(n_malformed+short_lines+toolong_lines > 0){\n";
+	ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, n_malformed, short_lines, toolong_lines);\n";
+	ret += "\t}\n";
+	ret += "}\n\n";
+
+	return ret;
+}
+
string generate_fta_struct(string node_name, gb_table *gb_tbl,
aggregate_table *aggr_tbl, param_table *param_tbl,
cplx_lit_table *complex_literals,
vector<handle_param_tbl_entry *> ¶m_handle_table,
- bool is_aggr_query, bool is_fj, bool uses_bloom,
+ bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
table_list *schema){
string ret = "struct " + generate_fta_name(node_name) + "{\n";
}
+// --------------------------------------------
+// watchlist-join specific
+ if(is_wj){
+ ret += "\ttime_t ux_time;\n";
+ }
+
//--------------------------------------------------------
// Common fields
ret += "( ";
if(ldt->complex_comparison(ldt) ){
- ret += ldt->get_comparison_fcn(ldt) ;
+ ret += ldt->get_equals_fcn(ldt) ;
ret += "( ";
if(ldt->is_buffer_type() ) ret += "&";
ret += generate_se_code(pr->get_left_se(), schema);
ret += "( ";
if(ldt->complex_comparison(rdt) ){
+// TODO can use get_equals_fcn if op is "=" ?
ret += ldt->get_comparison_fcn(rdt);
ret += "(";
if(ldt->is_buffer_type() ) ret += "&";
string ret;
if(dt->complex_comparison(dt) ){
- ret += dt->get_comparison_fcn(dt);
+ ret += dt->get_equals_fcn(dt);
ret += "(";
if(dt->is_buffer_type() ) ret += "&";
ret += lhs_op;
return(ret);
}
-static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
- string ret;
-
- if(dt->complex_comparison(dt) ){
- ret += dt->get_comparison_fcn(dt);
- ret += "(";
- if(dt->is_buffer_type() ) ret += "&";
- ret += lhs_op;
- ret += ", ";
- if(dt->is_buffer_type() ) ret += "&";
- ret += rhs_op;
- ret += ") == 0";
- }else{
- ret += lhs_op;
- ret += " == ";
- ret += rhs_op;
- }
-
- return(ret);
-}
+//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+// string ret;
+//
+ // if(dt->complex_comparison(dt) ){
+// ret += dt->get_equals_fcn(dt);
+// ret += "(";
+// if(dt->is_buffer_type() ) ret += "&";
+// ret += lhs_op;
+// ret += ", ";
+// if(dt->is_buffer_type() ) ret += "&";
+// ret += rhs_op;
+// ret += ") == 0";
+// }else{
+// ret += lhs_op;
+// ret += " == ";
+// ret += rhs_op;
+// }
+//
+// return(ret);
+//}
// Here I assume that only MIN and MAX aggregates can be computed
// over BUFFER data types.
// ret += "#include \"fta.h\"\n\n");
string ret = "#ifndef LFTA_IN_NIC\n";
- ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
+ ret += "const char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
ret += "#include<stdio.h>\n";
ret += "#include <limits.h>\n";
ret += "#include <float.h>\n";
+ ret += "#include <sys/stat.h>\n";
ret += "#include \"rdtsc.h\"\n";
ret += "#endif\n";
ret += ";\n";
- ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
ret += "\t\t\tif( tuple != NULL){\n";
ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
ret+="\t\tif (!t->n_aggrs) {\n";
- ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
+ ret+="\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, 0);\n";
ret+="\t\t\tif( tuple != NULL)\n";
ret+="\t\t\t\tpost_tuple(tuple);\n";
}
ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
- ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\t\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
/* mark tuple as EOF_TUPLE */
return(ret);
}
-string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){
+string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
ret += generate_fta_name(node_name)+" *) f;\n\n";
ret += "\t#endif\n";
// we need to pay special attention to time fields
- if (field == "time" || field == "timestamp"){
- ret += "\tcur_time = time(&cur_time);\n";
+ if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
+ ret += "\tcur_time = time(&cur_time);\n";
- if (field == "time") {
- sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
+ if (field == "time") {
+ sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
tblref, time_corr);
- ret += tmpstr;
- sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
- field.c_str(), tblref, field.c_str(), tblref, time_corr);
- } else {
- sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
+ ret += tmpstr;
+ sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
+ field.c_str(), tblref, field.c_str(), tblref, time_corr);
+ } else if (field == "timestamp_ms") {
+ sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
+ tblref, time_corr);
+ ret += tmpstr;
+ sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
+ field.c_str(), tblref, field.c_str(), tblref, time_corr);
+ }else{
+ sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
field.c_str(), tblref, time_corr);
- ret += tmpstr;
- sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
+ ret += tmpstr;
+ sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
field.c_str(), tblref, field.c_str(), tblref, time_corr);
- }
- ret += tmpstr;
+ }
+ ret += tmpstr;
- ret += "\t\ttime_advanced = 1;\n";
- ret += "\t}\n";
+ ret += "\t\ttime_advanced = 1;\n";
+ ret += "\t}\n";
- sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
- field.c_str(), tblref, field.c_str(), tblref);
- ret += tmpstr;
+ sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
+ field.c_str(), tblref, field.c_str(), tblref);
+ ret += tmpstr;
} else {
sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
field.c_str(), tblref, field.c_str(), tblref);
}
}
+
+ if(advance_uxtime){
+ ret += "\tt->ux_time = time(&(t->ux_time));\n";
+ }
+
// for aggregation lftas we need to check if the time was advanced beyond the current epoch
if (is_aggr_query) {
}
ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
- ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+"*)allocate_tuple(f, tuple_size );\n";
ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
ret+="\tpost_tuple(tuple);\n";
ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
- ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
+ ret += "\n\t/* Disable heartbeats for now to avoid overloading clearinghouse */\n";
+ ret += "\t/* fta_heartbeat(f->ftaid, t->trace_id++, 1, &stats); */\n";
ret += "\n\t/* Reset runtime stats */\n";
ret += "\tt->in_tuple_cnt = 0;\n";
ret += ";\n";
- ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
// Test passed, make assignments to the tuple.
return ret;
}
+// TODO Ensure that postfilter predicates are being generated
string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
int p,s,w;
// bloom filter needs to be advanced.
// SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
// t->bf_size : number of bits in bloom filter
+// TODO: vectorize?
+// TODO: Don't iterate more than n_bloom times!
+// As written, it's possible to wrap around many times.
if(fs->use_bloom){
ret +=
"// Clean out old bloom filters if needed.\n"
+"// TODO vectorize this ? \n"
" if(t->first_exec){\n"
" t->first_exec = 0;\n"
" t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
;
}
}else{
+ ret += "// Add the S record to the hash table, choose a position\n";
ret += "\t\tbucket=0;\n";
for(p=0;p<fs->hash_eq.size();++p){
ret +=
ret += generate_equality_test(lhs_op,rhs_op,hdt);
}
ret += "){\n\t\t\tthe_bucket = bucket;\n";
- ret += "\t\t}else {if(";
+ ret += "\t\t}else{\n\t\t\tif(";
for(p=0;p<fs->hash_eq.size();++p){
if(p>0) ret += " && ";
// ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
// " == s_equijoin_"+int_to_string(p);
data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
- string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
+ string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
string rhs_op = "s_equijoin_"+int_to_string(p);
ret += generate_equality_test(lhs_op,rhs_op,hdt);
}
- ret += "){\n\t\t\tthe_bucket = bucket1;\n";
- ret += "\t\t}else{ if(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
- ret+="\t\t\tthe_bucket = bucket;\n\t\t\telse the_bucket=bucket1;\n";
- ret += "\t\t}}\n";
+ ret += "){\n\t\t\t\tthe_bucket = bucket1;\n";
+ ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
+ ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
+ ret += "\t\t\t}\n\t\t}\n";
for(p=0;p<fs->hash_eq.size();++p){
data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
if(hdt->is_buffer_type()){
}
}
-// Sort S preds based on cost.
+// Sort R preds based on cost.
vector<cnf_elem *> tmp_wh;
for(w=0;w<r_filt.size();++w){
// WARNING! the constant 20 below is a wild-ass guess.
int cheap_rpos;
- for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++)
+ for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
// Test the cheap filters on R.
if(cheap_rpos >0){
// Now generate the predicates.
for(w=0;w<cheap_rpos;++w){
- sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
+ sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
ret += tmpstr;
// Find partial fcns ref'd in this cnf element
ret += "\n// Do the join\n\n";
for(p=0;p<fs->hash_eq.size();++p)
- ret += "\t\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
+ ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
// Passed the cheap pred, now test the join with S.
ret +=
" bucket"+int_to_string(i)+
" ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
- fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
+ fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
+"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
}
ret +=
for(p=0;p<fs->hash_eq.size();++p){
ret +=
" bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
- fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
+ fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
+"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
}
ret +=
// ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
// " == r_equijoin_"+int_to_string(p);
data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
- string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
+ string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
string rhs_op = "s_equijoin_"+int_to_string(p);
ret += generate_equality_test(lhs_op,rhs_op,hdt);
}
ret += ";\n";
- ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
+ ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
+
+// Test passed, make assignments to the tuple.
+
+ ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
+
+// Mark tuple as REGULAR_TUPLE
+ ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
+
+
+ for(s=0;s<sl_list.size();s++){
+ data_type *sdt = sl_list[s]->get_data_type();
+ if(sdt->is_buffer_type()){
+ sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
+ ret += tmpstr;
+ sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
+ ret += tmpstr;
+ }else{
+ sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
+ ret += tmpstr;
+// if(sdt->needs_hn_translation())
+// ret += sdt->hton_translation() +"( ";
+ ret += generate_se_code(sl_list[s],schema);
+// if(sdt->needs_hn_translation())
+// ret += ") ";
+ ret += ";\n";
+ }
+ }
+
+// Generate output.
+
+ ret += "\tpost_tuple(tuple);\n";
+
+// Increment the counter of posted tuples
+ ret += "\n\t#ifdef LFTA_STATS\n";
+ ret += "\n\tt->out_tuple_cnt++;\n\n";
+ ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
+ ret += "\t#endif\n\n";
+
+
+ return ret;
+}
+
+
+string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
+
+int p,s,w;
+string ret;
+
+
+ string wl_schema = fs->from[1]->get_schema_name();
+ string wl_elem_str = generate_watchlist_element_name(wl_schema);
+ string wl_node_str = generate_watchlist_struct_name(wl_schema);
+ string tgt = generate_watchlist_name(wl_schema);
+
+ ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
+
+
+
+
+
+// ------------------------------------------------------------
+// Determine if the R record should be processed.
+
+
+ ret +=
+"// R (main stream) cheap predicate\n"
+"\n"
+;
+
+// Unpack r_filt fields
+ vector<cnf_elem *> r_filt = fs->pred_t0;
+ for(w=0;w<r_filt.size();++w){
+ col_id_set this_pred_cids;
+ gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
+ for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+ if(unpacked_cids.count( (*csi) ) == 0){
+ int tblref = (*csi).tblvar_ref;
+ int schref = (*csi).schema_ref;
+ string field = (*csi).field;
+ ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+ unpacked_cids.insert( (*csi) );
+ }
+ }
+ }
+
+// Sort R preds based on cost.
+
+ vector<cnf_elem *> tmp_wh;
+ for(w=0;w<r_filt.size();++w){
+ compute_cnf_cost(r_filt[w],Ext_fcns);
+ tmp_wh.push_back(r_filt[w]);
+ }
+ r_filt = tmp_wh;
+
+ sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
+
+// WARNING! the constant 20 below is a wild-ass guess.
+ int cheap_rpos;
+ for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
+
+// Test the cheap filters on R.
+ if(cheap_rpos >0){
+
+// Now generate the predicates.
+ for(w=0;w<cheap_rpos;++w){
+ sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+ }else{
+ ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
+ }
+
+ ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
+ map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
+ vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
+ for(w=0;w<kflds.size();++w){
+ string kfld = kflds[w];
+ col_id_set this_pred_cids;
+ gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
+ for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+ if(unpacked_cids.count( (*csi) ) == 0){
+ int tblref = (*csi).tblvar_ref;
+ int schref = (*csi).schema_ref;
+ string field = (*csi).field;
+ if(tblref==0) // LHS from packet, don't unpack the RHS
+ ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+ unpacked_cids.insert( (*csi) );
+ }
+ }
+ }
+
+
+ ret += "\n// Do the join\n\n";
+ ret += "\n// (ensure that the watchtable is fresh)\n";
+ ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
+ ret += "\t\treload_watchlist__"+wl_schema+"();\n";
+ ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
+ ret += "\t}\n\n";
+
+
+ for(p=0;p<fs->key_flds.size();++p){
+ string kfld = fs->key_flds[p];
+ ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
+ }
+
+
+// Passed the cheap pred, now test the join with S.
+ ret += "\tbucket=0;\n";
+ ret += "\thash=0;\n";
+ for(p=0;p<fs->key_flds.size();++p){
+ string kfld = fs->key_flds[p];
+ ret +=
+" hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
+ fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
+ +"_to_hash(r_equijoin_"+kfld+")));\n";
+ }
+ ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
+
+ ret += "\t\trec = "+tgt+".ht[bucket];\n";
+ ret += "\t\twhile(rec!=NULL){\n";
+ ret += "\t\t\tif(hash==rec->hashval){\n";
+ ret += "\t\t\t\tif(";
+ for(p=0;p<fs->key_flds.size();++p){
+ string kfld = fs->key_flds[p];
+ if(p>0) ret += " && ";
+ data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
+ string lhs_op = "r_equijoin_"+kfld;
+ string rhs_op = "rec->"+kfld;
+ ret += generate_equality_test(lhs_op,rhs_op,hdt);
+ }
+ ret += ")\n";
+ ret += "\t\t\t\t\tbreak;\n";
+ ret += "\t\t\t}\n";
+ ret += "\t\t\trec=rec->next;\n";
+ ret += "\t\t}\n";
+ ret += "\t\tif(rec==NULL)\n";
+ ret += "\t\t\tgoto end;\n";
+
+ ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
+ for(w=0;w<where.size();++w){
+ col_id_set this_pred_cids;
+ gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
+ for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+ if(unpacked_cids.count( (*csi) ) == 0){
+ int tblref = (*csi).tblvar_ref;
+ int schref = (*csi).schema_ref;
+ string field = (*csi).field;
+ if(tblref==0) // LHS from packet
+ ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+ else // RHS from hash bucket
+ ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
+ unpacked_cids.insert( (*csi) );
+ }
+ }
+ }
+
+
+// Test the expensive filters on R.
+// TODO Should merge this with other predicates and eval in order
+// of cost - see the fj code.
+// TODO join and postfilter predicates haven't been costed yet.
+ if(cheap_rpos < r_filt.size()){
+
+// Now generate the predicates.
+ for(w=cheap_rpos;w<r_filt.size();++w){
+ sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+ }else{
+ ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
+ }
+
+// TODO sort the additional predicates by cost
+
+// S-only
+ for(w=0;w<fs->pred_t1.size();++w){
+ sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+
+// non hash-eq join
+ for(w=0;w<fs->join_filter.size();++w){
+ sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+
+// postfilter
+ for(w=0;w<fs->postfilter.size();++w){
+ sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+
+
+
+/////////////// post the tuple
+
+// test passed : create the tuple, then assign to it.
+ ret += "/*\t\tCreate and post the tuple\t*/\n";
+
+// Unpack R fields
+ for(s=0;s<sl_list.size();++s){
+ col_id_set this_se_cids;
+ gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
+ for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
+ if(unpacked_cids.count( (*csi) ) == 0){
+ int tblref = (*csi).tblvar_ref;
+ int schref = (*csi).schema_ref;
+ string field = (*csi).field;
+ if(tblref==0) // LHS from packet
+ ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+ else // RHS from hash bucket
+ ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
+ unpacked_cids.insert( (*csi) );
+ }
+ }
+ }
+
+
+// Unpack partial fcns ref'd by the select clause.
+// Its a kind of a WHERE clause ...
+ for(p=sl_fcns_start;p<sl_fcns_end;p++){
+ if(fcn_ref_cnt[p] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
+ }
+ if(is_partial_fcn[p]){
+ ret += unpack_partial_fcn(partial_fcns[p], p, schema);
+ ret += "\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[p] > 1){
+ if(!is_partial_fcn[p]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ // increment the counter of accepted tuples
+ ret += "\n\t#ifdef LFTA_STATS\n";
+ ret += "\n\tt->accepted_tuple_cnt++;\n\n";
+ ret += "\t#endif\n\n";
+
+// First, compute the size of the tuple.
+
+// Unpack any BUFFER type selections into temporaries
+// so that I can compute their size and not have
+// to recompute their value during tuple packing.
+// I can use regular assignment here because
+// these temporaries are non-persistent.
+
+ for(s=0;s<sl_list.size();s++){
+ data_type *sdt = sl_list[s]->get_data_type();
+ if(sdt->is_buffer_type()){
+ sprintf(tmpstr,"\tselvar_%d = ",s);
+ ret += tmpstr;
+ ret += generate_se_code(sl_list[s],schema);
+ ret += ";\n";
+ }
+ }
+
+
+// The size of the tuple is the size of the tuple struct plus the
+// size of the buffers to be copied in.
+
+ ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
+ for(s=0;s<sl_list.size();s++){
+ data_type *sdt = sl_list[s]->get_data_type();
+ if(sdt->is_buffer_type()){
+ sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
+ ret += tmpstr;
+ }
+ }
+ ret += ";\n";
+
+
+ ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
// Test passed, make assignments to the tuple.
-string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set<unsigned int> &s_pids){
+string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
string ret="static gs_retval_t accept_packet_"+node_name+
"(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
}
for(w=0;w<where.size();++w){
- if(is_fj || s_pids.count(w) == 0)
+ if(is_wj || is_fj || s_pids.count(w) == 0)
gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
- }
+ }
for(s=0;s<sl_list.size();s++){
gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
}
bool uses_bloom = fjq->use_bloom;
ret += "/*\t\tJoin fields\t*/\n";
for(g=0;g<fjq->hash_eq.size();g++){
- sprintf(tmpstr,"\t%s s_equijoin_%d, r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g,g);
+ sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
+ ret += tmpstr;
+ sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
ret += tmpstr;
}
if(uses_bloom){
"\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
"\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
"\tlong long int curr_fj_ts;\n"
-"\tunsigned int curr_bin, the_bin;\n"
+"\tlong long int curr_bin, the_bin;\n"
"\n"
;
}else{
}
+ if(fs->node_type() == "watch_join"){
+ watch_join_qpn *wlq = (watch_join_qpn *)fs;
+ ret += "/*\t\tJoin fields\t*/\n";
+ for(int k=0;k<wlq->key_flds.size(); ++k){
+ string kfld = wlq->key_flds[k];
+ ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
+ ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
+ }
+ ret +=
+" /* Variables for wl join table */ \n"
+"\tunsigned int i, bucket;\n"
+"\tunsigned long long int hash; \n";
+ string wl_schema = wlq->from[1]->get_schema_name();
+ string wl_elem_str = generate_watchlist_element_name(wl_schema);
+ ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n"
+"\n"
+;
+ }
+
+
// Variables needed to store selected attributes of BUFFER type
// temporarily, in order to compute their size for storage
// in an output tuple.
string select_var_defs = "";
- for(s=0;s<sl_list.size();s++){
+ for(int s=0;s<sl_list.size();s++){
data_type *sdt = sl_list[s]->get_data_type();
if(sdt->is_buffer_type()){
sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
vector<cnf_elem *> filter = fs->get_filter_clause();
// Test the filter predicate (some query types have additional preds).
- if(filter.size() > 0){
+ if(filter.size() > 0 && !is_wj){ // watchlist join does specialized processing
// Sort by evaluation cost.
// First, estimate evaluation costs
if(is_fj){
ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
- for(w=0;w<h_eq.size();++w){
+ for(w=0;w<h_eq.size();++w){
col_id_set this_pred_cids;
gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
}
}
}
+ }else if(is_wj){ // TODO (stopped here): move this to wj main body
+/*
+ ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
+ map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
+ vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
+ for(w=0;w<kflds.size();++w){
+ string kfld = kflds[w];
+ col_id_set this_pred_cids;
+ gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
+ for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+ if(unpacked_cids.count( (*csi) ) == 0){
+ int tblref = (*csi).tblvar_ref;
+ int schref = (*csi).schema_ref;
+ string field = (*csi).field;
+ ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+ unpacked_cids.insert( (*csi) );
+ }
+ }
+ }
+*/
}else{
ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
////////////////// After this, the query types
////////////////// are processed differently.
- if(!is_aggr_query && !is_fj)
+ if(!is_aggr_query && !is_fj && !is_wj)
ret += generate_sel_accept_body(fs, node_name, schema);
else if(is_aggr_query)
ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
- else
- ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
+ else{
+ if(is_fj)
+ ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
+ else
+ ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
+ }
+
// Finish up.
ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
ret+="\tint i;\n";
ret += "\n";
- ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
+ ret+="\tif((f=(struct "+generate_fta_name(node_name)+" *)fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
// assign a streamid to fta instance
ret+="\t/* assign a streamid */\n";
}
ret += ";\n";
- ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
+ ret+="\tif ((f->aggr_table = (struct "+generate_aggr_struct_name(node_name)+" *)sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
ret+="\t\treturn(0);\n";
ret+="\t}\n\n";
// ret+="/* compute how many integers we need to store the hashmap */\n";
// ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
- ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
+ ret+="\tif ((f->aggr_table_hashmap = (gs_uint32_t *)sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
ret+="\t\treturn(0);\n";
ret+="\t}\n";
ret+="/*\t\tfill bitmap with zero \t*/\n";
int bf_byte_size = bf_bit_size / (8*sizeof(char));
int bf_tot = n_bloom*bf_byte_size;
- ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
+ ret+="\tif ((f->bf_table = (unsigned char *)sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
ret+="\t\treturn(0);\n";
ret+="\t}\n";
ret +=
}
ht_size = hs;
}
- ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
+ ret+="\tif ((f->join_table = (struct "+generate_fj_struct_name(node_name)+" *) sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
ret+="\t\treturn(0);\n";
ret+="\t}\n\n";
ret +=
param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
string node_name = fs->get_node_name();
bool is_fj = false, uses_bloom = false;
+ bool is_wj = false;
+ bool is_watch_tbl = false;
if(fs->node_type() == "spx_qpn"){
uses_bloom = fj_node->use_bloom;
gb_tbl = NULL;
aggr_tbl = NULL;
+ }else
+ if(fs->node_type() == "watch_join"){
+ is_aggr_query = false;
+ is_wj = true;
+ watch_join_qpn *wl_node = (watch_join_qpn *)fs;
+ sl_list = wl_node->get_select_se_list();
+ where = wl_node->get_where_clause();
+ gb_tbl = NULL;
+ aggr_tbl = NULL;
+ }else
+ if(fs->node_type() == "watch_tbl_qpn"){
+ is_aggr_query = false;
+ is_watch_tbl = true;
+ vector<scalarexp_t *> empty_sl_list;
+ vector<cnf_elem *> empty_where;
+ sl_list = empty_sl_list;
+ where = empty_where;
+ gb_tbl = NULL;
+ aggr_tbl = NULL;
} else {
fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
exit(1);
}
// Build list of "partial functions", by clause.
-// NOTE : partial fcns are not handles well.
+// NOTE : partial fcns are not handled well.
// The act of searching for them associates the fcn call
// in the SE with an index to an array. Refs to the
// fcn value are replaced with refs to the variable they are
ag_fcns_start = gb_fcns_end = partial_fcns.size();
if(aggr_tbl != NULL){
for(i=0;i<aggr_tbl->size();i++){
- find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
+ find_partial_fcns(aggr_tbl->get_aggr_se(i), &partial_fcns, NULL, &is_partial_fcn, Ext_fcns);
}
}
ag_fcns_end = partial_fcns.size();
/////////////////////////////////////////////////////
// Common stuff unpacked, do some generation
+
if(is_aggr_query)
retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
if(is_fj)
retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
-
- retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema);
- retval += generate_tuple_struct(node_name, sl_list) ;
-
- if(is_aggr_query)
- retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
- if(param_tbl->size() > 0)
- retval += generate_fta_load_params(node_name) ;
- retval += generate_fta_free(node_name, is_aggr_query) ;
- retval += generate_fta_control(node_name, schema, is_aggr_query) ;
- retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ;
-
+ if(is_watch_tbl){
+ retval += "\n\n// watchtable code here \n\n";
+ watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
+ retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
+ retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
+ }
+
+ if(! is_watch_tbl){
+ retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
+ retval += generate_tuple_struct(node_name, sl_list) ;
+
+ if(is_aggr_query)
+ retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
+ if(param_tbl->size() > 0)
+ retval += generate_fta_load_params(node_name) ;
+ retval += generate_fta_free(node_name, is_aggr_query) ;
+ retval += generate_fta_control(node_name, schema, is_aggr_query) ;
+ retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;
/* extract the value of Time_Correlation from interface definition */
- int e,v;
- string es;
- unsigned time_corr;
- vector<tablevar_t *> tvec = fs->get_input_tbls();
- vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
- if (time_corr_vec.empty())
- time_corr = DEFAULT_TIME_CORR;
- else
- time_corr = atoi(time_corr_vec[0].c_str());
+ int e,v;
+ string es;
+ unsigned time_corr;
+ vector<tablevar_t *> tvec = fs->get_input_tbls();
+ vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
+ if (time_corr_vec.empty())
+ time_corr = DEFAULT_TIME_CORR;
+ else
+ time_corr = atoi(time_corr_vec[0].c_str());
- retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) );
- retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
+ retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
+ retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
+ }
return(retval);
}
-int compute_snap_len(qp_node *fs, table_list *schema){
+int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
// Initialize global vars
gb_tbl = NULL;
sl_list.clear(); where.clear();
+
+ if(fs->node_type() == "watch_tbl_qpn"){
+ return -1;
+ }
+
if(fs->node_type() == "spx_qpn"){
spx_qpn *spx_node = (spx_qpn *)fs;
sl_list = spx_node->get_select_se_list();
filter_join_qpn *fj_node = (filter_join_qpn *)fs;
sl_list = fj_node->get_select_se_list();
where = fj_node->get_where_clause();
+ }
+ else if(fs->node_type() == "watch_join"){
+ watch_join_qpn *fj_node = (watch_join_qpn *)fs;
+ sl_list = fj_node->get_select_se_list();
+ where = fj_node->get_where_clause();
} else{
fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
exit(1);
int tblref = (*csi).tblvar_ref;
string field = (*csi).field;
- param_list *field_params = schema->get_modifier_list(schref, field);
- if(field_params->contains_key("snap_len")){
- string fld_snap_str = field_params->val_of("snap_len");
- int fld_snap;
- if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
- if(fld_snap > snap_len) snap_len = fld_snap;
- n_snap++;
- }else{
- fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+ if(snap_type == "index"){
+ int pos = schema->get_field_idx(schref, field);
+ if(pos>snap_len) snap_len = pos;
+ n_snap++;
+ }else{
+ param_list *field_params = schema->get_modifier_list(schref, field);
+ if(field_params->contains_key("snap_len")){
+ string fld_snap_str = field_params->val_of("snap_len");
+ int fld_snap;
+ if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
+ if(fld_snap > snap_len) snap_len = fld_snap;
+ n_snap++;
+ }else{
+ fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+ }
}
}
}