/* ------------------------------------------------ Copyright 2014 AT&T Intellectual Property Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ------------------------------------------- */ #ifndef __QUERY_PLAN_H__ #define __QUERY_PLAN_H__ #include #include #include using namespace std; #include"analyze_fta.h" #include"iface_q.h" #include"parse_partn.h" #include"generate_utils.h" // Identify the format of the input, output streams. #define UNKNOWNFORMAT 0 #define NETFORMAT 1 #define HOSTFORMAT 2 /////////////////////////////////////////////////// // representation of an output operator specification struct ospec_str{ string query; string operator_type; string operator_param; string output_directory; int bucketwidth; string partitioning_flds; int n_partitions; }; //////////////////////////////////////////////////// // Input representation of a query struct query_node{ int idx; std::set reads_from; std::set sources_to; std::vector refd_tbls; std::vector params; std::string name; std::string file; std::string mangler; // for UDOPs bool touched; table_exp_t *parse_tree; int n_consumers; bool is_udop; bool is_externally_visible; bool inferred_visible_node; set subtree_roots; query_node(){ idx = -1; touched = false; parse_tree = NULL; n_consumers = 0; is_externally_visible = false; inferred_visible_node = false; mangler=""; }; query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){ idx = i; touched = false; name = qnm; file = flnm; parse_tree = pt; n_consumers = 0; is_udop = false; is_externally_visible = pt->get_visible(); inferred_visible_node = false; mangler=""; tablevar_list_t *fm = parse_tree->get_from(); if(fm!=NULL){ refd_tbls = fm->get_table_names(); } params = pt->query_params; }; query_node(int ix, std::string udop_name,table_list *Schema){ idx = ix; touched = false; name = udop_name; file = udop_name; parse_tree = NULL; n_consumers = 0; is_udop = true; is_externally_visible = true; inferred_visible_node = false; mangler=""; int sid = Schema->find_tbl(udop_name); std::vector subq = Schema->get_subqueryspecs(sid); int i; for(i=0;iname); } }; }; struct hfta_node{ std::string name; std::string source_name; std::vector query_node_indices; std::set reads_from; std::set sources_to; bool is_udop; bool inferred_visible_node; int n_parallel; int parallel_idx; bool do_generation; // false means, ignore it. hfta_node(){ is_udop = false; inferred_visible_node = false; n_parallel = 1; parallel_idx = 0; do_generation = true; } }; #define SPX_QUERY 1 #define SGAH_QUERY 2 // the following selectivity estimates are used by our primitive rate estimators #define SPX_SELECTIVITY 1.0 #define SGAH_SELECTIVITY 0.1 #define RSGAH_SELECTIVITY 0.1 #define SGAHCWCB_SELECTIVITY 0.1 #define MRG_SELECTIVITY 1.0 #define JOIN_EQ_HASH_SELECTIVITY 1.0 // the the output rate of the interface is not given we are going to use // this default value #define DEFAULT_INTERFACE_RATE 100 // Define query plan nodes // These nodes are intended for query modeling // and transformation rather than for code generation. // Query plan node base class. // It has an ID, can return its type, // and can be linked into lists with the predecessors // and successors. // To add : serialize, unserialize? class qp_node{ public: int id; std::vector predecessors; std::vector successors; std::string node_name; // For error reporting without exiting the program. int error_code; std::string err_str; // These should be moved to the containing stream_query object. std::map definitions; param_table *param_tbl; // The value of a field in terms of protocol fields (if any). std::map protocol_map; qp_node(){ error_code = 0; id = -1; param_tbl = new param_table(); }; qp_node(int i){ error_code = 0; id = i; param_tbl = new param_table(); }; int get_id(){return(id);}; void set_id(int i){id = i; }; int get_error_code(){return error_code;}; std::string get_error_str(){return err_str;}; virtual std::string node_type() = 0; // For code generation, does the operator xform its input. virtual bool makes_transform() = 0; // For linking, what external libraries does the operator depend on? virtual std::vector external_libs() = 0; void set_node_name(std::string n){node_name = n;}; std::string get_node_name(){return node_name;}; void set_definitions(std::map &def){ definitions = def; }; std::map get_definitions(){return definitions;}; // call to create the mapping from field name to se in protocol fields. // Pass in qp_node of data sources, in order. virtual void create_protocol_se(std::vector q_sources,table_list *Schema)=0; // get the protocol map. the parameter is the return value. std::map *get_protocol_se(){return &protocol_map;} // Each qp node must be able to return a description // of the tuples it creates. // TODO: the get_output_tbls method should subsume the get_fields // method, but in fact it really just returns the // operator name. virtual table_def *get_fields() = 0; // Should be vector? // get keys from the operator. Currently, only works on group-by queries. // partial_keys set to true if there is a suspicion that the list is partial. virtual std::vector get_tbl_keys(std::vector &partial_keys) = 0; // Get the from clause virtual std::vector get_input_tbls() = 0; // this is a confused function, it acutally return the output // table name. virtual std::vector get_output_tbls() = 0; std::string get_val_of_def(std::string def){ if(definitions.count(def) > 0) return definitions[def]; return(""); }; void set_definition(std::string def, std::string val){ definitions[def]=val; } // Associate colrefs in SEs with tables // at code generation time. virtual void bind_to_schema(table_list *Schema) = 0; // Get colrefs of the operator, currently only meaningful for lfta // operators, and only interested in colrefs with extraction fcns virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0; virtual std::string to_query_string() = 0; virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform) = 0; virtual std::string generate_functor_name() = 0; virtual std::string generate_operator(int i, std::string params) = 0; virtual std::string get_include_file() = 0; virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0; virtual std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0; // Split this node into LFTA and HFTA nodes. // Four possible outcomes: // 1) the qp_node reads from a protocol, but does not need to // split (can be evaluated as an LFTA). // The lfta node is the only element in the return vector, // and hfta_returned is false. // 2) the qp_node reads from no protocol, and therefore cannot be split. // THe hfta node is the only element in the return vector, // and hfta_returned is true. // 3) reads from at least one protocol, but cannot be split : failure. // return vector is empty, the error conditions are written // in the qp_node. // 4) The qp_node splits into an hfta node and one or more LFTA nodes. // the return vector has two or more elements, and hfta_returned // is true. The last element is the HFTA. virtual std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0; // Ensure that any refs to interface params have been split away. virtual int count_ifp_refs(std::set &ifpnames)=0; // Tag the data sources which are views, // return the (optimized) source queries and // record the view access in opview_set virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0; param_table *get_param_tbl(){return param_tbl;}; // The "where" clause is a pre-filter virtual std::vector get_where_clause() = 0; // To be more explicit, use get_filter_preds, this is used to compute the prefilter virtual std::vector get_filter_clause() = 0; // Add an extra predicate. Currently only used for LFTAs. virtual void append_to_where(cnf_elem *c) = 0; void add_predecessor(int i){predecessors.push_back(i);}; void remove_predecessor(int i){ std::vector::iterator vi; for(vi=predecessors.begin(); vi!=predecessors.end();++vi){ if((*vi) == i){ predecessors.erase(vi); return; } } }; void add_successor(int i){successors.push_back(i);}; std::vector get_predecessors(){return predecessors;}; int n_predecessors(){return predecessors.size();}; std::vector get_successors(){return successors;}; int n_successors(){return successors.size();}; void clear_predecessors(){predecessors.clear();}; void clear_successors(){successors.clear();}; // the following method is used for distributed query optimization double get_rate_estimate(); // used for cloning query nodes virtual qp_node* make_copy(std::string suffix) = 0; }; // Select, project, transform (xform) query plan node. // represent the following query fragment // select scalar_expression_1, ..., scalar_expression_k // from S // where predicate // // the predicates and the scalar expressions can reference // attributes of S and also functions. class spx_qpn: public qp_node{ public: tablevar_t *table_name; // Source table std::vector where; // selection predicate std::vector select_list; // Select list std::string node_type(){return("spx_qpn"); }; bool makes_transform(){return true;}; std::vector external_libs(){ std::vector ret; return ret; } void append_to_where(cnf_elem *c){ where.push_back(c); } void bind_to_schema(table_list *Schema); col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema); std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); std::string generate_functor_name(); std::string generate_operator(int i, std::string params); std::string get_include_file(){return("#include \n");}; std::vector get_select_list(){return select_list;}; std::vector get_select_se_list(){ std::vector ret; int i; for(i=0;ise); return ret; }; std::vector get_where_clause(){return where;}; std::vector get_filter_clause(){return where;}; cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); std::vector get_tbl_keys(std::vector &partial_keys){ std::vector ret; return ret; } std::vector get_input_tbls(); std::vector get_output_tbls(); std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); int resolve_if_params(ifq_t *ifdb, std::string &err); spx_qpn(){ }; spx_qpn(query_summary_class *qs,table_list *Schema){ // Get the table name. // NOTE the colrefs have the table ref (an int) // embedded in them. Would it make sense // to grab the whole table list? tablevar_list_t *fm = qs->fta_tree->get_from(); std::vector tbl_vec = fm->get_table_list(); if(tbl_vec.size() != 1){ char tmpstr[200]; sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() ); err_str = tmpstr; error_code = 1; } table_name = (tbl_vec[0]); int t = tbl_vec[0]->get_schema_ref(); if(! Schema->is_stream(t)){ err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; error_code = 1; } // Get the select list. select_list = qs->fta_tree->get_sl_vec(); // Get the selection predicate. where = qs->wh_cnf; // Get the parameters param_tbl = qs->param_tbl; }; // the following method is used for distributed query optimization double get_rate_estimate(); qp_node* make_copy(std::string suffix){ spx_qpn *ret = new spx_qpn(); ret->param_tbl = new param_table(); std::vector param_names = param_tbl->get_param_names(); int pi; for(pi=0;piget_data_type(param_names[pi]); ret->param_tbl->add_param(param_names[pi],dt->duplicate(), param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; ret->node_name = node_name + suffix; // make shallow copy of all fields ret->where = where; ret->select_list = select_list; return ret; }; void create_protocol_se(vector q_sources, table_list *Schema); }; // Select, group-by, aggregate. // Representing // Select SE_1, ..., SE_k // From T // Where predicate // Group By gb1, ..., gb_n // Having predicate // // NOTE : the samlping operator is sgahcwcb_qpn. // // For now, must have group-by variables and aggregates. // The scalar expressions which are output must be a function // of the groub-by variables and the aggregates. // The group-by variables can be references to columsn of T, // or they can be scalar expressions. class sgah_qpn: public qp_node{ public: tablevar_t *table_name; // source table std::vector where; // selection predicate std::vector having; // post-aggregation predicate std::vector select_list; // se's of output gb_table gb_tbl; // Table of all group-by attributes. aggregate_table aggr_tbl; // Table of all referenced aggregates. std::vector gb_sources; // pre-compute for partitioning. int lfta_disorder; // maximum disorder in the steam between lfta, hfta int hfta_disorder; // maximum disorder in the hfta int hfta_slow_flush; // outputs per input, 0 means no slow flush // rollup, cube, and grouping_sets cannot be readily reconstructed by // analyzing the patterns, so explicitly record them here. // used only so that to_query_string produces something meaningful. std::vector gb_entry_type; std::vector gb_entry_count; std::vector get_gb_sources(){return gb_sources;} std::string node_type(){return("sgah_qpn"); }; bool makes_transform(){return true;}; std::vector external_libs(){ std::vector ret; return ret; } void bind_to_schema(table_list *Schema); col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema); std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); std::string generate_functor_name(); std::string generate_operator(int i, std::string params); std::string get_include_file(){ if(hfta_disorder <= 1){ if(hfta_slow_flush>0){ return("#include \n"); }else{ return("#include \n"); } }else{ return("#include \n"); } }; std::vector get_select_list(){return select_list;}; std::vector get_select_se_list(){ std::vector ret; int i; for(i=0;ise); return ret; }; std::vector get_where_clause(){return where;}; void append_to_where(cnf_elem *c){ where.push_back(c); } std::vector get_filter_clause(){return where;}; std::vector get_having_clause(){return having;}; gb_table *get_gb_tbl(){return &gb_tbl;}; aggregate_table *get_aggr_tbl(){return &aggr_tbl;}; cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); // table which represents output tuple. table_def *get_fields(); std::vector get_tbl_keys(std::vector &partial_keys); std::vector get_input_tbls(); std::vector get_output_tbls(); sgah_qpn(){ lfta_disorder = 1; hfta_disorder = 1; hfta_slow_flush = 0; }; sgah_qpn(query_summary_class *qs,table_list *Schema){ lfta_disorder = 1; hfta_disorder = 1; hfta_slow_flush = 0; // Get the table name. // NOTE the colrefs have the tablevar ref (an int) // embedded in them. Would it make sense // to grab the whole table list? tablevar_list_t *fm = qs->fta_tree->get_from(); std::vector tbl_vec = fm->get_table_list(); if(tbl_vec.size() != 1){ char tmpstr[200]; sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() ); err_str=tmpstr; error_code = 1; } table_name = (tbl_vec[0]); int t = tbl_vec[0]->get_schema_ref(); if(! Schema->is_stream(t)){ err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; error_code = 1; } // Get the select list. select_list = qs->fta_tree->get_sl_vec(); // Get the selection and having predicates. where = qs->wh_cnf; having = qs->hav_cnf; // Build a new GB var table (don't share, might need to modify) int g; for(g=0;ggb_tbl->size();g++){ gb_tbl.add_gb_var(qs->gb_tbl->get_name(g), qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g), qs->gb_tbl->get_reftype(g) ); } gb_tbl.set_pattern_info(qs->gb_tbl); // gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type; // gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count; // gb_tbl.pattern_components = qs->gb_tbl->pattern_components; // Build a new aggregate table. (don't share, might need // to modify). int a; for(a=0;aaggr_tbl->size();a++){ aggr_tbl.add_aggr( // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a) qs->aggr_tbl->duplicate(a) ); } // Get the parameters param_tbl = qs->param_tbl; }; std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); int resolve_if_params(ifq_t *ifdb, std::string &err); // the following method is used for distributed query optimization double get_rate_estimate(); qp_node* make_copy(std::string suffix){ sgah_qpn *ret = new sgah_qpn(); ret->param_tbl = new param_table(); std::vector param_names = param_tbl->get_param_names(); int pi; for(pi=0;piget_data_type(param_names[pi]); ret->param_tbl->add_param(param_names[pi],dt->duplicate(), param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; ret->hfta_slow_flush = hfta_slow_flush; ret->node_name = node_name + suffix; // make shallow copy of all fields ret->where = where; ret->having = having; ret->select_list = select_list; ret->gb_tbl = gb_tbl; ret->aggr_tbl = aggr_tbl; return ret; }; // Split aggregation into two HFTA components - sub and superaggregation // If unable to split the aggreagates, split into selection and aggregation // If resulting low-level query is empty (e.g. when aggregates cannot be split and // where clause is empty) empty vector willb e returned virtual std::vector split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema); void create_protocol_se(vector q_sources, table_list *Schema); }; // Select, group-by, aggregate. with running aggregates // Representing // Select SE_1, ..., SE_k // From T // Where predicate // Group By gb1, ..., gb_n // Closing When predicate // Having predicate // // NOTE : the sampling operator is sgahcwcb_qpn. // // For now, must have group-by variables and aggregates. // The scalar expressions which are output must be a function // of the groub-by variables and the aggregates. // The group-by variables can be references to columsn of T, // or they can be scalar expressions. class rsgah_qpn: public qp_node{ public: tablevar_t *table_name; // source table std::vector where; // selection predicate std::vector having; // post-aggregation predicate std::vector closing_when; // group closing predicate std::vector select_list; // se's of output gb_table gb_tbl; // Table of all group-by attributes. aggregate_table aggr_tbl; // Table of all referenced aggregates. std::vector gb_sources; // pre-compute for partitioning. int lfta_disorder; // maximum disorder allowed in stream between lfta, hfta int hfta_disorder; // maximum disorder allowed in hfta std::vector get_gb_sources(){return gb_sources;} std::string node_type(){return("rsgah_qpn"); }; bool makes_transform(){return true;}; std::vector external_libs(){ std::vector ret; return ret; } void bind_to_schema(table_list *Schema); col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){ fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n"); exit(1); } std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); std::string generate_functor_name(); std::string generate_operator(int i, std::string params); std::string get_include_file(){return("#include \n");}; std::vector get_select_list(){return select_list;}; std::vector get_select_se_list(){ std::vector ret; int i; for(i=0;ise); return ret; }; std::vector get_where_clause(){return where;}; void append_to_where(cnf_elem *c){ where.push_back(c); } std::vector get_filter_clause(){return where;}; std::vector get_having_clause(){return having;}; std::vector get_closing_when_clause(){return closing_when;}; gb_table *get_gb_tbl(){return &gb_tbl;}; aggregate_table *get_aggr_tbl(){return &aggr_tbl;}; cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); // table which represents output tuple. table_def *get_fields(); std::vector get_tbl_keys(std::vector &partial_keys); std::vector get_input_tbls(); std::vector get_output_tbls(); rsgah_qpn(){ lfta_disorder = 1; hfta_disorder = 1; }; rsgah_qpn(query_summary_class *qs,table_list *Schema){ lfta_disorder = 1; hfta_disorder = 1; // Get the table name. // NOTE the colrefs have the tablevar ref (an int) // embedded in them. Would it make sense // to grab the whole table list? tablevar_list_t *fm = qs->fta_tree->get_from(); std::vector tbl_vec = fm->get_table_list(); if(tbl_vec.size() != 1){ char tmpstr[200]; sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() ); err_str=tmpstr; error_code = 1; } table_name = (tbl_vec[0]); int t = tbl_vec[0]->get_schema_ref(); if(! Schema->is_stream(t)){ err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; error_code = 1; } // Get the select list. select_list = qs->fta_tree->get_sl_vec(); // Get the selection and having predicates. where = qs->wh_cnf; having = qs->hav_cnf; closing_when = qs->closew_cnf; // Build a new GB var table (don't share, might need to modify) int g; for(g=0;ggb_tbl->size();g++){ gb_tbl.add_gb_var(qs->gb_tbl->get_name(g), qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g), qs->gb_tbl->get_reftype(g) ); } // Build a new aggregate table. (don't share, might need // to modify). int a; for(a=0;aaggr_tbl->size();a++){ aggr_tbl.add_aggr( // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a) qs->aggr_tbl->duplicate(a) ); } // Get the parameters param_tbl = qs->param_tbl; }; std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); std::vector split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema); virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;} // the following method is used for distributed query optimization double get_rate_estimate(); qp_node* make_copy(std::string suffix){ rsgah_qpn *ret = new rsgah_qpn(); ret->param_tbl = new param_table(); std::vector param_names = param_tbl->get_param_names(); int pi; for(pi=0;piget_data_type(param_names[pi]); ret->param_tbl->add_param(param_names[pi],dt->duplicate(), param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; ret->node_name = node_name + suffix; // make shallow copy of all fields ret->where = where; ret->having = having; ret->closing_when = closing_when; ret->select_list = select_list; ret->gb_tbl = gb_tbl; ret->aggr_tbl = aggr_tbl; return ret; }; void create_protocol_se(vector q_sources, table_list *Schema); }; // Watchlist - from a table read from an external source. class watch_tbl_qpn: public qp_node{ public: table_def *table_layout; // the output schema std::vector key_flds; // Parameters related to loading the table std::string filename; int refresh_interval; void append_to_where(cnf_elem *c){ fprintf(stderr, "ERROR, append_to_where called on watch_tbl_qpn, not supported, query %s.\n", node_name.c_str()); exit(1); } std::string node_type(){return("watch_tbl_qpn"); }; bool makes_transform(){return false;}; std::vector external_libs(){ std::vector ret; return ret; } void bind_to_schema(table_list *Schema){} col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){ col_id_set ret; return ret; } std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); std::string generate_functor_name(); std::string generate_operator(int i, std::string params); std::string get_include_file(){ return("#include \n"); }; cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); std::vector get_tbl_keys(std::vector &partial_keys){ return key_flds; } std::vector get_input_tbls(); std::vector get_output_tbls(); std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); // No predicates, return an empty clause std::vector get_where_clause(){ std::vector t; return(t); }; std::vector get_filter_clause(){ return get_where_clause(); } watch_tbl_qpn(){ }; watch_tbl_qpn(query_summary_class *qs,table_list *Schema){ node_name=qs->query_name; param_tbl = qs->param_tbl; definitions = qs->definitions; // Populate the schema table_layout = new table_def( node_name.c_str(), NULL, NULL, qs->fta_tree->fel, WATCHLIST_SCHEMA ); // Find the keys std::vector flds = qs->fta_tree->fel->get_list(); for(int f=0;fget_modifier_list()->contains_key("key") || flds[f]->get_modifier_list()->contains_key("Key") || flds[f]->get_modifier_list()->contains_key("KEY") ){ key_flds.push_back(flds[f]->get_name()); } } if(key_flds.size()==0){ fprintf(stderr,"Error, no key fields defined for table watchlist %s\n",node_name.c_str()); exit(1); } table_layout->set_keys(key_flds); // communicate keys to consumers // Get loading parameters if(definitions.count("filename")>0){ filename = definitions["filename"]; }else{ fprintf(stderr, "Error, no filename for source data defined for table watchlist %s\n",node_name.c_str()); exit(1); } if(definitions.count("refresh_interval")>0){ refresh_interval = atoi(definitions["refresh_interval"].c_str()); if(refresh_interval <= 0){ fprintf(stderr, "Error, the refresh_interval (%s) of table watchlist %s must be a positive non-zero integer.\n",definitions["refresh_interval"].c_str(), node_name.c_str()); exit(1); } }else{ fprintf(stderr, "Error, no refresh_interval defined for table watchlist %s\n",node_name.c_str()); exit(1); } } qp_node *make_copy(std::string suffix){ watch_tbl_qpn *ret = new watch_tbl_qpn(); ret->filename = filename; ret->refresh_interval = refresh_interval; ret->key_flds = key_flds; ret->param_tbl = new param_table(); std::vector param_names = param_tbl->get_param_names(); int pi; for(pi=0;piget_data_type(param_names[pi]); ret->param_tbl->add_param(param_names[pi],dt->duplicate(), param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; ret->node_name = node_name + suffix; ret->table_layout = table_layout->make_shallow_copy(ret->node_name); return ret; }; // the following method is used for distributed query optimization double get_rate_estimate(); void create_protocol_se(vector q_sources, table_list *Schema); }; // forward reference class filter_join_qpn; class watch_join_qpn; // (temporal) Merge query plan node. // represent the following query fragment // Merge c1:c2 // from T1 _t1, T2 _t2 // // T1 and T2 must have compatible schemas, // that is the same types in the same slots. // c1 and c2 must be colrefs from T1 and T2, // both ref'ing the same slot. Their types // must be temporal and the same kind of temporal. // in the output, no other field is temporal. // the field names ofthe output are drawn from T1. class mrg_qpn: public qp_node{ public: std::vector fm; // Source table std::vector mvars; // the merge-by columns. scalarexp_t *slack; table_def *table_layout; // the output schema int merge_fieldpos; // position of merge field, // convenience for manipulation. int disorder; // max disorder seen in the input / allowed in the output // partition definition for merges that combine streams partitioned over multiple interfaces partn_def_t* partn_def; void append_to_where(cnf_elem *c){ fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n", node_name.c_str()); exit(1); } std::string node_type(){return("mrg_qpn"); }; bool makes_transform(){return false;}; std::vector external_libs(){ std::vector ret; return ret; } void bind_to_schema(table_list *Schema); col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){ fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n"); exit(1); } std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); std::string generate_functor_name(); std::string generate_operator(int i, std::string params); std::string get_include_file(){ if(disorder>1) return("#include \n"); return("#include \n"); }; cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); std::vector get_tbl_keys(std::vector &partial_keys){ std::vector ret; return ret; } std::vector get_input_tbls(); std::vector get_output_tbls(); std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); // No predicates, return an empty clause std::vector get_where_clause(){ std::vector t; return(t); }; std::vector get_filter_clause(){ return get_where_clause(); } mrg_qpn(){ partn_def = NULL; }; void set_disorder(int d){ disorder = d; } mrg_qpn(query_summary_class *qs,table_list *Schema){ disorder = 1; // Grab the elements of the query node. fm = qs->fta_tree->get_from()->get_table_list(); mvars = qs->mvars; slack = qs->slack; // sanity check if(fm.size() != mvars.size()){ fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn. fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size()); exit(1); } for(int f=0;fget_schema_ref(); if(! Schema->is_stream(t)){ err_str += "ERROR in query "+node_name+", the source "+fm[f]->get_schema_name()+" is not a stream.\n"; error_code = 1; } } // Get the parameters param_tbl = qs->param_tbl; // Need to set the node name now, so that the // schema (table_layout) can be properly named. // TODO: Setting the name of the table might best be done // via the set_node_name method, because presumably // thats when the node name is really known. // This should propogate to the table_def table_layout node_name=qs->query_name; /* int ff; printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size()); for(ff=0;ffto_string().c_str()); } printf("\n"); */ // Create the output schema. // strip temporal properites form all fields except the merge field. std::vector flva = Schema->get_fields(fm[0]->get_schema_name()); field_entry_list *fel = new field_entry_list(); int f; for(f=0;fget_type().c_str(), flva[f]->get_modifier_list()); if(flva[f]->get_name() == mvars[0]->get_field()){ merge_fieldpos = f; // if(slack != NULL) dt.reset_temporal(); }else{ dt.reset_temporal(); } param_list *plist = new param_list(); std::vector param_strings = dt.get_param_keys(); int p; for(p=0;pappend(param_strings[p].c_str(),v.c_str()); else plist->append(param_strings[p].c_str()); } fe=new field_entry( dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns()); fel->append_field(fe); } table_layout = new table_def( node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA ); partn_def = NULL; }; ///////////////////////////////////////////// /// Created for de-siloing. to be removed? or is it otherwise useful? // Merge existing set of sources (de-siloing) mrg_qpn(std::string n_name, std::vector &src_names,table_list *Schema){ int i,f; disorder = 1; // Construct the fm list for(f=0;fget_table_ref(src_names[f]); if(tbl_ref < 0){ fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str()); exit(1); } table_def *src_tbl = Schema->get_table(tbl_ref); tablevar_t *fm_t = new tablevar_t(src_names[f].c_str()); string range_name = "_t" + int_to_string(f); fm_t->set_range_var(range_name); fm_t->set_schema_ref(tbl_ref); fm.push_back(fm_t); } // Create the output schema. // strip temporal properites form all fields except the merge field. std::vector flva = Schema->get_fields(fm[0]->get_schema_name()); field_entry_list *fel = new field_entry_list(); bool temporal_found = false; for(f=0;fget_type().c_str(), flva[f]->get_modifier_list()); if(dt.is_temporal() && !temporal_found){ merge_fieldpos = f; temporal_found = true; }else{ dt.reset_temporal(); } param_list *plist = new param_list(); std::vector param_strings = dt.get_param_keys(); int p; for(p=0;pappend(param_strings[p].c_str(),v.c_str()); else plist->append(param_strings[p].c_str()); } fe=new field_entry( dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns() ); fel->append_field(fe); } if(! temporal_found){ fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str()); exit(1); } node_name=n_name; table_layout = new table_def( node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA ); partn_def = NULL; param_tbl = new param_table(); // Construct mvars for(f=0;f flv_f = Schema->get_fields(fm[f]->get_schema_name()); data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(), flva[merge_fieldpos]->get_modifier_list()); colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(), flv_f[merge_fieldpos]->get_name().c_str()); mvars.push_back(mcr); } // literal_t *s_lit = new literal_t("5",LITERAL_INT); // slack = new scalarexp_t(s_lit); slack = NULL; }; // end de-siloing //////////////////////////////////////// void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector > &sources,ifq_t *ifdb, gb_table *gbt); // Merge filter_join LFTAs. mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb); // Merge watch_join LFTAs. mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb); // Merge selection LFTAs. mrg_qpn(spx_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb){ disorder = 1; param_tbl = spx->param_tbl; int i; node_name = n_name; field_entry_list *fel = new field_entry_list(); merge_fieldpos = -1; for(i=0;iselect_list.size();++i){ data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate(); if(dt->is_temporal()){ if(merge_fieldpos < 0){ merge_fieldpos = i; }else{ fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() ); dt->reset_temporal(); } } field_entry *fe = dt->make_field_entry(spx->select_list[i]->name); fel->append_field(fe); delete dt; } if(merge_fieldpos<0){ fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str()); exit(1); } table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA); // NEED TO HANDLE USER_SPECIFIED SLACK this->resolve_slack(spx->select_list[merge_fieldpos]->se, spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL); // if(this->slack == NULL) // fprintf(stderr,"Zero slack.\n"); // else // fprintf(stderr,"slack is %s\n",slack->to_string().c_str()); for(i=0;iselect_list[merge_fieldpos]->name.c_str())); mvars[i]->set_tablevar_ref(i); fm.push_back(new tablevar_t(sources[i].c_str())); fm[i]->set_range_var(rvar); } param_tbl = new param_table(); std::vector param_names = spx->param_tbl->get_param_names(); int pi; for(pi=0;piparam_tbl->get_data_type(param_names[pi]); param_tbl->add_param(param_names[pi],dt->duplicate(), spx->param_tbl->handle_access(param_names[pi])); } definitions = spx->definitions; } // Merge aggregation LFTAs mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb){ disorder = 1; param_tbl = sgah->param_tbl; int i; node_name = n_name; field_entry_list *fel = new field_entry_list(); merge_fieldpos = -1; for(i=0;iselect_list.size();++i){ data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate(); if(dt->is_temporal()){ if(merge_fieldpos < 0){ merge_fieldpos = i; }else{ fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() ); dt->reset_temporal(); } } field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name); fel->append_field(fe); delete dt; } if(merge_fieldpos<0){ fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str()); exit(1); } table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA); // NEED TO HANDLE USER_SPECIFIED SLACK this->resolve_slack(sgah->select_list[merge_fieldpos]->se, sgah->select_list[merge_fieldpos]->name, ifaces, ifdb, &(sgah->gb_tbl)); if(this->slack == NULL) fprintf(stderr,"Zero slack.\n"); else fprintf(stderr,"slack is %s\n",slack->to_string().c_str()); for(i=0;iselect_list[merge_fieldpos]->name.c_str())); mvars[i]->set_tablevar_ref(i); fm.push_back(new tablevar_t(sources[i].c_str())); fm[i]->set_range_var(rvar); } param_tbl = new param_table(); std::vector param_names = sgah->param_tbl->get_param_names(); int pi; for(pi=0;piparam_tbl->get_data_type(param_names[pi]); param_tbl->add_param(param_names[pi],dt->duplicate(), sgah->param_tbl->handle_access(param_names[pi])); } definitions = sgah->definitions; } qp_node *make_copy(std::string suffix){ mrg_qpn *ret = new mrg_qpn(); ret->slack = slack; ret->disorder = disorder; ret->param_tbl = new param_table(); std::vector param_names = param_tbl->get_param_names(); int pi; for(pi=0;piget_data_type(param_names[pi]); ret->param_tbl->add_param(param_names[pi],dt->duplicate(), param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; ret->node_name = node_name + suffix; ret->table_layout = table_layout->make_shallow_copy(ret->node_name); ret->merge_fieldpos = merge_fieldpos; return ret; }; std::vector split_sources(); // the following method is used for distributed query optimization double get_rate_estimate(); // get partition definition for merges that combine streams partitioned over multiple interfaces // return NULL for regular merges partn_def_t* get_partn_definition(map lfta_names, vector interface_names, vector machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) { if (partn_def) return partn_def; int err; string err_str; string partn_name; vector input_tables = get_input_tbls(); for (int i = 0; i < input_tables.size(); ++i) { tablevar_t * table = input_tables[i]; vector partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str); if (partn_names.size() != 1) // can't have more than one value of partition attribute return NULL; string new_partn_name = partn_names[0]; // need to make sure that all ifaces belong to the same partition if (!i) partn_name = new_partn_name; else if (new_partn_name != partn_name) return NULL; } // now find partition definition corresponding to partn_name partn_def = partn_parse_result->get_partn_def(partn_name); return partn_def; }; void set_partn_definition(partn_def_t* def) { partn_def = def; } bool is_multihost_merge() { bool is_multihost = false; // each input table must be have machine attribute be non-empty // and there should be at least 2 different values of machine attributes vector input_tables = get_input_tbls(); string host = input_tables[0]->get_machine(); for (int i = 1; i < input_tables.size(); ++i) { string new_host = input_tables[i]->get_machine(); if (new_host == "") return false; if (new_host != host) is_multihost = true; } return is_multihost; } void create_protocol_se(vector q_sources, table_list *Schema); }; // eq_temporal, hash join query plan node. // represent the following query fragment // select scalar_expression_1, ..., scalar_expression_k // from T0 t0, T1 t1 // where predicate // // the predicates and the scalar expressions can reference // attributes of t0 and t1 and also functions. // The predicate must contain CNF elements to enable the // efficient evaluation of the query. // 1) at least one predicate of the form // (temporal se in t0) = (temporal se in t1) // 2) at least one predicate of the form // (non-temporal se in t0) = (non-temporal se in t1) // class join_eq_hash_qpn: public qp_node{ public: std::vector from; // Source tables std::vector select_list; // Select list std::vector prefilter[2]; // source prefilters std::vector temporal_eq; // define temporal window std::vector hash_eq; // define hash key std::vector postfilter; // final filter on hash matches. std::vector where; // all the filters // useful for summary analysis std::vector hash_src_r, hash_src_l; std::vector get_hash_r(){return hash_src_r;} std::vector get_hash_l(){return hash_src_l;} std::string node_type(){return("join_eq_hash_qpn"); }; bool makes_transform(){return true;}; std::vector external_libs(){ std::vector ret; return ret; } void bind_to_schema(table_list *Schema); col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){ fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n"); exit(1); } void append_to_where(cnf_elem *c){ fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str()); exit(1); } std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); std::string generate_functor_name(); std::string generate_operator(int i, std::string params); std::string get_include_file(){return("#include \n");}; std::vector get_select_list(){return select_list;}; std::vector get_select_se_list(){ std::vector ret; int i; for(i=0;ise); return ret; }; // Used for LFTA only std::vector get_where_clause(){ std::vector t; return(t); }; std::vector get_filter_clause(){ return get_where_clause(); } cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); // It might be feasible to find keys in an equijoin expression. std::vector get_tbl_keys(std::vector &partial_keys){ std::vector ret; return ret; } std::vector get_input_tbls(); std::vector get_output_tbls(); std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); join_eq_hash_qpn(){ }; join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){ int w; // Get the table name. // NOTE the colrefs have the table ref (an int) // embedded in them. Would it make sense // to grab the whole table list? from = qs->fta_tree->get_from()->get_table_list(); if(from.size() != 2){ char tmpstr[200]; sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() ); err_str = tmpstr; error_code = 1; } for(int f=0;fget_schema_ref(); if(! Schema->is_stream(t)){ err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n"; error_code = 1; } } // Get the select list. select_list = qs->fta_tree->get_sl_vec(); // Get the selection predicate. where = qs->wh_cnf; for(w=0;w pred_tbls; get_tablevar_ref_pr(where[w]->pr,pred_tbls); // Prefilter if refs only one tablevar if(pred_tbls.size()==1){ prefilter[pred_tbls[0]].push_back(where[w]); continue; } // refs nothing -- might be sampling, do it as postfilter. if(pred_tbls.size()==0){ postfilter.push_back(where[w]); continue; } // See if it can be a hash or temporal predicate. // NOTE: synchronize with the temporality checking // done at join_eq_hash_qpn::get_fields if(where[w]->is_atom && where[w]->eq_pred){ std::vector sel_tbls, ser_tbls; get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls); get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls); if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){ // make channel 0 SE on LHS. if(sel_tbls[0] != 0) where[w]->pr->swap_scalar_operands(); data_type *dtl=where[w]->pr->get_left_se()->get_data_type(); data_type *dtr=where[w]->pr->get_right_se()->get_data_type(); if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) ) temporal_eq.push_back(where[w]); else hash_eq.push_back(where[w]); continue; } } // All tests failed, fallback is postfilter. postfilter.push_back(where[w]); } if(temporal_eq.size()==0){ err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n"; error_code = 1; } // Get the parameters param_tbl = qs->param_tbl; }; // the following method is used for distributed query optimization double get_rate_estimate(); qp_node* make_copy(std::string suffix){ join_eq_hash_qpn *ret = new join_eq_hash_qpn(); ret->param_tbl = new param_table(); std::vector param_names = param_tbl->get_param_names(); int pi; for(pi=0;piget_data_type(param_names[pi]); ret->param_tbl->add_param(param_names[pi],dt->duplicate(), param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; ret->node_name = node_name + suffix; // make shallow copy of all fields ret->where = where; ret->from = from; ret->select_list = select_list; ret->prefilter[0] = prefilter[0]; ret->prefilter[1] = prefilter[1]; ret->postfilter = postfilter; ret->temporal_eq = temporal_eq; ret->hash_eq = hash_eq; return ret; }; void create_protocol_se(vector q_sources, table_list *Schema); }; // --------------------------------------------- // eq_temporal, hash join query plan node. // represent the following query fragment // select scalar_expression_1, ..., scalar_expression_k // FILTER_JOIN(col, range) from T0 t0, T1 t1 // where predicate // // t0 is the output range variable, t1 is the filtering range // variable. Both must alias a PROTOCOL. // The scalar expressions in the select clause may // reference t0 only. // The predicates are classified as follows // prefilter predicates: // a cheap predicate in t0 such that there is an equivalent // predicate in t1. Cost decisions about pushing to // lfta prefilter made later. // t0 predicates (other than prefilter predicates) // -- cheap vs. expensive sorted out at genereate time, // the constructor isn't called with the function list. // t1 predicates (other than prefiler predicates). // equi-join predicates of the form: // (se in t0) = (se in t1) // // There must be at least one equi-join predicate. // No join predicates other than equi-join predicates // are allowed. // Warn on temporal equi-join predicates. // t1 predicates should not be expensive ... warn? // class filter_join_qpn: public qp_node{ public: std::vector from; // Source tables colref_t *temporal_var; // join window in FROM unsigned int temporal_range; // metadata. std::vector select_list; // Select list std::vector shared_pred; // prefilter preds std::vector pred_t0; // main (R) preds std::vector pred_t1; // filtering (S) preds std::vector hash_eq; // define hash key std::vector postfilter; // ref's no table. std::vector where; // all the filters // useful for summary analysis std::vector hash_src_r, hash_src_l; std::vector get_hash_r(){return hash_src_r;} std::vector get_hash_l(){return hash_src_l;} bool use_bloom; // true => bloom filter, false => limited hash std::string node_type(){return("filter_join"); }; bool makes_transform(){return true;}; std::vector external_libs(){ std::vector ret; return ret; } void bind_to_schema(table_list *Schema); col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema); std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform){ fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n"); exit(1); } std::string generate_functor_name(){ fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n"); exit(1); } std::string generate_operator(int i, std::string params){ fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n"); exit(1); } std::string get_include_file(){return("#include \n");}; std::vector get_select_list(){return select_list;}; std::vector get_select_se_list(){ std::vector ret; int i; for(i=0;ise); return ret; }; // Used for LFTA only void append_to_where(cnf_elem *c){ where.push_back(c); } std::vector get_where_clause(){return where;} std::vector get_filter_clause(){return shared_pred;} cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); // It should be feasible to find keys in a filter join std::vector get_tbl_keys(std::vector &partial_keys){ std::vector ret; return ret; } std::vector get_input_tbls(); std::vector get_output_tbls(); std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); int resolve_if_params(ifq_t *ifdb, std::string &err); virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); // CONSTRUCTOR filter_join_qpn(){ }; filter_join_qpn(query_summary_class *qs,table_list *Schema){ int i,w; // Get the table name. // NOTE the colrefs have the table ref (an int) // embedded in them. Would it make sense // to grab the whole table list? from = qs->fta_tree->get_from()->get_table_list(); temporal_var = qs->fta_tree->get_from()->get_colref(); temporal_range = qs->fta_tree->get_from()->get_temporal_range(); if(from.size() != 2){ char tmpstr[200]; sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() ); err_str += tmpstr; error_code = 1; } if(from[0]->get_interface() != from[1]->get_interface()){ err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n"; error_code = 1; } for(int f=0;fget_schema_ref(); if(! Schema->is_stream(t)){ err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n"; error_code = 1; } } // Get the select list. select_list = qs->fta_tree->get_sl_vec(); // Verify that only t0 is referenced. bool bad_ref = false; for(i=0;i sel_tbls; get_tablevar_ref_se(select_list[i]->se,sel_tbls); if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1)) bad_ref = true; } if(bad_ref){ err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n"; error_code = 1; } // Get the selection predicate. where = qs->wh_cnf; std::vector t0_only, t1_only; for(w=0;w pred_tbls; get_tablevar_ref_pr(where[w]->pr,pred_tbls); // Collect the list of preds by src var, // extract the shared preds later. if(pred_tbls.size()==1){ if(pred_tbls[0] == 0){ t0_only.push_back(where[w]); }else{ t1_only.push_back(where[w]); } continue; } // refs nothing -- might be sampling, do it as postfilter. if(pred_tbls.size()==0){ postfilter.push_back(where[w]); continue; } // See if it can be a hash or temporal predicate. // NOTE: synchronize with the temporality checking // done at join_eq_hash_qpn::get_fields if(where[w]->is_atom && where[w]->eq_pred){ std::vector sel_tbls, ser_tbls; get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls); get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls); if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){ // make channel 0 SE on LHS. if(sel_tbls[0] != 0) where[w]->pr->swap_scalar_operands(); hash_eq.push_back(where[w]); data_type *dtl=where[w]->pr->get_left_se()->get_data_type(); data_type *dtr=where[w]->pr->get_right_se()->get_data_type(); if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) ) err_str += "Warning, a filter join should not have join predicates on temporal fields.\n"; continue; } } // All tests failed, fallback is postfilter. err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n"; error_code = 3; } // Classify the t0_only and t1_only preds. set matched_pred; int v; for(w=0;wpr,t1_only[v]->pr,Schema)) break; if(vparam_tbl; definitions = qs->definitions; // Determine the algorithm if(this->get_val_of_def("algorithm") == "hash"){ use_bloom = false; }else{ use_bloom = true; } }; // the following method is used for distributed query optimization double get_rate_estimate(); qp_node* make_copy(std::string suffix){ filter_join_qpn *ret = new filter_join_qpn(); ret->param_tbl = new param_table(); std::vector param_names = param_tbl->get_param_names(); int pi; for(pi=0;piget_data_type(param_names[pi]); ret->param_tbl->add_param(param_names[pi],dt->duplicate(), param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; ret->node_name = node_name + suffix; // make shallow copy of all fields ret->where = where; ret->from = from; ret->temporal_range = temporal_range; ret->temporal_var = temporal_var; ret->select_list = select_list; ret->shared_pred = shared_pred; ret->pred_t0 = pred_t0; ret->pred_t1 = pred_t1; ret->postfilter = postfilter; ret->hash_eq = hash_eq; return ret; }; void create_protocol_se(vector q_sources, table_list *Schema); }; // TODO : put tests on other operators to ensure they dont' read from a watchlist // TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ? class watch_join_qpn: public qp_node{ public: std::vector from; // Source tables std::vector select_list; // Select list std::vector pred_t0; // main (R) preds std::vector pred_t1; // watchlist-only (S) preds (?) std::map hash_eq; // predicates on S hash keys std::vector join_filter; // ref's R, S, but not a hash std::vector postfilter; // ref's no table. std::vector key_flds; std::vector where; // all the filters // useful for summary analysis std::vector hash_src_r, hash_src_l; std::vector get_hash_r(){return hash_src_r;} std::vector get_hash_l(){return hash_src_l;} std::string node_type(){return("watch_join"); }; bool makes_transform(){return true;}; std::vector external_libs(){ std::vector ret; return ret; } void bind_to_schema(table_list *Schema); col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema); std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform){ fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n"); exit(1); } std::string generate_functor_name(){ fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n"); exit(1); } std::string generate_operator(int i, std::string params){ fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n"); exit(1); } std::string get_include_file(){return("#include \n");}; std::vector get_select_list(){return select_list;}; std::vector get_select_se_list(){ std::vector ret; int i; for(i=0;ise); return ret; }; // Used for LFTA only void append_to_where(cnf_elem *c){ where.push_back(c); } std::vector get_where_clause(){return where;} std::vector get_filter_clause(){return pred_t0;} cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); // It should be feasible to find keys in a watchlist join std::vector get_tbl_keys(std::vector &partial_keys){ std::vector ret; return ret; } std::vector get_input_tbls(); std::vector get_output_tbls(); std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); int resolve_if_params(ifq_t *ifdb, std::string &err); virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); // CONSTRUCTOR watch_join_qpn(){ }; watch_join_qpn(query_summary_class *qs,table_list *Schema){ int i,w; // Get the table name. // NOTE the colrefs have the table ref (an int) // embedded in them. Would it make sense // to grab the whole table list? from = qs->fta_tree->get_from()->get_table_list(); if(from.size() != 2){ char tmpstr[200]; sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() ); err_str += tmpstr; error_code = 1; } int t = from[0]->get_schema_ref(); if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){ err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n"; error_code = 1; } t = from[1]->get_schema_ref(); if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){ err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n"; error_code = 1; } key_flds = Schema->get_table(t)->get_keys(); // Get the select list. select_list = qs->fta_tree->get_sl_vec(); // Get the selection predicate. where = qs->wh_cnf; std::vector t0_only, t1_only; for(w=0;w pred_tbls; get_tablevar_ref_pr(where[w]->pr,pred_tbls); // Collect the list of preds by src var, // extract the shared preds later. if(pred_tbls.size()==1){ if(pred_tbls[0] == 0){ pred_t0.push_back(where[w]); }else{ pred_t1.push_back(where[w]); } continue; } // refs nothing -- might be sampling, do it as postfilter. if(pred_tbls.size()==0){ postfilter.push_back(where[w]); continue; } // Must reference both // See if it can be a hash predicate. if(where[w]->is_atom && where[w]->eq_pred){ std::vector sel_tbls, ser_tbls; get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls); get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls); if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){ // make channel 0 SE on LHS. if(sel_tbls[0] != 0) where[w]->swap_scalar_operands(); // Must be simple (a colref) on the RHS if(where[w]->r_simple){ string rcol = where[w]->pr->get_right_se()->get_colref()->get_field(); if(std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end()){ hash_eq[rcol] = where[w]; data_type *dtl=where[w]->pr->get_left_se()->get_data_type(); data_type *dtr=where[w]->pr->get_right_se()->get_data_type(); if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) ) err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n"; continue; } } } } // All tests failed, fallback is join_filter. join_filter.push_back(where[w]); } if(key_flds.size() > hash_eq.size()){ err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate. Missing fields are"; for(int k=0;kparam_tbl; definitions = qs->definitions; }; // the following method is used for distributed query optimization double get_rate_estimate(); qp_node* make_copy(std::string suffix){ watch_join_qpn *ret = new watch_join_qpn(); ret->param_tbl = new param_table(); std::vector param_names = param_tbl->get_param_names(); int pi; for(pi=0;piget_data_type(param_names[pi]); ret->param_tbl->add_param(param_names[pi],dt->duplicate(), param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; ret->node_name = node_name + suffix; // make shallow copy of all fields ret->where = where; ret->from = from; ret->select_list = select_list; ret->key_flds = key_flds; ret->pred_t0 = pred_t0; ret->pred_t1 = pred_t1; ret->join_filter = join_filter; ret->postfilter = postfilter; ret->hash_eq = hash_eq; ret->hash_src_r = hash_src_r; ret->hash_src_l = hash_src_l; return ret; }; void create_protocol_se(vector q_sources, table_list *Schema); }; enum output_file_type_enum {regular, gzip, bzip}; class output_file_qpn: public qp_node{ public: std::string source_op_name; // Source table std::vector fields; ospec_str *output_spec; vector fm; std::string hfta_query_name; std::string filestream_id; bool eat_input; std::vector params; bool do_gzip; output_file_type_enum compression_type; int n_streams; // Number of output streams int n_hfta_clones; // number of hfta clones int parallel_idx; // which close this produces output for. std::vector hash_flds; // fields used to hash the output. std::string node_type(){return("output_file_qpn"); }; bool makes_transform(){return false;}; std::vector external_libs(){ std::vector ret; switch(compression_type){ case gzip: ret.push_back("-lz"); break; case bzip: ret.push_back("-lbz2"); break; default: break; } return ret; } void append_to_where(cnf_elem *c){ fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n", node_name.c_str()); exit(1); } void bind_to_schema(table_list *Schema){} col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){ col_id_set ret; return ret; } std::string to_query_string(){return "// output_file_operator \n";} std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); std::string generate_functor_name(); std::string generate_operator(int i, std::string params); std::string get_include_file(){ switch(compression_type){ case gzip: return("#include \n"); default: return("#include \n"); } return("#include \n"); }; std::vector get_where_clause(){std::vector ret; return ret;}; std::vector get_filter_clause(){std::vector ret; return ret;}; cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;} std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector ret; return ret;} table_def *get_fields(){ field_entry_list *fel = new field_entry_list(); int i; for(i=0;iappend_field(fields[i]); return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA); } // TODO! either bypass the output operator in stream_query, // or propagate key information when the output operator is constructed. std::vector get_tbl_keys(std::vector &partial_keys){ std::vector ret; return ret; } std::vector get_input_tbls(); std::vector get_output_tbls(); std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){ std::vector ret; ret.push_back(this); hfta_returned = true; return ret; } std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){ std::vector ret; return ret; } // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames){return 0;} int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}; output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){ source_op_name = src_op; node_name = source_op_name + "_output"; filestream_id = fs_id; fields = src_tbl_def->get_fields(); output_spec = ospec; fm.push_back(new tablevar_t(source_op_name.c_str())); hfta_query_name = qn; eat_input = ei; // TODO stream checking, but it requires passing Schema to output_file_qpn /* for(int f=0;fget_schema_ref(); if(! Schema->is_stream(t)){ err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n"; error_code = 1; } } */ do_gzip = false; compression_type = regular; if(ospec->operator_type == "zfile") compression_type = gzip; n_streams = 1; parallel_idx = 0; n_hfta_clones = 1; char buf[1000]; strncpy(buf, output_spec->operator_param.c_str(),1000); buf[999] = '\0'; char *words[100]; int nwords = split_string(buf, ':', words,100); int i; for(i=0;iget_name() == target){ hash_flds.push_back(j); break; } } if(j==fields.size()){ err_flds += " "+target; } } if(err_flds != ""){ err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds; return true; } } return false; } // the following method is used for distributed query optimization double get_rate_estimate(){return 1.0;} qp_node* make_copy(std::string suffix){ // output_file_qpn *ret = new output_file_qpn(); output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input); return ret; } void create_protocol_se(vector q_sources, table_list *Schema){} }; // // --------------------------------------------- // Select, group-by, aggregate, sampling. // Representing // Select SE_1, ..., SE_k // From T // Where predicate // Group By gb1, ..., gb_n // [Subgroup gb_i1, .., gb_ik] // Cleaning_when predicate // Cleaning_by predicate // Having predicate // // For now, must have group-by variables and aggregates. // The scalar expressions which are output must be a function // of the groub-by variables and the aggregates. // The group-by variables can be references to columsn of T, // or they can be scalar expressions. class sgahcwcb_qpn: public qp_node{ public: tablevar_t *table_name; // source table std::vector where; // selection predicate std::vector having; // post-aggregation predicate std::vector select_list; // se's of output gb_table gb_tbl; // Table of all group-by attributes. std::set sg_tbl; // Names of the superGB attributes aggregate_table aggr_tbl; // Table of all referenced aggregates. std::set states_refd; // states ref'd by stateful fcns. std::vector cleanby; std::vector cleanwhen; std::vector gb_sources; // pre-compute for partitioning. std::vector get_gb_sources(){return gb_sources;} std::string node_type(){return("sgahcwcb_qpn"); }; bool makes_transform(){return true;}; std::vector external_libs(){ std::vector ret; return ret; } void bind_to_schema(table_list *Schema); col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){ fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n"); exit(1); } std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); std::string generate_functor_name(); std::string generate_operator(int i, std::string params); std::string get_include_file(){return("#include \n");}; std::vector get_select_list(){return select_list;}; std::vector get_select_se_list(){ std::vector ret; int i; for(i=0;ise); return ret; }; std::vector get_where_clause(){return where;}; std::vector get_filter_clause(){return where;}; std::vector get_having_clause(){return having;}; gb_table *get_gb_tbl(){return &gb_tbl;}; aggregate_table *get_aggr_tbl(){return &aggr_tbl;}; cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); // table which represents output tuple. table_def *get_fields(); // TODO Key extraction should be feasible but I'll defer the issue. std::vector get_tbl_keys(std::vector &partial_keys){ std::vector ret; return ret; } std::vector get_input_tbls(); std::vector get_output_tbls(); void append_to_where(cnf_elem *c){ where.push_back(c); } sgahcwcb_qpn(){ }; sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){ // Get the table name. // NOTE the colrefs have the tablevar ref (an int) // embedded in them. Would it make sense // to grab the whole table list? tablevar_list_t *fm = qs->fta_tree->get_from(); std::vector tbl_vec = fm->get_table_list(); if(tbl_vec.size() != 1){ char tmpstr[200]; sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() ); err_str=tmpstr; error_code = 1; } table_name = (tbl_vec[0]); int t = tbl_vec[0]->get_schema_ref(); if(! Schema->is_stream(t)){ err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; error_code = 1; } // Get the select list. select_list = qs->fta_tree->get_sl_vec(); // Get the selection and having predicates. where = qs->wh_cnf; having = qs->hav_cnf; cleanby = qs->cb_cnf; cleanwhen = qs->cw_cnf; // Build a new GB var table (don't share, might need to modify) int g; for(g=0;ggb_tbl->size();g++){ gb_tbl.add_gb_var(qs->gb_tbl->get_name(g), qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g), qs->gb_tbl->get_reftype(g) ); } // Build a new aggregate table. (don't share, might need // to modify). int a; for(a=0;aaggr_tbl->size();a++){ aggr_tbl.add_aggr( // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a) qs->aggr_tbl->duplicate(a) ); } sg_tbl = qs->sg_tbl; states_refd = qs->states_refd; // Get the parameters param_tbl = qs->param_tbl; }; std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); // Ensure that any refs to interface params have been split away. // CURRENTLY not allowed by split_node_for_fta int count_ifp_refs(std::set &ifpnames){return 0;} int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;} // the following method is used for distributed query optimization double get_rate_estimate(); qp_node* make_copy(std::string suffix){ sgahcwcb_qpn *ret = new sgahcwcb_qpn(); ret->param_tbl = new param_table(); std::vector param_names = param_tbl->get_param_names(); int pi; for(pi=0;piget_data_type(param_names[pi]); ret->param_tbl->add_param(param_names[pi],dt->duplicate(), param_tbl->handle_access(param_names[pi])); } ret->definitions = definitions; ret->node_name = node_name + suffix; // make shallow copy of all fields ret->where = where; ret->having = having; ret->select_list = select_list; ret->gb_tbl = gb_tbl; ret->aggr_tbl = aggr_tbl; ret->sg_tbl = sg_tbl; ret->states_refd = states_refd; ret->cleanby = cleanby; ret->cleanwhen = cleanwhen; return ret; }; void create_protocol_se(vector q_sources, table_list *Schema); }; std::vector create_query_nodes(query_summary_class *qs,table_list *Schema); void untaboo(string &s); table_def *create_attributes(string tname, vector &select_list); #endif