X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fftacmp%2Fquery_plan.h;h=ec782e0318cdcecd80372efba8cf080700d26e39;hb=f1754ecea2eab7bd0a302042ac82eb11667b166c;hp=8c7d32a4e3a82faed1bb76c1be7432fe978c7347;hpb=3ff5c433efcaee8b01fbeed90ab848008f2e6278;p=com%2Fgs-lite.git diff --git a/src/ftacmp/query_plan.h b/src/ftacmp/query_plan.h index 8c7d32a..ec782e0 100644 --- a/src/ftacmp/query_plan.h +++ b/src/ftacmp/query_plan.h @@ -87,7 +87,9 @@ struct query_node{ mangler=""; tablevar_list_t *fm = parse_tree->get_from(); - refd_tbls = fm->get_table_names(); + if(fm!=NULL){ + refd_tbls = fm->get_table_names(); + } params = pt->query_params; }; @@ -225,10 +227,13 @@ public: // Each qp node must be able to return a description // of the tuples it creates. -// TODO: the get_output_tls method should subsume the get_fields +// TODO: the get_output_tbls method should subsume the get_fields // method, but in fact it really just returns the // operator name. virtual table_def *get_fields() = 0; // Should be vector? +// get keys from the operator. Currently, only works on group-by queries. +// partial_keys set to true if there is a suspicion that the list is partial. + virtual std::vector get_tbl_keys(std::vector &partial_keys) = 0; // Get the from clause virtual std::vector get_input_tbls() = 0; // this is a confused function, it acutally return the output @@ -293,9 +298,12 @@ public: // The "where" clause is a pre-filter virtual std::vector get_where_clause() = 0; -// To be more explicit, use get_filter_preds +// To be more explicit, use get_filter_preds, this is used to compute the prefilter virtual std::vector get_filter_clause() = 0; +// Add an extra predicate. Currently only used for LFTAs. + virtual void append_to_where(cnf_elem *c) = 0; + void add_predecessor(int i){predecessors.push_back(i);}; void remove_predecessor(int i){ std::vector::iterator vi; @@ -347,6 +355,11 @@ public: return ret; } + void append_to_where(cnf_elem *c){ + where.push_back(c); + } + + void bind_to_schema(table_list *Schema); col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema); @@ -369,6 +382,11 @@ public: std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); + std::vector get_tbl_keys(std::vector &partial_keys){ + std::vector ret; + return ret; + } + std::vector get_input_tbls(); std::vector get_output_tbls(); @@ -386,6 +404,7 @@ public: // embedded in them. Would it make sense // to grab the whole table list? tablevar_list_t *fm = qs->fta_tree->get_from(); + std::vector tbl_vec = fm->get_table_list(); if(tbl_vec.size() != 1){ char tmpstr[200]; @@ -395,6 +414,12 @@ public: } table_name = (tbl_vec[0]); + int t = tbl_vec[0]->get_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -507,6 +532,11 @@ public: return ret; }; std::vector get_where_clause(){return where;}; + + void append_to_where(cnf_elem *c){ + where.push_back(c); + } + std::vector get_filter_clause(){return where;}; std::vector get_having_clause(){return having;}; gb_table *get_gb_tbl(){return &gb_tbl;}; @@ -516,6 +546,7 @@ public: // table which represents output tuple. table_def *get_fields(); + std::vector get_tbl_keys(std::vector &partial_keys); std::vector get_input_tbls(); std::vector get_output_tbls(); @@ -542,6 +573,13 @@ public: } table_name = (tbl_vec[0]); + int t = tbl_vec[0]->get_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -690,6 +728,10 @@ public: return ret; }; std::vector get_where_clause(){return where;}; + void append_to_where(cnf_elem *c){ + where.push_back(c); + } + std::vector get_filter_clause(){return where;}; std::vector get_having_clause(){return having;}; std::vector get_closing_when_clause(){return closing_when;}; @@ -700,6 +742,8 @@ public: // table which represents output tuple. table_def *get_fields(); + std::vector get_tbl_keys(std::vector &partial_keys); + std::vector get_input_tbls(); std::vector get_output_tbls(); @@ -726,6 +770,12 @@ public: } table_name = (tbl_vec[0]); + int t = tbl_vec[0]->get_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -800,8 +850,158 @@ public: }; + +// Watchlist - from a table read from an external source. + +class watch_tbl_qpn: public qp_node{ +public: + table_def *table_layout; // the output schema + std::vector key_flds; + +// Parameters related to loading the table + std::string filename; + int refresh_interval; + + + void append_to_where(cnf_elem *c){ + fprintf(stderr, "ERROR, append_to_where called on watch_tbl_qpn, not supported, query %s.\n", node_name.c_str()); + exit(1); + } + + std::string node_type(){return("watch_tbl_qpn"); }; + bool makes_transform(){return false;}; + std::vector external_libs(){ + std::vector ret; + return ret; + } + + void bind_to_schema(table_list *Schema){} + col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){ + col_id_set ret; + return ret; + } + + std::string to_query_string(); + std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); + std::string generate_functor_name(); + std::string generate_operator(int i, std::string params); + std::string get_include_file(){ + return("#include \n"); + }; + + cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); + std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); + + table_def *get_fields(); + std::vector get_tbl_keys(std::vector &partial_keys){ + return key_flds; + } + + std::vector get_input_tbls(); + std::vector get_output_tbls(); + + std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); + virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); +// Ensure that any refs to interface params have been split away. + int count_ifp_refs(std::set &ifpnames); + +// No predicates, return an empty clause + std::vector get_where_clause(){ + std::vector t; + return(t); + }; + std::vector get_filter_clause(){ + return get_where_clause(); + } + + watch_tbl_qpn(){ + }; + + watch_tbl_qpn(query_summary_class *qs,table_list *Schema){ + node_name=qs->query_name; + param_tbl = qs->param_tbl; + definitions = qs->definitions; + + +// Populate the schema + table_layout = new table_def( + node_name.c_str(), NULL, NULL, qs->fta_tree->fel, WATCHLIST_SCHEMA + ); + +// Find the keys + std::vector flds = qs->fta_tree->fel->get_list(); + for(int f=0;fget_modifier_list()->contains_key("key") || + flds[f]->get_modifier_list()->contains_key("Key") || + flds[f]->get_modifier_list()->contains_key("KEY") ){ + key_flds.push_back(flds[f]->get_name()); + } + } + if(key_flds.size()==0){ + fprintf(stderr,"Error, no key fields defined for table watchlist %s\n",node_name.c_str()); + exit(1); + } + + table_layout->set_keys(key_flds); // communicate keys to consumers + +// Get loading parameters + if(definitions.count("filename")>0){ + filename = definitions["filename"]; + }else{ + fprintf(stderr, "Error, no filename for source data defined for table watchlist %s\n",node_name.c_str()); + exit(1); + } + if(definitions.count("refresh_interval")>0){ + refresh_interval = atoi(definitions["refresh_interval"].c_str()); + if(refresh_interval <= 0){ + fprintf(stderr, "Error, the refresh_interval (%s) of table watchlist %s must be a positive non-zero integer.\n",definitions["refresh_interval"].c_str(), node_name.c_str()); + exit(1); + } + }else{ + fprintf(stderr, "Error, no refresh_interval defined for table watchlist %s\n",node_name.c_str()); + exit(1); + } + + } + + qp_node *make_copy(std::string suffix){ + watch_tbl_qpn *ret = new watch_tbl_qpn(); + ret->filename = filename; + ret->refresh_interval = refresh_interval; + ret->key_flds = key_flds; + + ret->param_tbl = new param_table(); + std::vector param_names = param_tbl->get_param_names(); + int pi; + for(pi=0;piget_data_type(param_names[pi]); + ret->param_tbl->add_param(param_names[pi],dt->duplicate(), + param_tbl->handle_access(param_names[pi])); + } + ret->definitions = definitions; + + ret->node_name = node_name + suffix; + ret->table_layout = table_layout->make_shallow_copy(ret->node_name); + + return ret; + }; + + // the following method is used for distributed query optimization + double get_rate_estimate(); + + void create_protocol_se(vector q_sources, table_list *Schema); + + +}; + + + + + + // forward reference class filter_join_qpn; +class watch_join_qpn; // (temporal) Merge query plan node. @@ -833,6 +1033,12 @@ public: partn_def_t* partn_def; + void append_to_where(cnf_elem *c){ + fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n", node_name.c_str()); + exit(1); + } + + std::string node_type(){return("mrg_qpn"); }; bool makes_transform(){return false;}; @@ -861,6 +1067,11 @@ public: std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); + std::vector get_tbl_keys(std::vector &partial_keys){ + std::vector ret; + return ret; + } + std::vector get_input_tbls(); std::vector get_output_tbls(); @@ -900,6 +1111,14 @@ public: exit(1); } + for(int f=0;fget_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+fm[f]->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + } + // Get the parameters param_tbl = qs->param_tbl; @@ -1059,6 +1278,10 @@ printf("\n"); mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb); +// Merge watch_join LFTAs. + + mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb); + // Merge selection LFTAs. mrg_qpn(spx_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb){ @@ -1315,6 +1538,12 @@ public: exit(1); } + void append_to_where(cnf_elem *c){ + fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str()); + exit(1); + } + + std::string to_query_string(); std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); std::string generate_functor_name(); @@ -1341,6 +1570,13 @@ public: std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); + +// It might be feasible to find keys in an equijoin expression. + std::vector get_tbl_keys(std::vector &partial_keys){ + std::vector ret; + return ret; + } + std::vector get_input_tbls(); std::vector get_output_tbls(); @@ -1365,6 +1601,15 @@ public: error_code = 1; } + for(int f=0;fget_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + } + + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -1541,6 +1786,10 @@ public: return ret; }; // Used for LFTA only + void append_to_where(cnf_elem *c){ + where.push_back(c); + } + std::vector get_where_clause(){return where;} std::vector get_filter_clause(){return shared_pred;} @@ -1548,6 +1797,12 @@ public: std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); table_def *get_fields(); +// It should be feasible to find keys in a filter join + std::vector get_tbl_keys(std::vector &partial_keys){ + std::vector ret; + return ret; + } + std::vector get_input_tbls(); std::vector get_output_tbls(); @@ -1558,7 +1813,7 @@ public: // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); - +// CONSTRUCTOR filter_join_qpn(){ }; filter_join_qpn(query_summary_class *qs,table_list *Schema){ @@ -1576,6 +1831,19 @@ public: err_str += tmpstr; error_code = 1; } + if(from[0]->get_interface() != from[1]->get_interface()){ + err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n"; + error_code = 1; + } + + for(int f=0;fget_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + } + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -1712,6 +1980,237 @@ public: }; + +// TODO : put tests on other operators to ensure they dont' read from a watchlist +// TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ? +class watch_join_qpn: public qp_node{ +public: + std::vector from; // Source tables + std::vector select_list; // Select list + std::vector pred_t0; // main (R) preds + std::vector pred_t1; // watchlist-only (S) preds (?) + std::map hash_eq; // predicates on S hash keys + std::vector join_filter; // ref's R, S, but not a hash + std::vector postfilter; // ref's no table. + + std::vector key_flds; + + std::vector where; // all the filters + // useful for summary analysis + + std::vector hash_src_r, hash_src_l; + std::vector get_hash_r(){return hash_src_r;} + std::vector get_hash_l(){return hash_src_l;} + + + + std::string node_type(){return("watch_join"); }; + bool makes_transform(){return true;}; + std::vector external_libs(){ + std::vector ret; + return ret; + } + + void bind_to_schema(table_list *Schema); + col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema); + + std::string to_query_string(); + std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform){ + fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n"); + exit(1); + } + std::string generate_functor_name(){ + fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n"); + exit(1); + } + std::string generate_operator(int i, std::string params){ + fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n"); + exit(1); + } + std::string get_include_file(){return("#include \n");}; + + std::vector get_select_list(){return select_list;}; + std::vector get_select_se_list(){ + std::vector ret; + int i; + for(i=0;ise); + return ret; + }; +// Used for LFTA only + void append_to_where(cnf_elem *c){ + where.push_back(c); + } + + std::vector get_where_clause(){return where;} + + std::vector get_filter_clause(){return pred_t0;} + + cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); + std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); + + table_def *get_fields(); +// It should be feasible to find keys in a watchlist join + std::vector get_tbl_keys(std::vector &partial_keys){ + std::vector ret; + return ret; + } + + std::vector get_input_tbls(); + std::vector get_output_tbls(); + + std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); + int resolve_if_params(ifq_t *ifdb, std::string &err); + + virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); +// Ensure that any refs to interface params have been split away. + int count_ifp_refs(std::set &ifpnames); + +// CONSTRUCTOR + watch_join_qpn(){ + }; + watch_join_qpn(query_summary_class *qs,table_list *Schema){ + int i,w; +// Get the table name. +// NOTE the colrefs have the table ref (an int) +// embedded in them. Would it make sense +// to grab the whole table list? + from = qs->fta_tree->get_from()->get_table_list(); + if(from.size() != 2){ + char tmpstr[200]; + sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() ); + err_str += tmpstr; + error_code = 1; + } + + int t = from[0]->get_schema_ref(); + if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){ + err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n"; + error_code = 1; + } + t = from[1]->get_schema_ref(); + if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){ + err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n"; + error_code = 1; + } + key_flds = Schema->get_table(t)->get_keys(); + + +// Get the select list. + select_list = qs->fta_tree->get_sl_vec(); + +// Get the selection predicate. + where = qs->wh_cnf; + std::vector t0_only, t1_only; + for(w=0;w pred_tbls; + get_tablevar_ref_pr(where[w]->pr,pred_tbls); +// Collect the list of preds by src var, +// extract the shared preds later. + if(pred_tbls.size()==1){ + if(pred_tbls[0] == 0){ + pred_t0.push_back(where[w]); + }else{ + pred_t1.push_back(where[w]); + } + continue; + } +// refs nothing -- might be sampling, do it as postfilter. + if(pred_tbls.size()==0){ + postfilter.push_back(where[w]); + continue; + } + +// Must reference both +// See if it can be a hash predicate. + if(where[w]->is_atom && where[w]->eq_pred){ + std::vector sel_tbls, ser_tbls; + get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls); + get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls); + if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){ +// make channel 0 SE on LHS. + if(sel_tbls[0] != 0) + where[w]->swap_scalar_operands(); + +// Must be simple (a colref) on the RHS + if(where[w]->r_simple){ + string rcol = where[w]->pr->get_right_se()->get_colref()->get_field(); + if(std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end()){ + hash_eq[rcol] = where[w]; + + data_type *dtl=where[w]->pr->get_left_se()->get_data_type(); + data_type *dtr=where[w]->pr->get_right_se()->get_data_type(); + if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) ) + err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n"; + continue; + } + } + } + } +// All tests failed, fallback is join_filter. + join_filter.push_back(where[w]); + } + + if(key_flds.size() > hash_eq.size()){ + err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate. Missing fields are"; + for(int k=0;kparam_tbl; + definitions = qs->definitions; + + }; + + // the following method is used for distributed query optimization + double get_rate_estimate(); + + + qp_node* make_copy(std::string suffix){ + watch_join_qpn *ret = new watch_join_qpn(); + + ret->param_tbl = new param_table(); + std::vector param_names = param_tbl->get_param_names(); + int pi; + for(pi=0;piget_data_type(param_names[pi]); + ret->param_tbl->add_param(param_names[pi],dt->duplicate(), + param_tbl->handle_access(param_names[pi])); + } + ret->definitions = definitions; + + ret->node_name = node_name + suffix; + + // make shallow copy of all fields + ret->where = where; + ret->from = from; + ret->select_list = select_list; + ret->key_flds = key_flds; + ret->pred_t0 = pred_t0; + ret->pred_t1 = pred_t1; + ret->join_filter = join_filter; + ret->postfilter = postfilter; + ret->hash_eq = hash_eq; + ret->hash_src_r = hash_src_r; + ret->hash_src_l = hash_src_l; + + return ret; + }; + + void create_protocol_se(vector q_sources, table_list *Schema); + +}; + + + + enum output_file_type_enum {regular, gzip, bzip}; class output_file_qpn: public qp_node{ @@ -1749,6 +2248,13 @@ public: return ret; } + void append_to_where(cnf_elem *c){ + fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n", node_name.c_str()); + exit(1); + } + + + void bind_to_schema(table_list *Schema){} col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){ col_id_set ret; @@ -1781,6 +2287,14 @@ public: fel->append_field(fields[i]); return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA); } + +// TODO! either bypass the output operator in stream_query, +// or propagate key information when the output operator is constructed. + std::vector get_tbl_keys(std::vector &partial_keys){ + std::vector ret; + return ret; + } + std::vector get_input_tbls(); std::vector get_output_tbls(); @@ -1805,6 +2319,18 @@ public: hfta_query_name = qn; eat_input = ei; +// TODO stream checking, but it requires passing Schema to output_file_qpn +/* + for(int f=0;fget_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + } +*/ + + do_gzip = false; compression_type = regular; if(ospec->operator_type == "zfile") @@ -1952,9 +2478,19 @@ public: // table which represents output tuple. table_def *get_fields(); +// TODO Key extraction should be feasible but I'll defer the issue. + std::vector get_tbl_keys(std::vector &partial_keys){ + std::vector ret; + return ret; + } + std::vector get_input_tbls(); std::vector get_output_tbls(); + void append_to_where(cnf_elem *c){ + where.push_back(c); + } + sgahcwcb_qpn(){ }; @@ -1973,6 +2509,13 @@ public: } table_name = (tbl_vec[0]); + int t = tbl_vec[0]->get_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -2064,5 +2607,4 @@ void untaboo(string &s); table_def *create_attributes(string tname, vector &select_list); - #endif