X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fftacmp%2Fquery_plan.h;h=ec782e0318cdcecd80372efba8cf080700d26e39;hb=f6f278dfd4bca7effd6f452d38ce74cf109d41f4;hp=3c7d3f4387d76b5d6ee0d52c04c2e446ae6d0bae;hpb=7210c67dde90098460d1f0922deeb810be521673;p=com%2Fgs-lite.git diff --git a/src/ftacmp/query_plan.h b/src/ftacmp/query_plan.h index 3c7d3f4..ec782e0 100644 --- a/src/ftacmp/query_plan.h +++ b/src/ftacmp/query_plan.h @@ -87,7 +87,9 @@ struct query_node{ mangler=""; tablevar_list_t *fm = parse_tree->get_from(); - refd_tbls = fm->get_table_names(); + if(fm!=NULL){ + refd_tbls = fm->get_table_names(); + } params = pt->query_params; }; @@ -296,7 +298,7 @@ public: // The "where" clause is a pre-filter virtual std::vector get_where_clause() = 0; -// To be more explicit, use get_filter_preds +// To be more explicit, use get_filter_preds, this is used to compute the prefilter virtual std::vector get_filter_clause() = 0; // Add an extra predicate. Currently only used for LFTAs. @@ -402,6 +404,7 @@ public: // embedded in them. Would it make sense // to grab the whole table list? tablevar_list_t *fm = qs->fta_tree->get_from(); + std::vector tbl_vec = fm->get_table_list(); if(tbl_vec.size() != 1){ char tmpstr[200]; @@ -411,6 +414,12 @@ public: } table_name = (tbl_vec[0]); + int t = tbl_vec[0]->get_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -564,6 +573,13 @@ public: } table_name = (tbl_vec[0]); + int t = tbl_vec[0]->get_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -754,6 +770,12 @@ public: } table_name = (tbl_vec[0]); + int t = tbl_vec[0]->get_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -828,8 +850,158 @@ public: }; + +// Watchlist - from a table read from an external source. + +class watch_tbl_qpn: public qp_node{ +public: + table_def *table_layout; // the output schema + std::vector key_flds; + +// Parameters related to loading the table + std::string filename; + int refresh_interval; + + + void append_to_where(cnf_elem *c){ + fprintf(stderr, "ERROR, append_to_where called on watch_tbl_qpn, not supported, query %s.\n", node_name.c_str()); + exit(1); + } + + std::string node_type(){return("watch_tbl_qpn"); }; + bool makes_transform(){return false;}; + std::vector external_libs(){ + std::vector ret; + return ret; + } + + void bind_to_schema(table_list *Schema){} + col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){ + col_id_set ret; + return ret; + } + + std::string to_query_string(); + std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform); + std::string generate_functor_name(); + std::string generate_operator(int i, std::string params); + std::string get_include_file(){ + return("#include \n"); + }; + + cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); + std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); + + table_def *get_fields(); + std::vector get_tbl_keys(std::vector &partial_keys){ + return key_flds; + } + + std::vector get_input_tbls(); + std::vector get_output_tbls(); + + std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); + virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); +// Ensure that any refs to interface params have been split away. + int count_ifp_refs(std::set &ifpnames); + +// No predicates, return an empty clause + std::vector get_where_clause(){ + std::vector t; + return(t); + }; + std::vector get_filter_clause(){ + return get_where_clause(); + } + + watch_tbl_qpn(){ + }; + + watch_tbl_qpn(query_summary_class *qs,table_list *Schema){ + node_name=qs->query_name; + param_tbl = qs->param_tbl; + definitions = qs->definitions; + + +// Populate the schema + table_layout = new table_def( + node_name.c_str(), NULL, NULL, qs->fta_tree->fel, WATCHLIST_SCHEMA + ); + +// Find the keys + std::vector flds = qs->fta_tree->fel->get_list(); + for(int f=0;fget_modifier_list()->contains_key("key") || + flds[f]->get_modifier_list()->contains_key("Key") || + flds[f]->get_modifier_list()->contains_key("KEY") ){ + key_flds.push_back(flds[f]->get_name()); + } + } + if(key_flds.size()==0){ + fprintf(stderr,"Error, no key fields defined for table watchlist %s\n",node_name.c_str()); + exit(1); + } + + table_layout->set_keys(key_flds); // communicate keys to consumers + +// Get loading parameters + if(definitions.count("filename")>0){ + filename = definitions["filename"]; + }else{ + fprintf(stderr, "Error, no filename for source data defined for table watchlist %s\n",node_name.c_str()); + exit(1); + } + if(definitions.count("refresh_interval")>0){ + refresh_interval = atoi(definitions["refresh_interval"].c_str()); + if(refresh_interval <= 0){ + fprintf(stderr, "Error, the refresh_interval (%s) of table watchlist %s must be a positive non-zero integer.\n",definitions["refresh_interval"].c_str(), node_name.c_str()); + exit(1); + } + }else{ + fprintf(stderr, "Error, no refresh_interval defined for table watchlist %s\n",node_name.c_str()); + exit(1); + } + + } + + qp_node *make_copy(std::string suffix){ + watch_tbl_qpn *ret = new watch_tbl_qpn(); + ret->filename = filename; + ret->refresh_interval = refresh_interval; + ret->key_flds = key_flds; + + ret->param_tbl = new param_table(); + std::vector param_names = param_tbl->get_param_names(); + int pi; + for(pi=0;piget_data_type(param_names[pi]); + ret->param_tbl->add_param(param_names[pi],dt->duplicate(), + param_tbl->handle_access(param_names[pi])); + } + ret->definitions = definitions; + + ret->node_name = node_name + suffix; + ret->table_layout = table_layout->make_shallow_copy(ret->node_name); + + return ret; + }; + + // the following method is used for distributed query optimization + double get_rate_estimate(); + + void create_protocol_se(vector q_sources, table_list *Schema); + + +}; + + + + + + // forward reference class filter_join_qpn; +class watch_join_qpn; // (temporal) Merge query plan node. @@ -939,6 +1111,14 @@ public: exit(1); } + for(int f=0;fget_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+fm[f]->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + } + // Get the parameters param_tbl = qs->param_tbl; @@ -1098,6 +1278,10 @@ printf("\n"); mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb); +// Merge watch_join LFTAs. + + mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb); + // Merge selection LFTAs. mrg_qpn(spx_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb){ @@ -1417,6 +1601,15 @@ public: error_code = 1; } + for(int f=0;fget_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + } + + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -1620,7 +1813,7 @@ public: // Ensure that any refs to interface params have been split away. int count_ifp_refs(std::set &ifpnames); - +// CONSTRUCTOR filter_join_qpn(){ }; filter_join_qpn(query_summary_class *qs,table_list *Schema){ @@ -1638,6 +1831,19 @@ public: err_str += tmpstr; error_code = 1; } + if(from[0]->get_interface() != from[1]->get_interface()){ + err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n"; + error_code = 1; + } + + for(int f=0;fget_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + } + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -1774,6 +1980,237 @@ public: }; + +// TODO : put tests on other operators to ensure they dont' read from a watchlist +// TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ? +class watch_join_qpn: public qp_node{ +public: + std::vector from; // Source tables + std::vector select_list; // Select list + std::vector pred_t0; // main (R) preds + std::vector pred_t1; // watchlist-only (S) preds (?) + std::map hash_eq; // predicates on S hash keys + std::vector join_filter; // ref's R, S, but not a hash + std::vector postfilter; // ref's no table. + + std::vector key_flds; + + std::vector where; // all the filters + // useful for summary analysis + + std::vector hash_src_r, hash_src_l; + std::vector get_hash_r(){return hash_src_r;} + std::vector get_hash_l(){return hash_src_l;} + + + + std::string node_type(){return("watch_join"); }; + bool makes_transform(){return true;}; + std::vector external_libs(){ + std::vector ret; + return ret; + } + + void bind_to_schema(table_list *Schema); + col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema); + + std::string to_query_string(); + std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector &needs_xform){ + fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n"); + exit(1); + } + std::string generate_functor_name(){ + fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n"); + exit(1); + } + std::string generate_operator(int i, std::string params){ + fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n"); + exit(1); + } + std::string get_include_file(){return("#include \n");}; + + std::vector get_select_list(){return select_list;}; + std::vector get_select_se_list(){ + std::vector ret; + int i; + for(i=0;ise); + return ret; + }; +// Used for LFTA only + void append_to_where(cnf_elem *c){ + where.push_back(c); + } + + std::vector get_where_clause(){return where;} + + std::vector get_filter_clause(){return pred_t0;} + + cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns); + std::vector get_handle_param_tbl(ext_fcn_list *Ext_fcns); + + table_def *get_fields(); +// It should be feasible to find keys in a watchlist join + std::vector get_tbl_keys(std::vector &partial_keys){ + std::vector ret; + return ret; + } + + std::vector get_input_tbls(); + std::vector get_output_tbls(); + + std::vector split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); + int resolve_if_params(ifq_t *ifdb, std::string &err); + + virtual std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm); +// Ensure that any refs to interface params have been split away. + int count_ifp_refs(std::set &ifpnames); + +// CONSTRUCTOR + watch_join_qpn(){ + }; + watch_join_qpn(query_summary_class *qs,table_list *Schema){ + int i,w; +// Get the table name. +// NOTE the colrefs have the table ref (an int) +// embedded in them. Would it make sense +// to grab the whole table list? + from = qs->fta_tree->get_from()->get_table_list(); + if(from.size() != 2){ + char tmpstr[200]; + sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() ); + err_str += tmpstr; + error_code = 1; + } + + int t = from[0]->get_schema_ref(); + if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){ + err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n"; + error_code = 1; + } + t = from[1]->get_schema_ref(); + if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){ + err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n"; + error_code = 1; + } + key_flds = Schema->get_table(t)->get_keys(); + + +// Get the select list. + select_list = qs->fta_tree->get_sl_vec(); + +// Get the selection predicate. + where = qs->wh_cnf; + std::vector t0_only, t1_only; + for(w=0;w pred_tbls; + get_tablevar_ref_pr(where[w]->pr,pred_tbls); +// Collect the list of preds by src var, +// extract the shared preds later. + if(pred_tbls.size()==1){ + if(pred_tbls[0] == 0){ + pred_t0.push_back(where[w]); + }else{ + pred_t1.push_back(where[w]); + } + continue; + } +// refs nothing -- might be sampling, do it as postfilter. + if(pred_tbls.size()==0){ + postfilter.push_back(where[w]); + continue; + } + +// Must reference both +// See if it can be a hash predicate. + if(where[w]->is_atom && where[w]->eq_pred){ + std::vector sel_tbls, ser_tbls; + get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls); + get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls); + if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){ +// make channel 0 SE on LHS. + if(sel_tbls[0] != 0) + where[w]->swap_scalar_operands(); + +// Must be simple (a colref) on the RHS + if(where[w]->r_simple){ + string rcol = where[w]->pr->get_right_se()->get_colref()->get_field(); + if(std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end()){ + hash_eq[rcol] = where[w]; + + data_type *dtl=where[w]->pr->get_left_se()->get_data_type(); + data_type *dtr=where[w]->pr->get_right_se()->get_data_type(); + if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) ) + err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n"; + continue; + } + } + } + } +// All tests failed, fallback is join_filter. + join_filter.push_back(where[w]); + } + + if(key_flds.size() > hash_eq.size()){ + err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate. Missing fields are"; + for(int k=0;kparam_tbl; + definitions = qs->definitions; + + }; + + // the following method is used for distributed query optimization + double get_rate_estimate(); + + + qp_node* make_copy(std::string suffix){ + watch_join_qpn *ret = new watch_join_qpn(); + + ret->param_tbl = new param_table(); + std::vector param_names = param_tbl->get_param_names(); + int pi; + for(pi=0;piget_data_type(param_names[pi]); + ret->param_tbl->add_param(param_names[pi],dt->duplicate(), + param_tbl->handle_access(param_names[pi])); + } + ret->definitions = definitions; + + ret->node_name = node_name + suffix; + + // make shallow copy of all fields + ret->where = where; + ret->from = from; + ret->select_list = select_list; + ret->key_flds = key_flds; + ret->pred_t0 = pred_t0; + ret->pred_t1 = pred_t1; + ret->join_filter = join_filter; + ret->postfilter = postfilter; + ret->hash_eq = hash_eq; + ret->hash_src_r = hash_src_r; + ret->hash_src_l = hash_src_l; + + return ret; + }; + + void create_protocol_se(vector q_sources, table_list *Schema); + +}; + + + + enum output_file_type_enum {regular, gzip, bzip}; class output_file_qpn: public qp_node{ @@ -1882,6 +2319,18 @@ public: hfta_query_name = qn; eat_input = ei; +// TODO stream checking, but it requires passing Schema to output_file_qpn +/* + for(int f=0;fget_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + } +*/ + + do_gzip = false; compression_type = regular; if(ospec->operator_type == "zfile") @@ -2060,6 +2509,13 @@ public: } table_name = (tbl_vec[0]); + int t = tbl_vec[0]->get_schema_ref(); + if(! Schema->is_stream(t)){ + err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n"; + error_code = 1; + } + + // Get the select list. select_list = qs->fta_tree->get_sl_vec(); @@ -2151,5 +2607,4 @@ void untaboo(string &s); table_def *create_attributes(string tname, vector &select_list); - #endif