X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fftacmp%2Fgenerate_lfta_code.cc;h=55227b1df893e00fe976514c252a8bae02193fe8;hb=7cec316889150a8a92238e52c7bad1270608b333;hp=aea03a8dd3b536e9d42d163b1a74c5d82442ead2;hpb=804ea15b01566ac0de58781ca61870b4824d0e02;p=com%2Fgs-lite.git diff --git a/src/ftacmp/generate_lfta_code.cc b/src/ftacmp/generate_lfta_code.cc index aea03a8..55227b1 100644 --- a/src/ftacmp/generate_lfta_code.cc +++ b/src/ftacmp/generate_lfta_code.cc @@ -169,6 +169,36 @@ string generate_fj_struct_name(string node_name){ return(ret); } +string generate_watchlist_element_name(string node_name){ + string ret = normalize_name(node_name); + if(ret == ""){ + ret = "default"; + } + ret += "__wl_elem"; + + return(ret); +} + +string generate_watchlist_struct_name(string node_name){ + string ret = normalize_name(node_name); + if(ret == ""){ + ret = "default"; + } + ret += "__wl_struct"; + + return(ret); +} + +string generate_watchlist_name(string node_name){ + string ret = normalize_name(node_name); + if(ret == ""){ + ret = "default"; + } + ret += "__wl"; + + return(ret); +} + string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){ string ret; if(! packed_return){ @@ -246,14 +276,178 @@ string generate_fj_struct(filter_join_qpn *fs, string node_name ){ return(ret); } +string generate_watchlist_structs(string node_name, table_def *tbl, + std::string filename, int refresh_interval){ + string ret; + ret += "struct "+generate_watchlist_element_name(node_name)+"{\n"; + vector fields = tbl->get_fields(); + for(int f=0;fget_type()); + ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n"; + } + ret += "\tgs_uint64_t hashval;\n"; + ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n"; + ret += "};\n\n"; + ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n"; + ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n"; + ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n"; + ret += "\tgs_uint32_t ht_size;\n"; + ret += "\tgs_uint32_t n_elem;\n"; + ret += "\tgs_uint32_t refresh_interval;\n"; + ret += "\ttime_t next_refresh;\n"; + ret += "\ttime_t last_mtime;\n"; + ret += "\tchar *filename;\n"; + ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n"; + + return ret; +} +string generate_watchlist_load(string node_name, table_def *tbl, vector keys){ + string ret; + string tgt = generate_watchlist_name(node_name); + vector fields = tbl->get_fields(); + + ret += "void reload_watchlist__"+node_name+"(){\n"; + ret += "\tgs_uint32_t i;\n"; + ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n"; + ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n"; + ret += "\tFILE *fl;\n"; + ret += "\tchar buf[10000];\n"; + ret += "\tgs_uint32_t buflen = 10000;\n"; + ret += "\tchar *flds["+std::to_string(fields.size())+"];\n"; + ret += "\tgs_uint32_t pos, f, linelen, malformed;\n"; + ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n"; + ret += "\tgs_uint64_t hash, bucket;\n"; + ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n"; + + ret += "// make sure watchlist file has changed since the last time we loaded it\n"; + ret += "\tstruct stat file_stat;\n"; + ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n"; + ret += "\tif (err) {\n"; + ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n"; + ret += "\t\treturn;\n"; + ret += "\t}\n"; + ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime) // watchlist file hasn't changed since last time\n"; + ret += "\t\treturn;\n"; + ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n"; + + ret += "// Delete old entries.\n"; + ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n"; + ret += "\t\tptr="+tgt+".ht[i];\n"; + ret += "\t\twhile(ptr!=NULL){\n"; + for(int f=0;fget_type()); + if(dt.is_buffer_type()){ + ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n"; + } + } + ret += "\t\t\tnext = ptr->next;\n"; + ret += "\t\t\tfree(ptr);\n"; + ret += "\t\t\tptr = next;\n"; + ret += "\t\t}\n"; + ret += "\t}\n"; + ret += "\n// prepare new table. \n"; + ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n"; + ret += "\t\tif("+tgt+".ht)\n"; + ret += "\t\t\tfree("+tgt+".ht);\n"; + ret += "\t\tif("+tgt+".ht_size == 0)\n"; + ret += "\t\t\t"+tgt+".ht_size = 100000;\n"; + ret += "\t\telse\n"; + ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n"; + ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n"; + ret += "\t}\n"; + ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n"; + ret += "\t\t"+tgt+".ht[i] = NULL;\n"; + ret += "\t}\n"; + ret += "\n// load new table\n"; + ret += "\t"+tgt+".n_elem = 0;\n"; + ret += "\tfl = fopen("+tgt+".filename, \"r\");\n"; + ret += "\tif(fl==NULL){\n"; + ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n"; + ret += "\t\treturn;\n"; + ret += "\t}\n"; + ret += "\tmalformed = 0;\n"; + ret += "\tshort_lines = 0;\n"; + ret += "\ttoolong_lines = 0;\n"; + ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n"; + ret += "\t\tlinelen = strlen(buf);\n"; + ret += "\t\tbuf[linelen-1]='\\0'; // strip off trailing newline\n"; + ret += "\t\tlinelen--;\n"; + + ret += "\t\tpos=0;\n"; + ret += "\t\tmalformed=0;\n"; + ret += "\t\tok=1;\n"; + ret += "\t\tflds[0] = buf;\n"; + ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n"; + ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n"; + ret += "\t\t\tif(pos >= linelen){\n"; + ret += "\t\t\t\tmalformed = 1;\n"; + ret += "\t\t\t\tbreak;\n"; + ret += "\t\t\t}\n"; + ret += "\t\t\tbuf[pos]='\\0';\n"; + ret += "\t\t\tpos++;\n"; + ret += "\t\t\tflds[f]=buf+pos;\n"; + ret += "\t\t}\n"; + ret += "\t\tif(malformed){\n"; + ret += "\t\t\tok=0;\n"; + ret += "\t\t\tn_malformed++;\n"; + ret += "\t\t}\n"; + ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n"; + ret += "\t\t\tok=0;\n"; + ret += "\t\t\tshort_lines++;\n"; + ret += "\t\t}\n"; + ret += "\t\tif(pos && (posget_type()); + ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n"; + } +// Compute the hash value + ret += "\t\t\thash=0;\n"; + for(int k=0;kget_name() == key_fld) + break; + } + data_type dt(fields[f]->get_type()); + + ret += +"\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+ + dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n"; + } + ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n"; + ret += "\t\t\trec->hashval = hash;\n"; + ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n"; + ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n"; + ret += "\t\t\t"+tgt+".n_elem++;\n"; + + ret += "\t\t}\n"; + ret += "\t}\n"; + ret += "\tif(n_malformed+toolong_lines > 0){\n"; + ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n"; + ret += "\t}\n"; + ret += "}\n\n"; + + return ret; +} + + + + string generate_fta_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl, param_table *param_tbl, cplx_lit_table *complex_literals, vector ¶m_handle_table, - bool is_aggr_query, bool is_fj, bool uses_bloom, + bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom, table_list *schema){ string ret = "struct " + generate_fta_name(node_name) + "{\n"; @@ -337,6 +531,12 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl, } +// -------------------------------------------- +// watchlist-join specific + if(is_wj){ + ret += "\ttime_t ux_time;\n"; + } + //-------------------------------------------------------- // Common fields @@ -1174,6 +1374,7 @@ string generate_preamble(table_list *schema, //map &int_fcn_defs, ret += "#include\n"; ret += "#include \n"; ret += "#include \n"; + ret += "#include \n"; ret += "#include \"rdtsc.h\"\n"; ret += "#endif\n"; @@ -1766,7 +1967,7 @@ string generate_fta_control(string node_name, table_list *schema, bool is_aggr_q return(ret); } -string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){ +string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){ string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n"; ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct "; ret += generate_fta_name(node_name)+" *) f;\n\n"; @@ -1879,6 +2080,11 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co } } + + if(advance_uxtime){ + ret += "\tt->ux_time = time(&(t->ux_time));\n"; + } + // for aggregation lftas we need to check if the time was advanced beyond the current epoch if (is_aggr_query) { @@ -2284,6 +2490,7 @@ string ret; return ret; } +// TODO Ensure that postfilter predicates are being generated string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){ int p,s,w; @@ -2587,14 +2794,14 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% // WARNING! the constant 20 below is a wild-ass guess. int cheap_rpos; - for(cheap_rpos=0;cheap_rposcost <= 20;cheap_rpos++) + for(cheap_rpos=0;cheap_rposcost <= 20;cheap_rpos++); // Test the cheap filters on R. if(cheap_rpos >0){ // Now generate the predicates. for(w=0;wcost); + sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost); ret += tmpstr; // Find partial fcns ref'd in this cnf element @@ -2815,6 +3022,463 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=% } +// The size of the tuple is the size of the tuple struct plus the +// size of the buffers to be copied in. + + ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")"; + for(s=0;sget_data_type(); + if(sdt->is_buffer_type()){ + sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s); + ret += tmpstr; + } + } + ret += ";\n"; + + + ret += "\ttuple = allocate_tuple(f, tuple_size );\n"; + ret += "\tif( tuple == NULL)\n\t\tgoto end;\n"; + +// Test passed, make assignments to the tuple. + + ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n"; + +// Mark tuple as REGULAR_TUPLE + ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n"; + + + for(s=0;sget_data_type(); + if(sdt->is_buffer_type()){ + sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s); + ret += tmpstr; + sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s); + ret += tmpstr; + }else{ + sprintf(tmpstr,"\ttuple->tuple_var%d = ",s); + ret += tmpstr; +// if(sdt->needs_hn_translation()) +// ret += sdt->hton_translation() +"( "; + ret += generate_se_code(sl_list[s],schema); +// if(sdt->needs_hn_translation()) +// ret += ") "; + ret += ";\n"; + } + } + +// Generate output. + + ret += "\tpost_tuple(tuple);\n"; + +// Increment the counter of posted tuples + ret += "\n\t#ifdef LFTA_STATS\n"; + ret += "\n\tt->out_tuple_cnt++;\n\n"; + ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n"; + ret += "\t#endif\n\n"; + + + return ret; +} + + +string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){ + +int p,s,w; +string ret; + + + string wl_schema = fs->from[1]->get_schema_name(); + string wl_elem_str = generate_watchlist_element_name(wl_schema); + string wl_node_str = generate_watchlist_struct_name(wl_schema); + string tgt = generate_watchlist_name(wl_schema); + + ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n"; + + + + + +// ------------------------------------------------------------ +// Determine if the R record should be processed. + + + ret += +"// R (main stream) cheap predicate\n" +"\n" +; + +// Unpack r_filt fields + vector r_filt = fs->pred_t0; + for(w=0;wpr, this_pred_cids, gb_tbl); + for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ + if(unpacked_cids.count( (*csi) ) == 0){ + int tblref = (*csi).tblvar_ref; + int schref = (*csi).schema_ref; + string field = (*csi).field; + ret += generate_unpack_code(tblref,schref,field,schema,node_name); + unpacked_cids.insert( (*csi) ); + } + } + } + +// Sort R preds based on cost. + + vector tmp_wh; + for(w=0;wcost <= 20;cheap_rpos++); + +// Test the cheap filters on R. + if(cheap_rpos >0){ + +// Now generate the predicates. + for(w=0;wcost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+ + ") ) goto end;\n"; + } + }else{ + ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n"; + } + + ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n"; + map h_eq = ((watch_join_qpn *)fs)-> hash_eq; + vector kflds = ((watch_join_qpn *)fs)->key_flds; + for(w=0;wpr, this_pred_cids, gb_tbl); + for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ + if(unpacked_cids.count( (*csi) ) == 0){ + int tblref = (*csi).tblvar_ref; + int schref = (*csi).schema_ref; + string field = (*csi).field; + if(tblref==0) // LHS from packet, don't unpack the RHS + ret += generate_unpack_code(tblref,schref,field,schema,node_name); + unpacked_cids.insert( (*csi) ); + } + } + } + + + ret += "\n// Do the join\n\n"; + ret += "\n// (ensure that the watchtable is fresh)\n"; + ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n"; + ret += "\t\treload_watchlist__"+wl_schema+"();\n"; + ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n"; + ret += "\t}\n\n"; + + + for(p=0;pkey_flds.size();++p){ + string kfld = fs->key_flds[p]; + ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n"; + } + + +// Passed the cheap pred, now test the join with S. + ret += "\tbucket=0;\n"; + ret += "\thash=0;\n"; + for(p=0;pkey_flds.size();++p){ + string kfld = fs->key_flds[p]; + ret += +" hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+ + fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+ + +"_to_hash(r_equijoin_"+kfld+")));\n"; + } + ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n"; + + ret += "\t\trec = "+tgt+".ht[bucket];\n"; + ret += "\t\twhile(rec!=NULL){\n"; + ret += "\t\t\tif(hash==rec->hashval){\n"; + ret += "\t\t\t\tif("; + for(p=0;pkey_flds.size();++p){ + string kfld = fs->key_flds[p]; + if(p>0) ret += " && "; + data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type(); + string lhs_op = "r_equijoin_"+kfld; + string rhs_op = "rec->"+kfld; + ret += generate_equality_test(lhs_op,rhs_op,hdt); + } + ret += ")\n"; + ret += "\t\t\t\t\tbreak;\n"; + ret += "\t\t\t}\n"; + ret += "\t\t\trec=rec->next;\n"; + ret += "\t\t}\n"; + ret += "\t\tif(rec==NULL)\n"; + ret += "\t\t\tgoto end;\n"; + + ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n"; + for(w=0;wpr, this_pred_cids, gb_tbl); + for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ + if(unpacked_cids.count( (*csi) ) == 0){ + int tblref = (*csi).tblvar_ref; + int schref = (*csi).schema_ref; + string field = (*csi).field; + if(tblref==0) // LHS from packet + ret += generate_unpack_code(tblref,schref,field,schema,node_name); + else // RHS from hash bucket + ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n"; + unpacked_cids.insert( (*csi) ); + } + } + } + + +// Test the expensive filters on R. +// TODO Should merge this with other predicates and eval in order +// of cost - see the fj code. +// TODO join and postfilter predicates haven't been costed yet. + if(cheap_rpos < r_filt.size()){ + +// Now generate the predicates. + for(w=cheap_rpos;wcost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+ + ") ) goto end;\n"; + } + }else{ + ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n"; + } + +// TODO sort the additional predicates by cost + +// S-only + for(w=0;wpred_t1.size();++w){ + sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+ + ") ) goto end;\n"; + } + +// non hash-eq join + for(w=0;wjoin_filter.size();++w){ + sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+ + ") ) goto end;\n"; + } + +// postfilter + for(w=0;wpostfilter.size();++w){ + sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost); + ret += tmpstr; + +// Find partial fcns ref'd in this cnf element + set pfcn_refs; + collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs); +// Since set<..> is a "Sorted Associative Container", +// we can walk through it in sorted order by walking from +// begin() to end(). (and the partial fcns must be +// evaluated in this order). + set::iterator si; + for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){ + if(fcn_ref_cnt[(*si)] > 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n"; + } + if(is_partial_fcn[(*si)]){ + ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema); + ret += "\t\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[(*si)] > 1){ + if(!is_partial_fcn[(*si)]){ + ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n"; + ret += "\t}\n"; + } + } + + ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+ + ") ) goto end;\n"; + } + + + +/////////////// post the tuple + +// test passed : create the tuple, then assign to it. + ret += "/*\t\tCreate and post the tuple\t*/\n"; + +// Unpack R fields + for(s=0;s"+field+";\n"; + unpacked_cids.insert( (*csi) ); + } + } + } + + +// Unpack partial fcns ref'd by the select clause. +// Its a kind of a WHERE clause ... + for(p=sl_fcns_start;p 1){ + ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n"; + } + if(is_partial_fcn[p]){ + ret += unpack_partial_fcn(partial_fcns[p], p, schema); + ret += "\tif(retval) goto end;\n"; + } + if(fcn_ref_cnt[p] > 1){ + if(!is_partial_fcn[p]){ + ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n"; + } + ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n"; + ret += "\t}\n"; + } + } + + // increment the counter of accepted tuples + ret += "\n\t#ifdef LFTA_STATS\n"; + ret += "\n\tt->accepted_tuple_cnt++;\n\n"; + ret += "\t#endif\n\n"; + +// First, compute the size of the tuple. + +// Unpack any BUFFER type selections into temporaries +// so that I can compute their size and not have +// to recompute their value during tuple packing. +// I can use regular assignment here because +// these temporaries are non-persistent. + + for(s=0;sget_data_type(); + if(sdt->is_buffer_type()){ + sprintf(tmpstr,"\tselvar_%d = ",s); + ret += tmpstr; + ret += generate_se_code(sl_list[s],schema); + ret += ";\n"; + } + } + + // The size of the tuple is the size of the tuple struct plus the // size of the buffers to be copied in. @@ -3056,7 +3720,7 @@ printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn -string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set &s_pids){ +string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set &s_pids){ string ret="static gs_retval_t accept_packet_"+node_name+ "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n"; @@ -3084,9 +3748,9 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex } for(w=0;wpr,cid_set, gb_tbl); - } + } for(s=0;snode_type() == "watch_join"){ + watch_join_qpn *wlq = (watch_join_qpn *)fs; + ret += "/*\t\tJoin fields\t*/\n"; + for(int k=0;kkey_flds.size(); ++k){ + string kfld = wlq->key_flds[k]; + ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n"; + ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n"; + } + ret += +" /* Variables for wl join table */ \n" +"\tunsigned int i, bucket;\n" +"\tunsigned long long int hash; \n"; + string wl_schema = wlq->from[1]->get_schema_name(); + string wl_elem_str = generate_watchlist_element_name(wl_schema); + ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n"; +"\n" +; + } + + // Variables needed to store selected attributes of BUFFER type // temporarily, in order to compute their size for storage // in an output tuple. string select_var_defs = ""; - for(s=0;sget_data_type(); if(sdt->is_buffer_type()){ sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s); @@ -3306,7 +3990,7 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex vector filter = fs->get_filter_clause(); // Test the filter predicate (some query types have additional preds). - if(filter.size() > 0){ + if(filter.size() > 0 && !is_wj){ // watchlist join does specialized processing // Sort by evaluation cost. // First, estimate evaluation costs @@ -3379,7 +4063,7 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex if(is_fj){ ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n"; vector h_eq = ((filter_join_qpn *)fs)-> hash_eq; - for(w=0;wpr, this_pred_cids, gb_tbl); for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ @@ -3392,6 +4076,26 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex } } } + }else if(is_wj){ // STOPPED HERE move this to wj main body +/* + ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n"; + map h_eq = ((watch_join_qpn *)fs)-> hash_eq; + vector kflds = ((watch_join_qpn *)fs)->key_flds; + for(w=0;wpr, this_pred_cids, gb_tbl); + for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){ + if(unpacked_cids.count( (*csi) ) == 0){ + int tblref = (*csi).tblvar_ref; + int schref = (*csi).schema_ref; + string field = (*csi).field; + ret += generate_unpack_code(tblref,schref,field,schema,node_name); + unpacked_cids.insert( (*csi) ); + } + } + } +*/ }else{ ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n"; @@ -3411,12 +4115,17 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex ////////////////// After this, the query types ////////////////// are processed differently. - if(!is_aggr_query && !is_fj) + if(!is_aggr_query && !is_fj & !is_wj) ret += generate_sel_accept_body(fs, node_name, schema); else if(is_aggr_query) ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush); - else - ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema); + else{ + if(is_fj) + ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema); + else + ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema); + } + // Finish up. @@ -3733,6 +4442,8 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, param_handle_table = fs->get_handle_param_tbl(Ext_fcns); string node_name = fs->get_node_name(); bool is_fj = false, uses_bloom = false; + bool is_wj = false; + bool is_watch_tbl = false; if(fs->node_type() == "spx_qpn"){ @@ -3764,6 +4475,25 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, uses_bloom = fj_node->use_bloom; gb_tbl = NULL; aggr_tbl = NULL; + }else + if(fs->node_type() == "watch_join"){ + is_aggr_query = false; + is_wj = true; + watch_join_qpn *wl_node = (watch_join_qpn *)fs; + sl_list = wl_node->get_select_se_list(); + where = wl_node->get_where_clause(); + gb_tbl = NULL; + aggr_tbl = NULL; + }else + if(fs->node_type() == "watch_tbl_qpn"){ + is_aggr_query = false; + is_watch_tbl = true; + vector empty_sl_list; + vector empty_where; + sl_list = empty_sl_list; + where = empty_where; + gb_tbl = NULL; + aggr_tbl = NULL; } else { fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str()); exit(1); @@ -3856,36 +4586,44 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid, ///////////////////////////////////////////////////// // Common stuff unpacked, do some generation + if(is_aggr_query) retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl); if(is_fj) retval += generate_fj_struct((filter_join_qpn *)fs, node_name); - - retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema); - retval += generate_tuple_struct(node_name, sl_list) ; - - if(is_aggr_query) - retval += generate_fta_flush(node_name, schema, Ext_fcns) ; - if(param_tbl->size() > 0) - retval += generate_fta_load_params(node_name) ; - retval += generate_fta_free(node_name, is_aggr_query) ; - retval += generate_fta_control(node_name, schema, is_aggr_query) ; - retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ; - + if(is_watch_tbl){ + retval += "\n\n// watchtable code here \n\n"; + watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs; + retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval); + retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds); + } + + if(! is_watch_tbl){ + retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema); + retval += generate_tuple_struct(node_name, sl_list) ; + + if(is_aggr_query) + retval += generate_fta_flush(node_name, schema, Ext_fcns) ; + if(param_tbl->size() > 0) + retval += generate_fta_load_params(node_name) ; + retval += generate_fta_free(node_name, is_aggr_query) ; + retval += generate_fta_control(node_name, schema, is_aggr_query) ; + retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ; /* extract the value of Time_Correlation from interface definition */ - int e,v; - string es; - unsigned time_corr; - vector tvec = fs->get_input_tbls(); - vector time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es); - if (time_corr_vec.empty()) - time_corr = DEFAULT_TIME_CORR; - else - time_corr = atoi(time_corr_vec[0].c_str()); + int e,v; + string es; + unsigned time_corr; + vector tvec = fs->get_input_tbls(); + vector time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es); + if (time_corr_vec.empty()) + time_corr = DEFAULT_TIME_CORR; + else + time_corr = atoi(time_corr_vec[0].c_str()); - retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) ); - retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) ); + retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) ); + retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) ); + } return(retval); } @@ -3898,6 +4636,11 @@ int compute_snap_len(qp_node *fs, table_list *schema){ gb_tbl = NULL; sl_list.clear(); where.clear(); + + if(fs->node_type() == "watch_tbl_qpn"){ + return -1; + } + if(fs->node_type() == "spx_qpn"){ spx_qpn *spx_node = (spx_qpn *)fs; sl_list = spx_node->get_select_se_list(); @@ -3913,6 +4656,11 @@ int compute_snap_len(qp_node *fs, table_list *schema){ filter_join_qpn *fj_node = (filter_join_qpn *)fs; sl_list = fj_node->get_select_se_list(); where = fj_node->get_where_clause(); + } + else if(fs->node_type() == "watch_join"){ + watch_join_qpn *fj_node = (watch_join_qpn *)fs; + sl_list = fj_node->get_select_se_list(); + where = fj_node->get_where_clause(); } else{ fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str()); exit(1);