}
+mrg_qpn::mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
+ param_tbl = spx->param_tbl;
+ int i;
+ node_name = n_name;
+ field_entry_list *fel = new field_entry_list();
+ merge_fieldpos = -1;
+
+ disorder = 1;
+
+ for(i=0;i<spx->select_list.size();++i){
+ data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
+ if(dt->is_temporal()){
+ if(merge_fieldpos < 0){
+ merge_fieldpos = i;
+ }else{
+ fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
+ dt->reset_temporal();
+ }
+ }
+
+ field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
+ fel->append_field(fe);
+ delete dt;
+ }
+ if(merge_fieldpos<0){
+ fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
+ exit(1);
+ }
+ table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
+
+// NEED TO HANDLE USER_SPECIFIED SLACK
+ this->resolve_slack(spx->select_list[merge_fieldpos]->se,
+ spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
+// if(this->slack == NULL)
+// fprintf(stderr,"Zero slack.\n");
+// else
+// fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
+
+ for(i=0;i<sources.size();i++){
+ std::string rvar = "_m"+int_to_string(i);
+ mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
+ mvars[i]->set_tablevar_ref(i);
+ fm.push_back(new tablevar_t(sources[i].c_str()));
+ fm[i]->set_range_var(rvar);
+ }
+
+ param_tbl = new param_table();
+ std::vector<std::string> param_names = spx->param_tbl->get_param_names();
+ int pi;
+ for(pi=0;pi<param_names.size();pi++){
+ data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
+ param_tbl->add_param(param_names[pi],dt->duplicate(),
+ spx->param_tbl->handle_access(param_names[pi]));
+ }
+ definitions = spx->definitions;
+
+}
+
+
+
// This function translates an analyzed parse tree
// into one or more query nodes (qp_node).
// into the qp_node constructors,
// and have this code focus on building the query plan tree.
+// Watchlist node
+ if(qs->query_type == WATCHLIST_QUERY){
+ watch_tbl_qpn *watchnode = new watch_tbl_qpn(qs, Schema);
+
+// Done
+ plan_root = watchnode;
+ local_plan.push_back(watchnode);
+ }
+
+
// MERGE node
if(qs->query_type == MERGE_QUERY){
mrg_qpn *merge_node = new mrg_qpn(qs,Schema);
*/
- } else{
+ }
+
+ if(qs->query_type == SELECT_QUERY){
// Select / Aggregation / Join
if(qs->gb_tbl->size() == 0 && qs->aggr_tbl->size() == 0){
plan_root = join_node;
local_plan.push_back(join_node);
}else{
- join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema);
- plan_root = join_node;
- local_plan.push_back(join_node);
+ if(qs->fta_tree->get_from()->get_properties() == WATCHLIST_JOIN_PROPERTY){
+ watch_join_qpn *join_node = new watch_join_qpn(qs,Schema);
+ plan_root = join_node;
+ local_plan.push_back(join_node);
+ }else{
+ join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema);
+ plan_root = join_node;
+ local_plan.push_back(join_node);
+ }
}
}
}else{
return(ret);
}
+string watch_tbl_qpn::to_query_string(){
+ string ret;
+// ret += "DEFINE {\n";
+// ret += "\tfilename='"+filename+";\n";
+// ret += "\trefresh_interval="+to_string(refresh_interval)+";\n}\n";
+ ret += "WATCHLIST FIELDS {\n";
+ std::vector<field_entry *> fields = table_layout->get_fields();
+ for(int f=0;f<fields.size();++f){
+ ret += fields[f]->to_string()+"\n";
+ }
+ ret += "}\n";
+
+ return ret;
+}
string mrg_qpn::to_query_string(){
return(ret);
}
+string watch_join_qpn::to_query_string(){
+
+ string ret = "Select ";
+ int s;
+ for(s=0;s<select_list.size();s++){
+ if(s>0) ret+=", ";
+ ret += se_to_query_string(select_list[s]->se, NULL);
+ if(select_list[s]->name != "") ret += " AS "+select_list[s]->name;
+ }
+ ret += "\n";
+
+// NOTE: assuming binary join.
+ ret += "WATCHLIST_JOIN ";
+
+ ret += "From ";
+ int f;
+ for(f=0;f<from.size();++f){
+ if(f>0) ret+=", ";
+ ret += from[f]->to_string();
+ }
+ ret += "\n";
+
+ if(where.size() > 0){
+ ret += "Where ";
+ int w;
+ for(w=0;w<where.size();w++){
+ if(w>0) ret += " AND ";
+ ret += "(" + pred_to_query_str(where[w]->pr,NULL) + ")";
+ }
+ ret += "\n";
+ }
+
+ return(ret);
+}
+
+
// -----------------------------------------------------------------
// Query node subclass specific processing.
//------------------------------------------------------------------
// split a node to extract LFTA components.
+vector<qp_node *> watch_tbl_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+ // nothing to do, nothing to split, return copy of self.
+
+ hfta_returned = 0;
+
+ vector<qp_node *> ret_vec;
+
+ ret_vec.push_back(this);
+ return(ret_vec);
+
+}
+
vector<qp_node *> mrg_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
// nothing to do, nothing to split, return copy of self.
vector<string> sel_names;
vector<pair<string,string> > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
}
for(p=0;p<shared_pred.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(shared_pred[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->shared_pred.push_back(new_cnf);
fta_node->where.push_back(new_cnf);
}
for(p=0;p<pred_t0.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->pred_t0.push_back(new_cnf);
fta_node->where.push_back(new_cnf);
}
for(p=0;p<pred_t1.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->pred_t1.push_back(new_cnf);
fta_node->where.push_back(new_cnf);
}
for(p=0;p<hash_eq.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(hash_eq[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->hash_eq.push_back(new_cnf);
fta_node->where.push_back(new_cnf);
}
for(p=0;p<postfilter.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->postfilter.push_back(new_cnf);
}
+
+
+
+
+vector<qp_node *> watch_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+ vector<qp_node *> ret_vec;
+
+// First check if the query can be pushed to the FTA.
+ bool fta_ok = true;
+ int s;
+ for(s=0;s<select_list.size();s++){
+ fta_ok &= check_fta_forbidden_se(select_list[s]->se,NULL, Ext_fcns);
+ }
+ int p;
+ for(p=0;p<where.size();p++){
+ fta_ok &= check_fta_forbidden_pr(where[p]->pr,NULL, Ext_fcns);
+ }
+
+ if(!fta_ok){
+ fprintf(stderr,"ERROR, watchlist join %s is fta-unsafe.\n",node_name.c_str());
+ exit(1);
+ }
+
+// Can it be done in a single lfta?
+// Get the set of interfaces it accesses.
+ int ierr;
+ int si;
+ vector<string> sel_names;
+ vector<pair<string,string> > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
+ if (ifaces.empty()) {
+ fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
+ exit(1);
+ }
+
+ if(ifaces.size() == 1){
+// Single interface, no need to merge.
+ hfta_returned = 0;
+ ret_vec.push_back(this);
+
+// Treat the range vars a bit differently, the 2nd is reading from a _local_ watchlist.
+ from[0]->set_machine(ifaces[0].first);
+ from[0]->set_interface(ifaces[0].second);
+ from[0]->set_ifq(false);
+
+ from[1]->set_machine(ifaces[0].first);
+ from[1]->set_interface("_local_");
+ from[1]->set_ifq(false);
+
+ return(ret_vec);
+ }else{
+// Multiple interfaces, generate the interface-specific queries plus
+// the merge.
+ hfta_returned = 1;
+
+ vector<string> sel_names;
+ for(si=0;si<ifaces.size();++si){
+ watch_join_qpn *fta_node = new watch_join_qpn();
+
+// Name the fta
+ if(ifaces.size()==1)
+ fta_node->set_node_name( node_name );
+ else{
+ string new_name = "_"+node_name+"_"+ifaces[si].first+"_"+ifaces[si].second;
+ untaboo(new_name);
+ fta_node->set_node_name(new_name);
+ }
+ sel_names.push_back(fta_node->get_node_name());
+
+// Assign the table
+ int f;
+ for(f=0;f<from.size();f++){
+ fta_node->from.push_back(from[f]->duplicate());
+ fta_node->from[f]->set_machine(ifaces[si].first);
+ if(f==0)
+ fta_node->from[f]->set_interface(ifaces[si].second);
+ else
+ fta_node->from[f]->set_interface("_local_");
+ fta_node->from[f]->set_ifq(false);
+ }
+
+ for(s=0;s<select_list.size();s++){
+ fta_node->select_list.push_back( dup_select(select_list[s], NULL) );
+ }
+
+ for(p=0;p<pred_t0.size();p++){
+ predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->pred_t0.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<pred_t1.size();p++){
+ predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->pred_t1.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<key_flds.size();p++){ // we've checked that all keys are covered
+ string k = key_flds[p];
+ predicate_t *new_pr = dup_pr(hash_eq[k]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->hash_eq[k] = new_cnf;
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<join_filter.size();p++){
+ predicate_t *new_pr = dup_pr(join_filter[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->postfilter.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<postfilter.size();p++){
+ predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->postfilter.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ fta_node->key_flds = key_flds;
+
+// Xfer all of the parameters.
+// Use existing handle annotations.
+ vector<string> param_names = param_tbl->get_param_names();
+ int pi;
+ for(pi=0;pi<param_names.size();pi++){
+ data_type *dt = param_tbl->get_data_type(param_names[pi]);
+ fta_node->param_tbl->add_param(param_names[pi],dt->duplicate(),
+ param_tbl->handle_access(param_names[pi]));
+ }
+ fta_node->definitions = definitions;
+ if(fta_node->resolve_if_params(ifdb, this->err_str)){
+ this->error_code = 3;
+ return ret_vec;
+ }
+
+ ret_vec.push_back(fta_node);
+ }
+
+ mrg_qpn *mrg_node = new mrg_qpn((watch_join_qpn *)ret_vec[0],
+ node_name, sel_names,ifaces, ifdb);
+ ret_vec.push_back(mrg_node);
+
+ return(ret_vec);
+ }
+
+}
+
// Use to search for unresolved interface param refs in an hfta.
int spx_qpn::count_ifp_refs(set<string> &ifpnames){
return ret;
}
+int watch_tbl_qpn::count_ifp_refs(set<string> &ifpnames){
+ return 0;
+}
+
int mrg_qpn::count_ifp_refs(set<string> &ifpnames){
return 0;
}
return ret;
}
+int watch_join_qpn::count_ifp_refs(set<string> &ifpnames){
+ int ret = 0;
+ int i;
+ for(i=0;i<select_list.size();++i)
+ ret += count_se_ifp_refs(select_list[i]->se,ifpnames);
+ for(i=0;i<where.size();++i)
+ ret += count_pr_ifp_refs(where[i]->pr,ifpnames);
+ return ret;
+}
+
+
// Resolve interface params to string literals
int filter_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
return ret;
}
+int watch_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
+ int ret = 0;
+ int i;
+ string ifname = from[0]->get_interface();
+ string ifmach = from[0]->get_machine();
+ for(i=0;i<select_list.size();++i)
+ if( resolve_se_ifp_refs(select_list[i]->se,ifmach, ifname, ifdb, err) )
+ ret = 1;
+ for(i=0;i<where.size();++i)
+ if( resolve_pr_ifp_refs(where[i]->pr,ifmach, ifname, ifdb, err))
+ ret = 1;
+ return ret;
+}
+
int spx_qpn::resolve_if_params( ifq_t *ifdb, string &err){
int ret = 0;
vector<string> sel_names;
vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
vector<string> sel_names;
vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
vector<string> sel_names;
vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
spx_qpn *c_node = child_vec[f];
vector<pair<string, string> > ifaces = get_ifaces(c_node->table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
return(ret);
}
+vector<table_exp_t *> watch_join_qpn::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string rootnm, string silo_name){
+ vector<table_exp_t *> ret;
+ int retval = process_opview(from[0],0,node_name,
+ Schema,qnodes,opviews,ret, rootnm, silo_name);
+ if(retval) exit(1);
+ return(ret);
+}
+
+
+
+vector<table_exp_t *> watch_tbl_qpn::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string rootnm, string silo_name){
+ vector<table_exp_t *> ret;
+ return ret; // nothing to process
+}
+
//////////////////////////////////////////////////////////////////
return(table_layout);
}
+table_def *watch_tbl_qpn::get_fields(){
+ return(table_layout);
+}
+
table_def *spx_qpn::get_fields(){
return(create_attributes(node_name, select_list));
return(create_attributes(node_name, select_list));
}
+table_def *watch_join_qpn::get_fields(){
+ return(create_attributes(node_name, select_list));
+}
table_def *join_eq_hash_qpn::get_fields(){
int i, h, s, t;
return(fm);
}
+ vector<tablevar_t *> watch_tbl_qpn::get_input_tbls(){
+ vector<tablevar_t *> ret;
+ return(ret);
+ }
+
vector<tablevar_t *> mrg_qpn::get_input_tbls(){
return(fm);
}
return(from);
}
+ vector<tablevar_t *> watch_join_qpn::get_input_tbls(){
+ return(from);
+ }
+
//-----------------------------------------------------------------
// get output tables
return(retval);
}
+ vector<tablevar_t *> watch_tbl_qpn::get_output_tbls(){
+ vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
+ return(retval);
+ }
+
vector<tablevar_t *> mrg_qpn::get_output_tbls(){
vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
return(retval);
}
+ vector<tablevar_t *> watch_join_qpn::get_output_tbls(){
+ vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
+ return(retval);
+ }
+
+
//-----------------------------------------------------------------
// Bind to schema
return tmp_cset;
}
+col_id_set watch_join_qpn::get_colrefs(bool ext_fcns_only,table_list *Schema){
+ col_id_set retval, tmp_cset;
+ int p;
+ for(p=0;p<where.size();++p){
+ gather_pr_col_ids(where[p]->pr, tmp_cset, NULL);
+ }
+ int s;
+ for(s=0;s<select_list.size();++s){
+ gather_se_col_ids(select_list[s]->se, tmp_cset, NULL);
+ }
+ col_id_set::iterator cisi;
+ if(ext_fcns_only){
+ for(cisi=tmp_cset.begin();cisi!=tmp_cset.end();++cisi){
+ field_entry *fe = Schema->get_field((*cisi).schema_ref, (*cisi).field);
+ if(fe->get_unpack_fcns().size()>0)
+ retval.insert((*cisi));
+ }
+ return retval;
+ }
+
+ return tmp_cset;
+}
+
+
// Associate colrefs in SEs with this schema.
}
+void watch_join_qpn::bind_to_schema(table_list *Schema){
+// Bind the tablevars in the From clause to the Schema
+// (it might have changed from analysis time)
+ int f;
+ for(f=0;f<from.size();++f){
+ string snm = from[f]->get_schema_name();
+ int tbl_ref = Schema->get_table_ref(snm);
+ if(tbl_ref >= 0)
+ from[f]->set_schema_ref(tbl_ref);
+ }
+
+// Bind all SEs to this schema
+ tablevar_list_t fm(from);
+
+ int p;
+ for(p=0;p<where.size();++p){
+ bind_to_schema_pr(where[p]->pr, &fm, Schema);
+ }
+ int s;
+ for(s=0;s<select_list.size();++s){
+ bind_to_schema_se(select_list[s]->se, &fm, Schema);
+ }
+
+// Collect set of tuples referenced in this HFTA
+// input, internal, or output.
+
+}
+
+
//-----------------------------------------------------------------
// get_cplx_lit_tbl
+cplx_lit_table *watch_tbl_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+ return(new cplx_lit_table());
+}
+
cplx_lit_table *mrg_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
return(new cplx_lit_table());
}
return(complex_literals);
}
+cplx_lit_table *watch_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+ int i;
+ cplx_lit_table *complex_literals = new cplx_lit_table();
+
+ for(i=0;i<select_list.size();i++){
+ find_complex_literal_se(select_list[i]->se, Ext_fcns, complex_literals);
+ }
+ for(i=0;i<where.size();++i){
+ find_complex_literal_pr(where[i]->pr,Ext_fcns, complex_literals);
+ }
+
+ return(complex_literals);
+}
+
+
//-----------------------------------------------------------------
// get_handle_param_tbl
+vector<handle_param_tbl_entry *> watch_tbl_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
+ vector<handle_param_tbl_entry *> retval;
+ return(retval);
+}
+
vector<handle_param_tbl_entry *> mrg_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
vector<handle_param_tbl_entry *> retval;
return(retval);
return(retval);
}
+vector<handle_param_tbl_entry *> watch_join_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
+ int i;
+ vector<handle_param_tbl_entry *> retval;
+
+ for(i=0;i<select_list.size();i++){
+ find_param_handles_se(select_list[i]->se, Ext_fcns, retval);
+ }
+ for(i=0;i<where.size();++i){
+ find_param_handles_pr(where[i]->pr,Ext_fcns, retval);
+ }
+
+ return(retval);
+}
+
+
+
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
/// Functions for operator output rates estimations
return SGAHCWCB_SELECTIVITY * DEFAULT_INTERFACE_RATE;
}
+double watch_tbl_qpn::get_rate_estimate() {
+
+ // dummy method for now
+ return DEFAULT_INTERFACE_RATE;
+}
+
double mrg_qpn::get_rate_estimate() {
// dummy method for now
);
}
+////////////////////////////////////////////////
+/// WATCHLIST_TBL operator
+/// WATCHLIST_TBL functor
+////////////////////////////////////////////
+
+string watch_tbl_qpn::generate_functor_name(){
+ return("watch_tbl_functor_" + normalize_name(this->get_node_name()));
+}
+
+string watch_tbl_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector<bool> &needs_xform){
+
+ return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED");
+}
+
+string watch_tbl_qpn::generate_operator(int i, string params){
+ return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED");
+}
/////////////////////////////////////////////////////////
////// JOIN_EQ_HASH functor
}
}
+void watch_join_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
+ int i;
+ vector<map<string, scalarexp_t *> *> src_vec;
+
+ for(i=0;i<q_sources.size();i++){
+ if(q_sources[i] != NULL)
+ src_vec.push_back(q_sources[i]->get_protocol_se());
+ else
+ src_vec.push_back(NULL);
+ }
+
+ for(i=0;i<select_list.size();i++){
+ protocol_map[select_list[i]->name] = resolve_protocol_se(select_list[i]->se,src_vec,NULL,Schema);
+ }
+
+ for(i=0;i<key_flds.size();i++){
+ string kfld = key_flds[i];
+ hash_src_l.push_back(resolve_protocol_se(hash_eq[kfld]->pr->get_left_se(),src_vec,NULL,Schema));
+ hash_src_r.push_back(resolve_protocol_se(hash_eq[kfld]->pr->get_right_se(),src_vec,NULL,Schema));
+ }
+}
+
+
void sgah_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
int i;
vector<map<string, scalarexp_t *> *> src_vec;
protocol_map[fld_nm] = NULL;
}
}
+
+void watch_tbl_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
+ return;
+}
+