X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fftacmp%2Fgenerate_lfta_code.cc;h=5ec41676fa5e8dd2f07c0715dc09f17201b4ebde;hb=9fd1eb03e66522e79c94dec7ed26f68c17018fc1;hp=06bfc9b64e908ef36f6236c217c008f97672d1dc;hpb=c9783d8ea8b85d810483559e50dbf2297109e349;p=com%2Fgs-lite.git diff --git a/src/ftacmp/generate_lfta_code.cc b/src/ftacmp/generate_lfta_code.cc index 06bfc9b..5ec4167 100644 --- a/src/ftacmp/generate_lfta_code.cc +++ b/src/ftacmp/generate_lfta_code.cc @@ -169,6 +169,36 @@ string generate_fj_struct_name(string node_name){ return(ret); } +string generate_watchlist_element_name(string node_name){ + string ret = normalize_name(node_name); + if(ret == ""){ + ret = "default"; + } + ret += "__wl_elem"; + + return(ret); +} + +string generate_watchlist_struct_name(string node_name){ + string ret = normalize_name(node_name); + if(ret == ""){ + ret = "default"; + } + ret += "__wl_struct"; + + return(ret); +} + +string generate_watchlist_name(string node_name){ + string ret = normalize_name(node_name); + if(ret == ""){ + ret = "default"; + } + ret += "__wl"; + + return(ret); +} + string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){ string ret; if(! packed_return){ @@ -237,7 +267,7 @@ string generate_fj_struct(filter_join_qpn *fs, string node_name ){ int k; for(k=0;khash_eq.size();++k){ sprintf(tmpstr,"key_var%d",k); - ret += "\t"+fs->hash_eq[k]->pr->get_left_se()->get_data_type()->make_cvar(tmpstr)+";\n"; + ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n"; } ret += "\tlong long int ts;\n"; ret += "};\n\n"; @@ -246,14 +276,178 @@ string generate_fj_struct(filter_join_qpn *fs, string node_name ){ return(ret); } +string generate_watchlist_structs(string node_name, table_def *tbl, + std::string filename, int refresh_interval){ + string ret; + + ret += "struct "+generate_watchlist_element_name(node_name)+"{\n"; + vector fields = tbl->get_fields(); + for(int f=0;fget_type()); + ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n"; + } + ret += "\tgs_uint64_t hashval;\n"; + ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n"; + ret += "};\n\n"; + + ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n"; + ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n"; + ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n"; + ret += "\tgs_uint32_t ht_size;\n"; + ret += "\tgs_uint32_t n_elem;\n"; + ret += "\tgs_uint32_t refresh_interval;\n"; + ret += "\ttime_t next_refresh;\n"; + ret += "\ttime_t last_mtime;\n"; + ret += "\tchar *filename;\n"; + ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n"; + + return ret; +} + +string generate_watchlist_load(string node_name, table_def *tbl, vector keys){ + string ret; + string tgt = generate_watchlist_name(node_name); + vector fields = tbl->get_fields(); + + ret += "void reload_watchlist__"+node_name+"(){\n"; + ret += "\tgs_uint32_t i;\n"; + ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n"; + ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n"; + ret += "\tFILE *fl;\n"; + ret += "\tchar buf[10000];\n"; + ret += "\tgs_uint32_t buflen = 10000;\n"; + ret += "\tchar *flds["+std::to_string(fields.size())+"];\n"; + ret += "\tgs_uint32_t pos, f, linelen, malformed;\n"; + ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n"; + ret += "\tgs_uint64_t hash, bucket;\n"; + ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n"; + + ret += "// make sure watchlist file has changed since the last time we loaded it\n"; + ret += "\tstruct stat file_stat;\n"; + ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n"; + ret += "\tif (err) {\n"; + ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n"; + ret += "\t\treturn;\n"; + ret += "\t}\n"; + ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime) // watchlist file hasn't changed since last time\n"; + ret += "\t\treturn;\n"; + ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n"; + + ret += "// Delete old entries.\n"; + ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n"; + ret += "\t\tptr="+tgt+".ht[i];\n"; + ret += "\t\twhile(ptr!=NULL){\n"; + for(int f=0;fget_type()); + if(dt.is_buffer_type()){ + ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n"; + } + } + ret += "\t\t\tnext = ptr->next;\n"; + ret += "\t\t\tfree(ptr);\n"; + ret += "\t\t\tptr = next;\n"; + ret += "\t\t}\n"; + ret += "\t}\n"; + ret += "\n// prepare new table. \n"; + ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n"; + ret += "\t\tif("+tgt+".ht)\n"; + ret += "\t\t\tfree("+tgt+".ht);\n"; + ret += "\t\tif("+tgt+".ht_size == 0)\n"; + ret += "\t\t\t"+tgt+".ht_size = 100000;\n"; + ret += "\t\telse\n"; + ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n"; + ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n"; + ret += "\t}\n"; + ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n"; + ret += "\t\t"+tgt+".ht[i] = NULL;\n"; + ret += "\t}\n"; + ret += "\n// load new table\n"; + ret += "\t"+tgt+".n_elem = 0;\n"; + ret += "\tfl = fopen("+tgt+".filename, \"r\");\n"; + ret += "\tif(fl==NULL){\n"; + ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n"; + ret += "\t\treturn;\n"; + ret += "\t}\n"; + ret += "\tmalformed = 0;\n"; + ret += "\tshort_lines = 0;\n"; + ret += "\ttoolong_lines = 0;\n"; + ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n"; + ret += "\t\tlinelen = strlen(buf);\n"; + ret += "\t\tbuf[linelen-1]='\\0'; // strip off trailing newline\n"; + ret += "\t\tlinelen--;\n"; + + ret += "\t\tpos=0;\n"; + ret += "\t\tmalformed=0;\n"; + ret += "\t\tok=1;\n"; + ret += "\t\tflds[0] = buf;\n"; + ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n"; + ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n"; + ret += "\t\t\tif(pos >= linelen){\n"; + ret += "\t\t\t\tmalformed = 1;\n"; + ret += "\t\t\t\tbreak;\n"; + ret += "\t\t\t}\n"; + ret += "\t\t\tbuf[pos]='\\0';\n"; + ret += "\t\t\tpos++;\n"; + ret += "\t\t\tflds[f]=buf+pos;\n"; + ret += "\t\t}\n"; + ret += "\t\tif(malformed){\n"; + ret += "\t\t\tok=0;\n"; + ret += "\t\t\tn_malformed++;\n"; + ret += "\t\t}\n"; + ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n"; + ret += "\t\t\tok=0;\n"; + ret += "\t\t\tshort_lines++;\n"; + ret += "\t\t}\n"; + ret += "\t\tif(pos && (posget_type()); + ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n"; + } +// Compute the hash value + ret += "\t\t\thash=0;\n"; + for(int k=0;kget_name() == key_fld) + break; + } + data_type dt(fields[f]->get_type()); + + ret += +"\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+ + dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n"; + } + ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n"; + ret += "\t\t\trec->hashval = hash;\n"; + ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n"; + ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n"; + ret += "\t\t\t"+tgt+".n_elem++;\n"; + + ret += "\t\t}\n"; + ret += "\t}\n"; + ret += "\tif(n_malformed+toolong_lines > 0){\n"; + ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n"; + ret += "\t}\n"; + ret += "}\n\n"; + + return ret; +} + string generate_fta_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl, param_table *param_tbl, cplx_lit_table *complex_literals, vector ¶m_handle_table, - bool is_aggr_query, bool is_fj, bool uses_bloom, + bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom, table_list *schema){ string ret = "struct " + generate_fta_name(node_name) + "{\n"; @@ -270,6 +464,7 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl, ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n"; // ret+="\tint bitmap_size;\n"; ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n"; + ret += "\tgs_int32_t n_ticks; // for limiting slow flush\n"; ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n"; ret += "\tint max_windows; // max number of open windows.\n"; ret += "\tunsigned int generation; // initially zero, increment on\n"; @@ -337,6 +532,12 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl, } +// -------------------------------------------- +// watchlist-join specific + if(is_wj){ + ret += "\ttime_t ux_time;\n"; + } + //-------------------------------------------------------- // Common fields @@ -853,7 +1054,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){ ret += "( "; if(ldt->complex_comparison(ldt) ){ - ret += ldt->get_comparison_fcn(ldt) ; + ret += ldt->get_equals_fcn(ldt) ; ret += "( "; if(ldt->is_buffer_type() ) ret += "&"; ret += generate_se_code(pr->get_left_se(), schema); @@ -883,6 +1084,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){ ret += "( "; if(ldt->complex_comparison(rdt) ){ +// TODO can use get_equals_fcn if op is "=" ? ret += ldt->get_comparison_fcn(rdt); ret += "("; if(ldt->is_buffer_type() ) ret += "&"; @@ -953,7 +1155,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type * string ret; if(dt->complex_comparison(dt) ){ - ret += dt->get_comparison_fcn(dt); + ret += dt->get_equals_fcn(dt); ret += "("; if(dt->is_buffer_type() ) ret += "&"; ret += lhs_op; @@ -970,26 +1172,26 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type * return(ret); } -static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ - string ret; - - if(dt->complex_comparison(dt) ){ - ret += dt->get_comparison_fcn(dt); - ret += "("; - if(dt->is_buffer_type() ) ret += "&"; - ret += lhs_op; - ret += ", "; - if(dt->is_buffer_type() ) ret += "&"; - ret += rhs_op; - ret += ") == 0"; - }else{ - ret += lhs_op; - ret += " == "; - ret += rhs_op; - } - - return(ret); -} +//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ +// string ret; +// + // if(dt->complex_comparison(dt) ){ +// ret += dt->get_equals_fcn(dt); +// ret += "("; +// if(dt->is_buffer_type() ) ret += "&"; +// ret += lhs_op; +// ret += ", "; +// if(dt->is_buffer_type() ) ret += "&"; +// ret += rhs_op; +// ret += ") == 0"; +// }else{ +// ret += lhs_op; +// ret += " == "; +// ret += rhs_op; +// } +// +// return(ret); +//} // Here I assume that only MIN and MAX aggregates can be computed // over BUFFER data types. @@ -1170,10 +1372,11 @@ string generate_preamble(table_list *schema, //map &int_fcn_defs, // ret += "#include \"fta.h\"\n\n"); string ret = "#ifndef LFTA_IN_NIC\n"; - ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n"; + ret += "const char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n"; ret += "#include\n"; ret += "#include \n"; ret += "#include \n"; + ret += "#include \n"; ret += "#include \"rdtsc.h\"\n"; ret += "#endif\n"; @@ -1258,7 +1461,7 @@ string generate_tuple_from_aggr(string node_name, table_list *schema, string idx ret += ";\n"; - ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; ret += "\t\t\tif( tuple != NULL){\n"; @@ -1701,7 +1904,7 @@ string generate_fta_control(string node_name, table_list *schema, bool is_aggr_q ret+="\tif(command == FTA_COMMAND_FLUSH){\n"; ret+="\t\tif (!t->n_aggrs) {\n"; - ret+="\t\t\ttuple = allocate_tuple(f, 0);\n"; + ret+="\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, 0);\n"; ret+="\t\t\tif( tuple != NULL)\n"; ret+="\t\t\t\tpost_tuple(tuple);\n"; @@ -1752,7 +1955,7 @@ string generate_fta_control(string node_name, table_list *schema, bool is_aggr_q } ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n"; - ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\t\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n"; /* mark tuple as EOF_TUPLE */ @@ -1766,7 +1969,7 @@ string generate_fta_control(string node_name, table_list *schema, bool is_aggr_q return(ret); } -string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){ +string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){ string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n"; ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct "; ret += generate_fta_name(node_name)+" *) f;\n\n"; @@ -1842,30 +2045,36 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co ret += "\t#endif\n"; // we need to pay special attention to time fields - if (field == "time" || field == "timestamp"){ - ret += "\tcur_time = time(&cur_time);\n"; + if (field == "time" || field == "timestamp" || field == "timestamp_ms"){ + ret += "\tcur_time = time(&cur_time);\n"; - if (field == "time") { - sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n", + if (field == "time") { + sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n", tblref, time_corr); - ret += tmpstr; - sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n", - field.c_str(), tblref, field.c_str(), tblref, time_corr); - } else { - sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n", + ret += tmpstr; + sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n", + field.c_str(), tblref, field.c_str(), tblref, time_corr); + } else if (field == "timestamp_ms") { + sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n", + tblref, time_corr); + ret += tmpstr; + sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n", + field.c_str(), tblref, field.c_str(), tblref, time_corr); + }else{ + sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n", field.c_str(), tblref, time_corr); - ret += tmpstr; - sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n", + ret += tmpstr; + sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n", field.c_str(), tblref, field.c_str(), tblref, time_corr); - } - ret += tmpstr; + } + ret += tmpstr; - ret += "\t\ttime_advanced = 1;\n"; - ret += "\t}\n"; + ret += "\t\ttime_advanced = 1;\n"; + ret += "\t}\n"; - sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n", - field.c_str(), tblref, field.c_str(), tblref); - ret += tmpstr; + sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n", + field.c_str(), tblref, field.c_str(), tblref); + ret += tmpstr; } else { sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref); @@ -1873,6 +2082,11 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co } } + + if(advance_uxtime){ + ret += "\tt->ux_time = time(&(t->ux_time));\n"; + } + // for aggregation lftas we need to check if the time was advanced beyond the current epoch if (is_aggr_query) { @@ -1926,12 +2140,21 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr; } } + ret += "\t\tt->n_ticks = 0; // reset clock tick counter, limit slow flush\n"; + ret += "\t}else{\n"; + ret += "//\tLimit slow flush, do a full flush at two clock ticks past the change in generation.\n"; + ret += "\t\tt->n_ticks++;\n"; + ret += "\t\tif(t->n_ticks == 2){\n"; + ret += "\t\t\tif(t->flush_posmax_aggrs) \n"; + ret += "\t\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n"; + ret += "\t\t}\n"; ret += "\t}\n\n"; + } ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n"; - ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+"*)allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\treturn 1;\n"; @@ -1997,7 +2220,8 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co ret+="\tpost_tuple(tuple);\n"; ret += "\n\t/* Send a heartbeat message to clearinghouse */\n"; - ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n"; + ret += "\n\t/* Disable heartbeats for now to avoid overloading clearinghouse */\n"; + ret += "\t/* fta_heartbeat(f->ftaid, t->trace_id++, 1, &stats); */\n"; ret += "\n\t/* Reset runtime stats */\n"; ret += "\tt->in_tuple_cnt = 0;\n"; @@ -2085,6 +2309,7 @@ string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *sc temporal_flush+="\t\t/* \t\tmark all groups as old */\n"; temporal_flush+="\t\tt->generation++;\n"; temporal_flush+="\t\tt->flush_pos = 0;\n"; + temporal_flush+="\t\tt->n_ticks = 0; // reset clock tick counter, to limit slow flush\n"; // Now set the saved temporal value of the gb to the @@ -2233,7 +2458,7 @@ string ret; ret += ";\n"; - ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; // Test passed, make assignments to the tuple. @@ -2278,6 +2503,7 @@ string ret; return ret; } +// TODO Ensure that postfilter predicates are being generated string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){ int p,s,w; @@ -2354,9 +2580,13 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% // bloom filter needs to be advanced. // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index) // t->bf_size : number of bits in bloom filter +// TODO: vectorize? +// TODO: Don't iterate more than n_bloom times! +// As written, its possible to wrap around many times. if(fs->use_bloom){ ret += "// Clean out old bloom filters if needed.\n" +"// TODO vectorize this ? \n" " if(t->first_exec){\n" " t->first_exec = 0;\n" " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n" @@ -2487,6 +2717,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% ; } }else{ + ret += "// Add the S record to the hash table, choose a position\n"; ret += "\t\tbucket=0;\n"; for(p=0;phash_eq.size();++p){ ret += @@ -2510,20 +2741,20 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% ret += generate_equality_test(lhs_op,rhs_op,hdt); } ret += "){\n\t\t\tthe_bucket = bucket;\n"; - ret += "\t\t}else {if("; + ret += "\t\t}else{\n\t\t\tif("; for(p=0;phash_eq.size();++p){ if(p>0) ret += " && "; // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+ // " == s_equijoin_"+int_to_string(p); data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); - string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p); + string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p); string rhs_op = "s_equijoin_"+int_to_string(p); ret += generate_equality_test(lhs_op,rhs_op,hdt); } - ret += "){\n\t\t\tthe_bucket = bucket1;\n"; - ret += "\t\t}else{ if(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n"; - ret+="\t\t\tthe_bucket = bucket;\n\t\t\telse the_bucket=bucket1;\n"; - ret += "\t\t}}\n"; + ret += "){\n\t\t\t\tthe_bucket = bucket1;\n"; + ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n"; + ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n"; + ret += "\t\t\t}\n\t\t}\n"; for(p=0;phash_eq.size();++p){ data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); if(hdt->is_buffer_type()){ @@ -2563,7 +2794,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% } } -// Sort S preds based on cost. +// Sort R preds based on cost. vector tmp_wh; for(w=0;wcost <= 20;cheap_rpos++) + for(cheap_rpos=0;cheap_rposcost <= 20;cheap_rpos++); // Test the cheap filters on R. if(cheap_rpos >0){ // Now generate the predicates. for(w=0;wcost); + sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost); ret += tmpstr; // Find partial fcns ref'd in this cnf element @@ -2620,7 +2851,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% ret += "\n// Do the join\n\n"; for(p=0;phash_eq.size();++p) - ret += "\t\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n"; + ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n"; // Passed the cheap pred, now test the join with S. @@ -2631,7 +2862,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% ret += " bucket"+int_to_string(i)+ " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+ - fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+ + fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+ +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n"; } ret += @@ -2656,7 +2887,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% for(p=0;phash_eq.size();++p){ ret += " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+ - fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+ + fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+ +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n"; } ret += @@ -2683,7 +2914,7 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+ // " == r_equijoin_"+int_to_string(p); data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type(); - string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p); + string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p); string rhs_op = "s_equijoin_"+int_to_string(p); ret += generate_equality_test(lhs_op,rhs_op,hdt); } @@ -2818,7 +3049,464 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% ret += ";\n"; - ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; + ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; + +// Test passed, make assignments to the tuple. + + ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n"; + +// Mark tuple as REGULAR_TUPLE + ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n"; + + + for(s=0;sget_data_type(); + if(sdt->is_buffer_type()){ + sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s); + ret += tmpstr; + sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s); + ret += tmpstr; + }else{ + sprintf(tmpstr,"\ttuple->tuple_var%d = ",s); + ret += tmpstr; +// if(sdt->needs_hn_translation()) +// ret += sdt->hton_translation() +"( "; + ret += generate_se_code(sl_list[s],schema); +// if(sdt->needs_hn_translation()) +// ret += ") "; + ret += ";\n"; + } + } + +// Generate output. + + ret += "\tpost_tuple(tuple);\n"; + +// Increment the counter of posted tuples + ret += "\n\t#ifdef LFTA_STATS\n"; + ret += "\n\tt->out_tuple_cnt++;\n\n"; + ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n"; + ret += "\t#endif\n\n"; + + + return ret; +} + + +string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){ + +int p,s,w; +string ret; + + + string wl_schema = fs->from[1]->get_schema_name(); + string wl_elem_str = generate_watchlist_element_name(wl_schema); + string wl_node_str = generate_watchlist_struct_name(wl_schema); + string tgt = generate_watchlist_name(wl_schema); + + ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n"; + + + + + +// ------------------------------------------------------------ +// Determine if the R record should be processed. + + + ret += +"// R (main stream) cheap predicate\n" +"\n" +; + +// Unpack r_filt fields + vector r_filt = fs->pred_t0; + for(w=0;wpr, this_pred_cids, gb_tbl); + for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ + if(unpacked_cids.count( (*csi) ) == 0){ + int tblref = (*csi).tblvar_ref; + int schref = (*csi).schema_ref; + string field = (*csi).field; + ret += generate_unpack_code(tblref,schref,field,schema,node_name); + unpacked_cids.insert( (*csi) ); + } + } + } + +// Sort R preds based on cost. + + vector tmp_wh; + for(w=0;wcost <= 20;cheap_rpos++); + +// Test the cheap filters on R. + if(cheap_rpos >0){ + +// Now generate the predicates. + for(w=0;wcost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+ + ") ) goto end;\n"; + } + }else{ + ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n"; + } + + ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n"; + map h_eq = ((watch_join_qpn *)fs)-> hash_eq; + vector kflds = ((watch_join_qpn *)fs)->key_flds; + for(w=0;wpr, this_pred_cids, gb_tbl); + for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ + if(unpacked_cids.count( (*csi) ) == 0){ + int tblref = (*csi).tblvar_ref; + int schref = (*csi).schema_ref; + string field = (*csi).field; + if(tblref==0) // LHS from packet, don't unpack the RHS + ret += generate_unpack_code(tblref,schref,field,schema,node_name); + unpacked_cids.insert( (*csi) ); + } + } + } + + + ret += "\n// Do the join\n\n"; + ret += "\n// (ensure that the watchtable is fresh)\n"; + ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n"; + ret += "\t\treload_watchlist__"+wl_schema+"();\n"; + ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n"; + ret += "\t}\n\n"; + + + for(p=0;pkey_flds.size();++p){ + string kfld = fs->key_flds[p]; + ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n"; + } + + +// Passed the cheap pred, now test the join with S. + ret += "\tbucket=0;\n"; + ret += "\thash=0;\n"; + for(p=0;pkey_flds.size();++p){ + string kfld = fs->key_flds[p]; + ret += +" hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+ + fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+ + +"_to_hash(r_equijoin_"+kfld+")));\n"; + } + ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n"; + + ret += "\t\trec = "+tgt+".ht[bucket];\n"; + ret += "\t\twhile(rec!=NULL){\n"; + ret += "\t\t\tif(hash==rec->hashval){\n"; + ret += "\t\t\t\tif("; + for(p=0;pkey_flds.size();++p){ + string kfld = fs->key_flds[p]; + if(p>0) ret += " && "; + data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type(); + string lhs_op = "r_equijoin_"+kfld; + string rhs_op = "rec->"+kfld; + ret += generate_equality_test(lhs_op,rhs_op,hdt); + } + ret += ")\n"; + ret += "\t\t\t\t\tbreak;\n"; + ret += "\t\t\t}\n"; + ret += "\t\t\trec=rec->next;\n"; + ret += "\t\t}\n"; + ret += "\t\tif(rec==NULL)\n"; + ret += "\t\t\tgoto end;\n"; + + ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n"; + for(w=0;wpr, this_pred_cids, gb_tbl); + for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ + if(unpacked_cids.count( (*csi) ) == 0){ + int tblref = (*csi).tblvar_ref; + int schref = (*csi).schema_ref; + string field = (*csi).field; + if(tblref==0) // LHS from packet + ret += generate_unpack_code(tblref,schref,field,schema,node_name); + else // RHS from hash bucket + ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n"; + unpacked_cids.insert( (*csi) ); + } + } + } + + +// Test the expensive filters on R. +// TODO Should merge this with other predicates and eval in order +// of cost - see the fj code. +// TODO join and postfilter predicates haven't been costed yet. + if(cheap_rpos < r_filt.size()){ + +// Now generate the predicates. + for(w=cheap_rpos;wcost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+ + ") ) goto end;\n"; + } + }else{ + ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n"; + } + +// TODO sort the additional predicates by cost + +// S-only + for(w=0;wpred_t1.size();++w){ + sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+ + ") ) goto end;\n"; + } + +// non hash-eq join + for(w=0;wjoin_filter.size();++w){ + sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+ + ") ) goto end;\n"; + } + +// postfilter + for(w=0;wpostfilter.size();++w){ + sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+ + ") ) goto end;\n"; + } + + + +/////////////// post the tuple + +// test passed : create the tuple, then assign to it. + ret += "/*\t\tCreate and post the tuple\t*/\n"; + +// Unpack R fields + for(s=0;s"+field+";\n"; + unpacked_cids.insert( (*csi) ); + } + } + } + + +// Unpack partial fcns ref'd by the select clause. +// Its a kind of a WHERE clause ... + for(p=sl_fcns_start;p 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n"; + } + if(is_partial_fcn[p]){ + ret += unpack_partial_fcn(partial_fcns[p], p, schema); + ret += "\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[p] > 1){ + if(!is_partial_fcn[p]){ + ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n"; + ret += "\t}\n"; + } + } + + // increment the counter of accepted tuples + ret += "\n\t#ifdef LFTA_STATS\n"; + ret += "\n\tt->accepted_tuple_cnt++;\n\n"; + ret += "\t#endif\n\n"; + +// First, compute the size of the tuple. + +// Unpack any BUFFER type selections into temporaries +// so that I can compute their size and not have +// to recompute their value during tuple packing. +// I can use regular assignment here because +// these temporaries are non-persistent. + + for(s=0;sget_data_type(); + if(sdt->is_buffer_type()){ + sprintf(tmpstr,"\tselvar_%d = ",s); + ret += tmpstr; + ret += generate_se_code(sl_list[s],schema); + ret += ";\n"; + } + } + + +// The size of the tuple is the size of the tuple struct plus the +// size of the buffers to be copied in. + + ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")"; + for(s=0;sget_data_type(); + if(sdt->is_buffer_type()){ + sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s); + ret += tmpstr; + } + } + ret += ";\n"; + + + ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n"; ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; // Test passed, make assignments to the tuple. @@ -3045,7 +3733,7 @@ printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn -string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set &s_pids){ +string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set &s_pids){ string ret="static gs_retval_t accept_packet_"+node_name+ "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n"; @@ -3073,9 +3761,9 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex } for(w=0;wpr,cid_set, gb_tbl); - } + } for(s=0;suse_bloom; ret += "/*\t\tJoin fields\t*/\n"; for(g=0;ghash_eq.size();g++){ - sprintf(tmpstr,"\t%s s_equijoin_%d, r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g,g); + sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g); + ret += tmpstr; + sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g); ret += tmpstr; } if(uses_bloom){ @@ -3173,7 +3863,7 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n" "\tunsigned int bucket, bucket0, bucket1, bucket2;\n" "\tlong long int curr_fj_ts;\n" -"\tunsigned int curr_bin, the_bin;\n" +"\tlong long int curr_bin, the_bin;\n" "\n" ; }else{ @@ -3188,12 +3878,32 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex } + if(fs->node_type() == "watch_join"){ + watch_join_qpn *wlq = (watch_join_qpn *)fs; + ret += "/*\t\tJoin fields\t*/\n"; + for(int k=0;kkey_flds.size(); ++k){ + string kfld = wlq->key_flds[k]; + ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n"; + ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n"; + } + ret += +" /* Variables for wl join table */ \n" +"\tunsigned int i, bucket;\n" +"\tunsigned long long int hash; \n"; + string wl_schema = wlq->from[1]->get_schema_name(); + string wl_elem_str = generate_watchlist_element_name(wl_schema); + ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n"; +"\n" +; + } + + // Variables needed to store selected attributes of BUFFER type // temporarily, in order to compute their size for storage // in an output tuple. string select_var_defs = ""; - for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s); @@ -3293,7 +4003,7 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex vector filter = fs->get_filter_clause(); // Test the filter predicate (some query types have additional preds). - if(filter.size() > 0){ + if(filter.size() > 0 && !is_wj){ // watchlist join does specialized processing // Sort by evaluation cost. // First, estimate evaluation costs @@ -3366,7 +4076,7 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex if(is_fj){ ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n"; vector h_eq = ((filter_join_qpn *)fs)-> hash_eq; - for(w=0;wpr, this_pred_cids, gb_tbl); for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ @@ -3379,6 +4089,26 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex } } } + }else if(is_wj){ // STOPPED HERE move this to wj main body +/* + ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n"; + map h_eq = ((watch_join_qpn *)fs)-> hash_eq; + vector kflds = ((watch_join_qpn *)fs)->key_flds; + for(w=0;wpr, this_pred_cids, gb_tbl); + for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ + if(unpacked_cids.count( (*csi) ) == 0){ + int tblref = (*csi).tblvar_ref; + int schref = (*csi).schema_ref; + string field = (*csi).field; + ret += generate_unpack_code(tblref,schref,field,schema,node_name); + unpacked_cids.insert( (*csi) ); + } + } + } +*/ }else{ ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n"; @@ -3398,12 +4128,17 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex ////////////////// After this, the query types ////////////////// are processed differently. - if(!is_aggr_query && !is_fj) + if(!is_aggr_query && !is_fj & !is_wj) ret += generate_sel_accept_body(fs, node_name, schema); else if(is_aggr_query) ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush); - else - ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema); + else{ + if(is_fj) + ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema); + else + ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema); + } + // Finish up. @@ -3427,7 +4162,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n"; ret+="\tint i;\n"; ret += "\n"; - ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n"; + ret+="\tif((f=(struct "+generate_fta_name(node_name)+" *)fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n"; // assign a streamid to fta instance ret+="\t/* assign a streamid */\n"; @@ -3437,6 +4172,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo if(is_aggr_query){ ret += "\tf->n_aggrs = 0;\n"; + ret += "\tf->n_ticks = 0; // for limiting slow flush\n"; ret += "\tf->max_aggrs = "; @@ -3474,12 +4210,12 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo } ret += ";\n"; - ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n"; + ret+="\tif ((f->aggr_table = (struct "+generate_aggr_struct_name(node_name)+" *)sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n\n"; // ret+="/* compute how many integers we need to store the hashmap */\n"; // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n"; - ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n"; + ret+="\tif ((f->aggr_table_hashmap = (gs_uint32_t *)sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n"; ret+="/*\t\tfill bitmap with zero \t*/\n"; @@ -3516,7 +4252,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo int bf_byte_size = bf_bit_size / (8*sizeof(char)); int bf_tot = n_bloom*bf_byte_size; - ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n"; + ret+="\tif ((f->bf_table = (unsigned char *)sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n"; ret += @@ -3535,7 +4271,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo } ht_size = hs; } - ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n"; + ret+="\tif ((f->join_table = (struct "+generate_fj_struct_name(node_name)+" *) sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n"; ret+="\t\treturn(0);\n"; ret+="\t}\n\n"; ret += @@ -3720,6 +4456,8 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, param_handle_table = fs->get_handle_param_tbl(Ext_fcns); string node_name = fs->get_node_name(); bool is_fj = false, uses_bloom = false; + bool is_wj = false; + bool is_watch_tbl = false; if(fs->node_type() == "spx_qpn"){ @@ -3751,13 +4489,32 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, uses_bloom = fj_node->use_bloom; gb_tbl = NULL; aggr_tbl = NULL; + }else + if(fs->node_type() == "watch_join"){ + is_aggr_query = false; + is_wj = true; + watch_join_qpn *wl_node = (watch_join_qpn *)fs; + sl_list = wl_node->get_select_se_list(); + where = wl_node->get_where_clause(); + gb_tbl = NULL; + aggr_tbl = NULL; + }else + if(fs->node_type() == "watch_tbl_qpn"){ + is_aggr_query = false; + is_watch_tbl = true; + vector empty_sl_list; + vector empty_where; + sl_list = empty_sl_list; + where = empty_where; + gb_tbl = NULL; + aggr_tbl = NULL; } else { fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str()); exit(1); } // Build list of "partial functions", by clause. -// NOTE : partial fcns are not handles well. +// NOTE : partial fcns are not handled well. // The act of searching for them associates the fcn call // in the SE with an index to an array. Refs to the // fcn value are replaced with refs to the variable they are @@ -3788,7 +4545,7 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, ag_fcns_start = gb_fcns_end = partial_fcns.size(); if(aggr_tbl != NULL){ for(i=0;isize();i++){ - find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns); + find_partial_fcns(aggr_tbl->get_aggr_se(i), &partial_fcns, NULL, &is_partial_fcn, Ext_fcns); } } ag_fcns_end = partial_fcns.size(); @@ -3843,48 +4600,61 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, ///////////////////////////////////////////////////// // Common stuff unpacked, do some generation + if(is_aggr_query) retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl); if(is_fj) retval += generate_fj_struct((filter_join_qpn *)fs, node_name); - - retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema); - retval += generate_tuple_struct(node_name, sl_list) ; - - if(is_aggr_query) - retval += generate_fta_flush(node_name, schema, Ext_fcns) ; - if(param_tbl->size() > 0) - retval += generate_fta_load_params(node_name) ; - retval += generate_fta_free(node_name, is_aggr_query) ; - retval += generate_fta_control(node_name, schema, is_aggr_query) ; - retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ; - + if(is_watch_tbl){ + retval += "\n\n// watchtable code here \n\n"; + watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs; + retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval); + retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds); + } + + if(! is_watch_tbl){ + retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema); + retval += generate_tuple_struct(node_name, sl_list) ; + + if(is_aggr_query) + retval += generate_fta_flush(node_name, schema, Ext_fcns) ; + if(param_tbl->size() > 0) + retval += generate_fta_load_params(node_name) ; + retval += generate_fta_free(node_name, is_aggr_query) ; + retval += generate_fta_control(node_name, schema, is_aggr_query) ; + retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ; /* extract the value of Time_Correlation from interface definition */ - int e,v; - string es; - unsigned time_corr; - vector tvec = fs->get_input_tbls(); - vector time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es); - if (time_corr_vec.empty()) - time_corr = DEFAULT_TIME_CORR; - else - time_corr = atoi(time_corr_vec[0].c_str()); + int e,v; + string es; + unsigned time_corr; + vector tvec = fs->get_input_tbls(); + vector time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es); + if (time_corr_vec.empty()) + time_corr = DEFAULT_TIME_CORR; + else + time_corr = atoi(time_corr_vec[0].c_str()); - retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) ); - retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) ); + retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) ); + retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) ); + } return(retval); } -int compute_snap_len(qp_node *fs, table_list *schema){ +int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){ // Initialize global vars gb_tbl = NULL; sl_list.clear(); where.clear(); + + if(fs->node_type() == "watch_tbl_qpn"){ + return -1; + } + if(fs->node_type() == "spx_qpn"){ spx_qpn *spx_node = (spx_qpn *)fs; sl_list = spx_node->get_select_se_list(); @@ -3900,6 +4670,11 @@ int compute_snap_len(qp_node *fs, table_list *schema){ filter_join_qpn *fj_node = (filter_join_qpn *)fs; sl_list = fj_node->get_select_se_list(); where = fj_node->get_where_clause(); + } + else if(fs->node_type() == "watch_join"){ + watch_join_qpn *fj_node = (watch_join_qpn *)fs; + sl_list = fj_node->get_select_se_list(); + where = fj_node->get_where_clause(); } else{ fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str()); exit(1); @@ -3930,15 +4705,21 @@ int compute_snap_len(qp_node *fs, table_list *schema){ int tblref = (*csi).tblvar_ref; string field = (*csi).field; - param_list *field_params = schema->get_modifier_list(schref, field); - if(field_params->contains_key("snap_len")){ - string fld_snap_str = field_params->val_of("snap_len"); - int fld_snap; - if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){ - if(fld_snap > snap_len) snap_len = fld_snap; - n_snap++; - }else{ - fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() ); + if(snap_type == "index"){ + int pos = schema->get_field_idx(schref, field); + if(pos>snap_len) snap_len = pos; + n_snap++; + }else{ + param_list *field_params = schema->get_modifier_list(schref, field); + if(field_params->contains_key("snap_len")){ + string fld_snap_str = field_params->val_of("snap_len"); + int fld_snap; + if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){ + if(fld_snap > snap_len) snap_len = fld_snap; + n_snap++; + }else{ + fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() ); + } } } }