mangler="";
tablevar_list_t *fm = parse_tree->get_from();
- refd_tbls = fm->get_table_names();
+ if(fm!=NULL){
+ refd_tbls = fm->get_table_names();
+ }
params = pt->query_params;
};
// Each qp node must be able to return a description
// of the tuples it creates.
-// TODO: the get_output_tls method should subsume the get_fields
+// TODO: the get_output_tbls method should subsume the get_fields
// method, but in fact it really just returns the
// operator name.
virtual table_def *get_fields() = 0; // Should be vector?
+// get keys from the operator. Currently, only works on group-by queries.
+// partial_keys set to true if there is a suspicion that the list is partial.
+ virtual std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys) = 0;
// Get the from clause
virtual std::vector<tablevar_t *> get_input_tbls() = 0;
// this is a confused function, it acutally return the output
// The "where" clause is a pre-filter
virtual std::vector<cnf_elem *> get_where_clause() = 0;
-// To be more explicit, use get_filter_preds
+// To be more explicit, use get_filter_preds, this is used to compute the prefilter
virtual std::vector<cnf_elem *> get_filter_clause() = 0;
+// Add an extra predicate. Currently only used for LFTAs.
+ virtual void append_to_where(cnf_elem *c) = 0;
+
void add_predecessor(int i){predecessors.push_back(i);};
void remove_predecessor(int i){
std::vector<int>::iterator vi;
return ret;
}
+ void append_to_where(cnf_elem *c){
+ where.push_back(c);
+ }
+
+
void bind_to_schema(table_list *Schema);
col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
table_def *get_fields();
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
+ std::vector<string> ret;
+ return ret;
+ }
+
std::vector<tablevar_t *> get_input_tbls();
std::vector<tablevar_t *> get_output_tbls();
// embedded in them. Would it make sense
// to grab the whole table list?
tablevar_list_t *fm = qs->fta_tree->get_from();
+
std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
if(tbl_vec.size() != 1){
char tmpstr[200];
}
table_name = (tbl_vec[0]);
+ int t = tbl_vec[0]->get_schema_ref();
+ if(! Schema->is_stream(t)){
+ err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
+ error_code = 1;
+ }
+
// Get the select list.
select_list = qs->fta_tree->get_sl_vec();
int lfta_disorder; // maximum disorder in the steam between lfta, hfta
int hfta_disorder; // maximum disorder in the hfta
+ int hfta_slow_flush; // outputs per input, 0 means no slow flush
// rollup, cube, and grouping_sets cannot be readily reconstructed by
// analyzing the patterns, so explicitly record them here.
std::string generate_operator(int i, std::string params);
std::string get_include_file(){
- if(hfta_disorder <= 1){
- return("#include <groupby_operator.h>\n");
+ if(hfta_disorder <= 1){
+ if(hfta_slow_flush>0){
+ return("#include <groupby_slowflush_operator.h>\n");
}else{
- return("#include <groupby_operator_oop.h>\n");
+ return("#include <groupby_operator.h>\n");
}
+ }else{
+ return("#include <groupby_operator_oop.h>\n");
+ }
};
std::vector<select_element *> get_select_list(){return select_list;};
return ret;
};
std::vector<cnf_elem *> get_where_clause(){return where;};
+
+ void append_to_where(cnf_elem *c){
+ where.push_back(c);
+ }
+
std::vector<cnf_elem *> get_filter_clause(){return where;};
std::vector<cnf_elem *> get_having_clause(){return having;};
gb_table *get_gb_tbl(){return &gb_tbl;};
// table which represents output tuple.
table_def *get_fields();
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
std::vector<tablevar_t *> get_input_tbls();
std::vector<tablevar_t *> get_output_tbls();
sgah_qpn(){
lfta_disorder = 1;
hfta_disorder = 1;
+ hfta_slow_flush = 0;
};
sgah_qpn(query_summary_class *qs,table_list *Schema){
lfta_disorder = 1;
hfta_disorder = 1;
+ hfta_slow_flush = 0;
// Get the table name.
// NOTE the colrefs have the tablevar ref (an int)
}
table_name = (tbl_vec[0]);
+ int t = tbl_vec[0]->get_schema_ref();
+ if(! Schema->is_stream(t)){
+ err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
+ error_code = 1;
+ }
+
+
// Get the select list.
select_list = qs->fta_tree->get_sl_vec();
param_tbl->handle_access(param_names[pi]));
}
ret->definitions = definitions;
+ ret->hfta_slow_flush = hfta_slow_flush;
ret->node_name = node_name + suffix;
return ret;
};
std::vector<cnf_elem *> get_where_clause(){return where;};
+ void append_to_where(cnf_elem *c){
+ where.push_back(c);
+ }
+
std::vector<cnf_elem *> get_filter_clause(){return where;};
std::vector<cnf_elem *> get_having_clause(){return having;};
std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};
// table which represents output tuple.
table_def *get_fields();
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
+
std::vector<tablevar_t *> get_input_tbls();
std::vector<tablevar_t *> get_output_tbls();
}
table_name = (tbl_vec[0]);
+ int t = tbl_vec[0]->get_schema_ref();
+ if(! Schema->is_stream(t)){
+ err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
+ error_code = 1;
+ }
+
// Get the select list.
select_list = qs->fta_tree->get_sl_vec();
};
+
+// Watchlist - from a table read from an external source.
+
+class watch_tbl_qpn: public qp_node{
+public:
+ table_def *table_layout; // the output schema
+ std::vector<std::string> key_flds;
+
+// Parameters related to loading the table
+ std::string filename;
+ int refresh_interval;
+
+
+ void append_to_where(cnf_elem *c){
+ fprintf(stderr, "ERROR, append_to_where called on watch_tbl_qpn, not supported, query %s.\n", node_name.c_str());
+ exit(1);
+ }
+
+ std::string node_type(){return("watch_tbl_qpn"); };
+ bool makes_transform(){return false;};
+ std::vector<std::string> external_libs(){
+ std::vector<std::string> ret;
+ return ret;
+ }
+
+ void bind_to_schema(table_list *Schema){}
+ col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
+ col_id_set ret;
+ return ret;
+ }
+
+ std::string to_query_string();
+ std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
+ std::string generate_functor_name();
+ std::string generate_operator(int i, std::string params);
+ std::string get_include_file(){
+ return("#include <watchlist_tbl.h>\n");
+ };
+
+ cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
+ std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
+
+ table_def *get_fields();
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
+ return key_flds;
+ }
+
+ std::vector<tablevar_t *> get_input_tbls();
+ std::vector<tablevar_t *> get_output_tbls();
+
+ std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
+ virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
+// Ensure that any refs to interface params have been split away.
+ int count_ifp_refs(std::set<std::string> &ifpnames);
+
+// No predicates, return an empty clause
+ std::vector<cnf_elem *> get_where_clause(){
+ std::vector<cnf_elem *> t;
+ return(t);
+ };
+ std::vector<cnf_elem *> get_filter_clause(){
+ return get_where_clause();
+ }
+
+ watch_tbl_qpn(){
+ };
+
+ watch_tbl_qpn(query_summary_class *qs,table_list *Schema){
+ node_name=qs->query_name;
+ param_tbl = qs->param_tbl;
+ definitions = qs->definitions;
+
+
+// Populate the schema
+ table_layout = new table_def(
+ node_name.c_str(), NULL, NULL, qs->fta_tree->fel, WATCHLIST_SCHEMA
+ );
+
+// Find the keys
+ std::vector<field_entry *> flds = qs->fta_tree->fel->get_list();
+ for(int f=0;f<flds.size();++f){
+ if(flds[f]->get_modifier_list()->contains_key("key") ||
+ flds[f]->get_modifier_list()->contains_key("Key") ||
+ flds[f]->get_modifier_list()->contains_key("KEY") ){
+ key_flds.push_back(flds[f]->get_name());
+ }
+ }
+ if(key_flds.size()==0){
+ fprintf(stderr,"Error, no key fields defined for table watchlist %s\n",node_name.c_str());
+ exit(1);
+ }
+
+ table_layout->set_keys(key_flds); // communicate keys to consumers
+
+// Get loading parameters
+ if(definitions.count("filename")>0){
+ filename = definitions["filename"];
+ }else{
+ fprintf(stderr, "Error, no filename for source data defined for table watchlist %s\n",node_name.c_str());
+ exit(1);
+ }
+ if(definitions.count("refresh_interval")>0){
+ refresh_interval = atoi(definitions["refresh_interval"].c_str());
+ if(refresh_interval <= 0){
+ fprintf(stderr, "Error, the refresh_interval (%s) of table watchlist %s must be a positive non-zero integer.\n",definitions["refresh_interval"].c_str(), node_name.c_str());
+ exit(1);
+ }
+ }else{
+ fprintf(stderr, "Error, no refresh_interval defined for table watchlist %s\n",node_name.c_str());
+ exit(1);
+ }
+
+ }
+
+ qp_node *make_copy(std::string suffix){
+ watch_tbl_qpn *ret = new watch_tbl_qpn();
+ ret->filename = filename;
+ ret->refresh_interval = refresh_interval;
+ ret->key_flds = key_flds;
+
+ ret->param_tbl = new param_table();
+ std::vector<std::string> param_names = param_tbl->get_param_names();
+ int pi;
+ for(pi=0;pi<param_names.size();pi++){
+ data_type *dt = param_tbl->get_data_type(param_names[pi]);
+ ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
+ param_tbl->handle_access(param_names[pi]));
+ }
+ ret->definitions = definitions;
+
+ ret->node_name = node_name + suffix;
+ ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
+
+ return ret;
+ };
+
+ // the following method is used for distributed query optimization
+ double get_rate_estimate();
+
+ void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
+
+
+};
+
+
+
+
+
+
// forward reference
class filter_join_qpn;
+class watch_join_qpn;
// (temporal) Merge query plan node.
partn_def_t* partn_def;
+ void append_to_where(cnf_elem *c){
+ fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n", node_name.c_str());
+ exit(1);
+ }
+
+
std::string node_type(){return("mrg_qpn"); };
bool makes_transform(){return false;};
std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
table_def *get_fields();
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
+ std::vector<string> ret;
+ return ret;
+ }
+
std::vector<tablevar_t *> get_input_tbls();
std::vector<tablevar_t *> get_output_tbls();
exit(1);
}
+ for(int f=0;f<fm.size();++f){
+ int t=fm[f]->get_schema_ref();
+ if(! Schema->is_stream(t)){
+ err_str += "ERROR in query "+node_name+", the source "+fm[f]->get_schema_name()+" is not a stream.\n";
+ error_code = 1;
+ }
+ }
+
// Get the parameters
param_tbl = qs->param_tbl;
mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
+// Merge watch_join LFTAs.
+
+ mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
+
// Merge selection LFTAs.
mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
exit(1);
}
+ void append_to_where(cnf_elem *c){
+ fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str());
+ exit(1);
+ }
+
+
std::string to_query_string();
std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
std::string generate_functor_name();
std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
table_def *get_fields();
+
+// It might be feasible to find keys in an equijoin expression.
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
+ std::vector<string> ret;
+ return ret;
+ }
+
std::vector<tablevar_t *> get_input_tbls();
std::vector<tablevar_t *> get_output_tbls();
error_code = 1;
}
+ for(int f=0;f<from.size();++f){
+ int t=from[f]->get_schema_ref();
+ if(! Schema->is_stream(t)){
+ err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
+ error_code = 1;
+ }
+ }
+
+
// Get the select list.
select_list = qs->fta_tree->get_sl_vec();
return ret;
};
// Used for LFTA only
+ void append_to_where(cnf_elem *c){
+ where.push_back(c);
+ }
+
std::vector<cnf_elem *> get_where_clause(){return where;}
std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
table_def *get_fields();
+// It should be feasible to find keys in a filter join
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
+ std::vector<string> ret;
+ return ret;
+ }
+
std::vector<tablevar_t *> get_input_tbls();
std::vector<tablevar_t *> get_output_tbls();
// Ensure that any refs to interface params have been split away.
int count_ifp_refs(std::set<std::string> &ifpnames);
-
+// CONSTRUCTOR
filter_join_qpn(){
};
filter_join_qpn(query_summary_class *qs,table_list *Schema){
err_str += tmpstr;
error_code = 1;
}
+ if(from[0]->get_interface() != from[1]->get_interface()){
+ err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n";
+ error_code = 1;
+ }
+
+ for(int f=0;f<from.size();++f){
+ int t=from[f]->get_schema_ref();
+ if(! Schema->is_stream(t)){
+ err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
+ error_code = 1;
+ }
+ }
+
// Get the select list.
select_list = qs->fta_tree->get_sl_vec();
};
+
+// TODO : put tests on other operators to ensure they dont' read from a watchlist
+// TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ?
+class watch_join_qpn: public qp_node{
+public:
+ std::vector<tablevar_t *> from; // Source tables
+ std::vector<select_element *> select_list; // Select list
+ std::vector<cnf_elem *> pred_t0; // main (R) preds
+ std::vector<cnf_elem *> pred_t1; // watchlist-only (S) preds (?)
+ std::map<std::string, cnf_elem *> hash_eq; // predicates on S hash keys
+ std::vector<cnf_elem *> join_filter; // ref's R, S, but not a hash
+ std::vector<cnf_elem *> postfilter; // ref's no table.
+
+ std::vector<std::string> key_flds;
+
+ std::vector<cnf_elem *> where; // all the filters
+ // useful for summary analysis
+
+ std::vector<scalarexp_t *> hash_src_r, hash_src_l;
+ std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
+ std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
+
+
+
+ std::string node_type(){return("watch_join"); };
+ bool makes_transform(){return true;};
+ std::vector<std::string> external_libs(){
+ std::vector<std::string> ret;
+ return ret;
+ }
+
+ void bind_to_schema(table_list *Schema);
+ col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
+
+ std::string to_query_string();
+ std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
+ fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n");
+ exit(1);
+ }
+ std::string generate_functor_name(){
+ fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n");
+ exit(1);
+ }
+ std::string generate_operator(int i, std::string params){
+ fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n");
+ exit(1);
+ }
+ std::string get_include_file(){return("#include <watchlist_operator.h>\n");};
+
+ std::vector<select_element *> get_select_list(){return select_list;};
+ std::vector<scalarexp_t *> get_select_se_list(){
+ std::vector<scalarexp_t *> ret;
+ int i;
+ for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
+ return ret;
+ };
+// Used for LFTA only
+ void append_to_where(cnf_elem *c){
+ where.push_back(c);
+ }
+
+ std::vector<cnf_elem *> get_where_clause(){return where;}
+
+ std::vector<cnf_elem *> get_filter_clause(){return pred_t0;}
+
+ cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
+ std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
+
+ table_def *get_fields();
+// It should be feasible to find keys in a watchlist join
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
+ std::vector<string> ret;
+ return ret;
+ }
+
+ std::vector<tablevar_t *> get_input_tbls();
+ std::vector<tablevar_t *> get_output_tbls();
+
+ std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
+ int resolve_if_params(ifq_t *ifdb, std::string &err);
+
+ virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
+// Ensure that any refs to interface params have been split away.
+ int count_ifp_refs(std::set<std::string> &ifpnames);
+
+// CONSTRUCTOR
+ watch_join_qpn(){
+ };
+ watch_join_qpn(query_summary_class *qs,table_list *Schema){
+ int i,w;
+// Get the table name.
+// NOTE the colrefs have the table ref (an int)
+// embedded in them. Would it make sense
+// to grab the whole table list?
+ from = qs->fta_tree->get_from()->get_table_list();
+ if(from.size() != 2){
+ char tmpstr[200];
+ sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
+ err_str += tmpstr;
+ error_code = 1;
+ }
+
+ int t = from[0]->get_schema_ref();
+ if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){
+ err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n";
+ error_code = 1;
+ }
+ t = from[1]->get_schema_ref();
+ if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){
+ err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n";
+ error_code = 1;
+ }
+ key_flds = Schema->get_table(t)->get_keys();
+
+
+// Get the select list.
+ select_list = qs->fta_tree->get_sl_vec();
+
+// Get the selection predicate.
+ where = qs->wh_cnf;
+ std::vector<cnf_elem *> t0_only, t1_only;
+ for(w=0;w<where.size();++w){
+ analyze_cnf(where[w]);
+ std::vector<int> pred_tbls;
+ get_tablevar_ref_pr(where[w]->pr,pred_tbls);
+// Collect the list of preds by src var,
+// extract the shared preds later.
+ if(pred_tbls.size()==1){
+ if(pred_tbls[0] == 0){
+ pred_t0.push_back(where[w]);
+ }else{
+ pred_t1.push_back(where[w]);
+ }
+ continue;
+ }
+// refs nothing -- might be sampling, do it as postfilter.
+ if(pred_tbls.size()==0){
+ postfilter.push_back(where[w]);
+ continue;
+ }
+
+// Must reference both
+// See if it can be a hash predicate.
+ if(where[w]->is_atom && where[w]->eq_pred){
+ std::vector<int> sel_tbls, ser_tbls;
+ get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
+ get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
+ if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
+// make channel 0 SE on LHS.
+ if(sel_tbls[0] != 0)
+ where[w]->swap_scalar_operands();
+
+// Must be simple (a colref) on the RHS
+ if(where[w]->r_simple){
+ string rcol = where[w]->pr->get_right_se()->get_colref()->get_field();
+ if(std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end()){
+ hash_eq[rcol] = where[w];
+
+ data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
+ data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
+ if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) )
+ err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n";
+ continue;
+ }
+ }
+ }
+ }
+// All tests failed, fallback is join_filter.
+ join_filter.push_back(where[w]);
+ }
+
+ if(key_flds.size() > hash_eq.size()){
+ err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate. Missing fields are";
+ for(int k=0;k<key_flds.size();++k){
+ if(hash_eq.count(key_flds[k]) < 1){
+ err_str += " "+key_flds[k];
+ }
+ }
+ err_str += ".\n";
+ error_code = 5;
+ }
+
+
+// Get the parameters
+ param_tbl = qs->param_tbl;
+ definitions = qs->definitions;
+
+ };
+
+ // the following method is used for distributed query optimization
+ double get_rate_estimate();
+
+
+ qp_node* make_copy(std::string suffix){
+ watch_join_qpn *ret = new watch_join_qpn();
+
+ ret->param_tbl = new param_table();
+ std::vector<std::string> param_names = param_tbl->get_param_names();
+ int pi;
+ for(pi=0;pi<param_names.size();pi++){
+ data_type *dt = param_tbl->get_data_type(param_names[pi]);
+ ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
+ param_tbl->handle_access(param_names[pi]));
+ }
+ ret->definitions = definitions;
+
+ ret->node_name = node_name + suffix;
+
+ // make shallow copy of all fields
+ ret->where = where;
+ ret->from = from;
+ ret->select_list = select_list;
+ ret->key_flds = key_flds;
+ ret->pred_t0 = pred_t0;
+ ret->pred_t1 = pred_t1;
+ ret->join_filter = join_filter;
+ ret->postfilter = postfilter;
+ ret->hash_eq = hash_eq;
+ ret->hash_src_r = hash_src_r;
+ ret->hash_src_l = hash_src_l;
+
+ return ret;
+ };
+
+ void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
+
+};
+
+
+
+
enum output_file_type_enum {regular, gzip, bzip};
class output_file_qpn: public qp_node{
return ret;
}
+ void append_to_where(cnf_elem *c){
+ fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n", node_name.c_str());
+ exit(1);
+ }
+
+
+
void bind_to_schema(table_list *Schema){}
col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
col_id_set ret;
fel->append_field(fields[i]);
return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
}
+
+// TODO! either bypass the output operator in stream_query,
+// or propagate key information when the output operator is constructed.
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
+ std::vector<string> ret;
+ return ret;
+ }
+
std::vector<tablevar_t *> get_input_tbls();
std::vector<tablevar_t *> get_output_tbls();
hfta_query_name = qn;
eat_input = ei;
+// TODO stream checking, but it requires passing Schema to output_file_qpn
+/*
+ for(int f=0;f<fm.size();++f){
+ int t=fm[f]->get_schema_ref();
+ if(! Schema->is_stream(t)){
+ err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
+ error_code = 1;
+ }
+ }
+*/
+
+
do_gzip = false;
compression_type = regular;
if(ospec->operator_type == "zfile")
// table which represents output tuple.
table_def *get_fields();
+// TODO Key extraction should be feasible but I'll defer the issue.
+ std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
+ std::vector<string> ret;
+ return ret;
+ }
+
std::vector<tablevar_t *> get_input_tbls();
std::vector<tablevar_t *> get_output_tbls();
+ void append_to_where(cnf_elem *c){
+ where.push_back(c);
+ }
+
sgahcwcb_qpn(){
};
}
table_name = (tbl_vec[0]);
+ int t = tbl_vec[0]->get_schema_ref();
+ if(! Schema->is_stream(t)){
+ err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
+ error_code = 1;
+ }
+
+
// Get the select list.
select_list = qs->fta_tree->get_sl_vec();
table_def *create_attributes(string tname, vector<select_element *> &select_list);
-
#endif