1 /* ------------------------------------------------
\r
2 Copyright 2014 AT&T Intellectual Property
\r
3 Licensed under the Apache License, Version 2.0 (the "License");
\r
4 you may not use this file except in compliance with the License.
\r
5 You may obtain a copy of the License at
\r
7 http://www.apache.org/licenses/LICENSE-2.0
\r
9 Unless required by applicable law or agreed to in writing, software
\r
10 distributed under the License is distributed on an "AS IS" BASIS,
\r
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 See the License for the specific language governing permissions and
\r
13 limitations under the License.
\r
14 ------------------------------------------- */
\r
15 #ifndef __QUERY_PLAN_H__
\r
16 #define __QUERY_PLAN_H__
\r
21 using namespace std;
\r
23 #include"analyze_fta.h"
\r
25 #include"parse_partn.h"
\r
26 #include"generate_utils.h"
\r
28 // Identify the format of the input, output streams.
\r
29 #define UNKNOWNFORMAT 0
\r
31 #define HOSTFORMAT 2
\r
33 ///////////////////////////////////////////////////
\r
34 // representation of an output operator specification
\r
38 string operator_type;
\r
39 string operator_param;
\r
40 string output_directory;
\r
42 string partitioning_flds;
\r
47 ////////////////////////////////////////////////////
\r
48 // Input representation of a query
\r
52 std::set<int> reads_from;
\r
53 std::set<int> sources_to;
\r
54 std::vector<std::string> refd_tbls;
\r
55 std::vector<var_pair_t *> params;
\r
58 std::string mangler; // for UDOPs
\r
60 table_exp_t *parse_tree;
\r
63 bool is_externally_visible;
\r
64 bool inferred_visible_node;
\r
66 set<int> subtree_roots;
\r
73 is_externally_visible = false;
\r
74 inferred_visible_node = false;
\r
77 query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
\r
85 is_externally_visible = pt->get_visible();
\r
86 inferred_visible_node = false;
\r
89 tablevar_list_t *fm = parse_tree->get_from();
\r
90 refd_tbls = fm->get_table_names();
\r
92 params = pt->query_params;
\r
94 query_node(int ix, std::string udop_name,table_list *Schema){
\r
102 is_externally_visible = true;
\r
103 inferred_visible_node = false;
\r
106 int sid = Schema->find_tbl(udop_name);
\r
107 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
\r
109 for(i=0;i<subq.size();++i){
\r
110 refd_tbls.push_back(subq[i]->name);
\r
117 std::string source_name;
\r
118 std::vector<int> query_node_indices;
\r
119 std::set<int> reads_from;
\r
120 std::set<int> sources_to;
\r
122 bool inferred_visible_node;
\r
125 bool do_generation; // false means, ignore it.
\r
129 inferred_visible_node = false;
\r
132 do_generation = true;
\r
141 #define SPX_QUERY 1
\r
142 #define SGAH_QUERY 2
\r
144 // the following selectivity estimates are used by our primitive rate estimators
\r
145 #define SPX_SELECTIVITY 1.0
\r
146 #define SGAH_SELECTIVITY 0.1
\r
147 #define RSGAH_SELECTIVITY 0.1
\r
148 #define SGAHCWCB_SELECTIVITY 0.1
\r
149 #define MRG_SELECTIVITY 1.0
\r
150 #define JOIN_EQ_HASH_SELECTIVITY 1.0
\r
152 // if the output rate of the interface is not given we are going to use
\r
153 // this default value
\r
154 #define DEFAULT_INTERFACE_RATE 100
\r
157 // Define query plan nodes
\r
158 // These nodes are intended for query modeling
\r
159 // and transformation rather than for code generation.
\r
162 // Query plan node base class.
\r
163 // It has an ID, can return its type,
\r
164 // and can be linked into lists with the predecessors
\r
166 // To add : serialize, unserialize?
\r
171 std::vector<int> predecessors;
\r
172 std::vector<int> successors;
\r
173 std::string node_name;
\r
175 // For error reporting without exiting the program.
\r
177 std::string err_str;
\r
179 // These should be moved to the containing stream_query object.
\r
180 std::map<std::string, std::string> definitions;
\r
181 param_table *param_tbl;
\r
183 // The value of a field in terms of protocol fields (if any).
\r
184 std::map<std::string, scalarexp_t *> protocol_map;
\r
189 param_tbl = new param_table();
\r
194 param_tbl = new param_table();
\r
197 int get_id(){return(id);};
\r
198 void set_id(int i){id = i; };
\r
200 int get_error_code(){return error_code;};
\r
201 std::string get_error_str(){return err_str;};
\r
203 virtual std::string node_type() = 0;
\r
205 // For code generation, does the operator xform its input.
\r
206 virtual bool makes_transform() = 0;
\r
208 // For linking, what external libraries does the operator depend on?
\r
209 virtual std::vector<std::string> external_libs() = 0;
\r
211 void set_node_name(std::string n){node_name = n;};
\r
212 std::string get_node_name(){return node_name;};
\r
214 void set_definitions(std::map<std::string, std::string> &def){
\r
217 std::map<std::string, std::string> get_definitions(){return definitions;};
\r
220 // call to create the mapping from field name to se in protocol fields.
\r
221 // Pass in qp_node of data sources, in order.
\r
222 virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
\r
223 // get the protocol map. the parameter is the return value.
\r
224 std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
\r
226 // Each qp node must be able to return a description
\r
227 // of the tuples it creates.
\r
228 // TODO: the get_output_tbls method should subsume the get_fields
\r
229 // method, but in fact it really just returns the
\r
231 virtual table_def *get_fields() = 0; // Should be vector?
\r
232 // Get the from clause
\r
233 virtual std::vector<tablevar_t *> get_input_tbls() = 0;
\r
234 // this is a confused function, it actually returns the output
\r
236 virtual std::vector<tablevar_t *> get_output_tbls() = 0;
\r
238 std::string get_val_of_def(std::string def){
\r
239 if(definitions.count(def) > 0) return definitions[def];
\r
242 void set_definition(std::string def, std::string val){
\r
243 definitions[def]=val;
\r
246 // Associate colrefs in SEs with tables
\r
247 // at code generation time.
\r
248 virtual void bind_to_schema(table_list *Schema) = 0;
\r
250 // Get colrefs of the operator, currently only meaningful for lfta
\r
251 // operators, and only interested in colrefs with extraction fcns
\r
252 virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
\r
254 virtual std::string to_query_string() = 0;
\r
255 virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;
\r
256 virtual std::string generate_functor_name() = 0;
\r
258 virtual std::string generate_operator(int i, std::string params) = 0;
\r
259 virtual std::string get_include_file() = 0;
\r
261 virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
\r
262 virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
\r
264 // Split this node into LFTA and HFTA nodes.
\r
265 // Four possible outcomes:
\r
266 // 1) the qp_node reads from a protocol, but does not need to
\r
267 // split (can be evaluated as an LFTA).
\r
268 // The lfta node is the only element in the return vector,
\r
269 // and hfta_returned is false.
\r
270 // 2) the qp_node reads from no protocol, and therefore cannot be split.
\r
271 // The hfta node is the only element in the return vector,
\r
272 // and hfta_returned is true.
\r
273 // 3) reads from at least one protocol, but cannot be split : failure.
\r
274 // return vector is empty, the error conditions are written
\r
276 // 4) The qp_node splits into an hfta node and one or more LFTA nodes.
\r
277 // the return vector has two or more elements, and hfta_returned
\r
278 // is true. The last element is the HFTA.
\r
279 virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;
\r
282 // Ensure that any refs to interface params have been split away.
\r
283 virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
\r
287 // Tag the data sources which are views,
\r
288 // return the (optimized) source queries and
\r
289 // record the view access in opview_set
\r
290 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
\r
292 param_table *get_param_tbl(){return param_tbl;};
\r
294 // The "where" clause is a pre-filter
\r
295 virtual std::vector<cnf_elem *> get_where_clause() = 0;
\r
296 // To be more explicit, use get_filter_preds
\r
297 virtual std::vector<cnf_elem *> get_filter_clause() = 0;
\r
299 void add_predecessor(int i){predecessors.push_back(i);};
\r
300 void remove_predecessor(int i){
\r
301 std::vector<int>::iterator vi;
\r
302 for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
\r
304 predecessors.erase(vi);
\r
309 void add_successor(int i){successors.push_back(i);};
\r
310 std::vector<int> get_predecessors(){return predecessors;};
\r
311 int n_predecessors(){return predecessors.size();};
\r
312 std::vector<int> get_successors(){return successors;};
\r
313 int n_successors(){return successors.size();};
\r
314 void clear_predecessors(){predecessors.clear();};
\r
315 void clear_successors(){successors.clear();};
\r
317 // the following method is used for distributed query optimization
\r
318 double get_rate_estimate();
\r
321 // used for cloning query nodes
\r
322 virtual qp_node* make_copy(std::string suffix) = 0;
\r
327 // Select, project, transform (xform) query plan node.
\r
328 // represent the following query fragment
\r
329 // select scalar_expression_1, ..., scalar_expression_k
\r
333 // the predicates and the scalar expressions can reference
\r
334 // attributes of S and also functions.
\r
335 class spx_qpn: public qp_node{
\r
337 tablevar_t *table_name; // Source table
\r
338 std::vector<cnf_elem *> where; // selection predicate
\r
339 std::vector<select_element *> select_list; // Select list
\r
// Type tag of this plan node: always the string "spx_qpn".
std::string node_type(){
	static const char kind_tag[] = "spx_qpn";
	return std::string(kind_tag);
}
\r
// For code generation: this operator does transform its input.
bool makes_transform(){
	return true;
}
\r
345 std::vector<std::string> external_libs(){
\r
346 std::vector<std::string> ret;
\r
350 void bind_to_schema(table_list *Schema);
\r
351 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
\r
353 std::string to_query_string();
\r
354 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
\r
355 std::string generate_functor_name();
\r
356 std::string generate_operator(int i, std::string params);
\r
// The newline-terminated #include line naming selection_operator.h.
std::string get_include_file(){
	static const char include_line[] = "#include <selection_operator.h>\n";
	return std::string(include_line);
}
\r
359 std::vector<select_element *> get_select_list(){return select_list;};
\r
360 std::vector<scalarexp_t *> get_select_se_list(){
\r
361 std::vector<scalarexp_t *> ret;
\r
363 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
\r
366 std::vector<cnf_elem *> get_where_clause(){return where;};
\r
367 std::vector<cnf_elem *> get_filter_clause(){return where;};
\r
368 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
\r
369 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
\r
371 table_def *get_fields();
\r
372 std::vector<tablevar_t *> get_input_tbls();
\r
373 std::vector<tablevar_t *> get_output_tbls();
\r
375 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
\r
376 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
\r
377 // Ensure that any refs to interface params have been split away.
\r
378 int count_ifp_refs(std::set<std::string> &ifpnames);
\r
379 int resolve_if_params(ifq_t *ifdb, std::string &err);
\r
383 spx_qpn(query_summary_class *qs,table_list *Schema){
\r
384 // Get the table name.
\r
385 // NOTE the colrefs have the table ref (an int)
\r
386 // embedded in them. Would it make sense
\r
387 // to grab the whole table list?
\r
388 tablevar_list_t *fm = qs->fta_tree->get_from();
\r
389 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
\r
390 if(tbl_vec.size() != 1){
\r
392 sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
\r
396 table_name = (tbl_vec[0]);
\r
398 // Get the select list.
\r
399 select_list = qs->fta_tree->get_sl_vec();
\r
401 // Get the selection predicate.
\r
402 where = qs->wh_cnf;
\r
405 // Get the parameters
\r
406 param_tbl = qs->param_tbl;
\r
412 // the following method is used for distributed query optimization
\r
413 double get_rate_estimate();
\r
416 qp_node* make_copy(std::string suffix){
\r
417 spx_qpn *ret = new spx_qpn();
\r
419 ret->param_tbl = new param_table();
\r
420 std::vector<std::string> param_names = param_tbl->get_param_names();
\r
422 for(pi=0;pi<param_names.size();pi++){
\r
423 data_type *dt = param_tbl->get_data_type(param_names[pi]);
\r
424 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
\r
425 param_tbl->handle_access(param_names[pi]));
\r
427 ret->definitions = definitions;
\r
428 ret->node_name = node_name + suffix;
\r
430 // make shallow copy of all fields
\r
431 ret->where = where;
\r
432 ret->select_list = select_list;
\r
436 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
\r
442 // Select, group-by, aggregate.
\r
444 // Select SE_1, ..., SE_k
\r
447 // Group By gb1, ..., gb_n
\r
448 // Having predicate
\r
450 // NOTE : the sampling operator is sgahcwcb_qpn.
\r
452 // For now, must have group-by variables and aggregates.
\r
453 // The scalar expressions which are output must be a function
\r
454 // of the group-by variables and the aggregates.
\r
455 // The group-by variables can be references to columns of T,
\r
456 // or they can be scalar expressions.
\r
457 class sgah_qpn: public qp_node{
\r
459 tablevar_t *table_name; // source table
\r
460 std::vector<cnf_elem *> where; // selection predicate
\r
461 std::vector<cnf_elem *> having; // post-aggregation predicate
\r
462 std::vector<select_element *> select_list; // se's of output
\r
463 gb_table gb_tbl; // Table of all group-by attributes.
\r
464 aggregate_table aggr_tbl; // Table of all referenced aggregates.
\r
466 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
\r
468 int lfta_disorder; // maximum disorder in the stream between lfta, hfta
\r
469 int hfta_disorder; // maximum disorder in the hfta
\r
471 // rollup, cube, and grouping_sets cannot be readily reconstructed by
\r
472 // analyzing the patterns, so explicitly record them here.
\r
473 // used only so that to_query_string produces something meaningful.
\r
474 std::vector<std::string> gb_entry_type;
\r
475 std::vector<int> gb_entry_count;
\r
477 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
\r
// Type tag of this plan node: always the string "sgah_qpn".
std::string node_type(){
	static const char kind_tag[] = "sgah_qpn";
	return std::string(kind_tag);
}
\r
// For code generation: this operator does transform its input.
bool makes_transform(){
	return true;
}
\r
481 std::vector<std::string> external_libs(){
\r
482 std::vector<std::string> ret;
\r
486 void bind_to_schema(table_list *Schema);
\r
487 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
\r
489 std::string to_query_string();
\r
490 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
\r
491 std::string generate_functor_name();
\r
493 std::string generate_operator(int i, std::string params);
\r
494 std::string get_include_file(){
\r
495 if(hfta_disorder <= 1){
\r
496 return("#include <groupby_operator.h>\n");
\r
498 return("#include <groupby_operator_oop.h>\n");
\r
502 std::vector<select_element *> get_select_list(){return select_list;};
\r
503 std::vector<scalarexp_t *> get_select_se_list(){
\r
504 std::vector<scalarexp_t *> ret;
\r
506 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
\r
509 std::vector<cnf_elem *> get_where_clause(){return where;};
\r
510 std::vector<cnf_elem *> get_filter_clause(){return where;};
\r
511 std::vector<cnf_elem *> get_having_clause(){return having;};
\r
512 gb_table *get_gb_tbl(){return &gb_tbl;};
\r
513 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
\r
514 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
\r
515 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
\r
517 // table which represents output tuple.
\r
518 table_def *get_fields();
\r
519 std::vector<tablevar_t *> get_input_tbls();
\r
520 std::vector<tablevar_t *> get_output_tbls();
\r
527 sgah_qpn(query_summary_class *qs,table_list *Schema){
\r
531 // Get the table name.
\r
532 // NOTE the colrefs have the tablevar ref (an int)
\r
533 // embedded in them. Would it make sense
\r
534 // to grab the whole table list?
\r
535 tablevar_list_t *fm = qs->fta_tree->get_from();
\r
536 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
\r
537 if(tbl_vec.size() != 1){
\r
539 sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
\r
543 table_name = (tbl_vec[0]);
\r
545 // Get the select list.
\r
546 select_list = qs->fta_tree->get_sl_vec();
\r
548 // Get the selection and having predicates.
\r
549 where = qs->wh_cnf;
\r
550 having = qs->hav_cnf;
\r
552 // Build a new GB var table (don't share, might need to modify)
\r
554 for(g=0;g<qs->gb_tbl->size();g++){
\r
555 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
\r
556 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
\r
557 qs->gb_tbl->get_reftype(g)
\r
560 gb_tbl.set_pattern_info(qs->gb_tbl);
\r
561 // gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
\r
562 // gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
\r
563 // gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
\r
565 // Build a new aggregate table. (don't share, might need
\r
568 for(a=0;a<qs->aggr_tbl->size();a++){
\r
570 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
\r
571 qs->aggr_tbl->duplicate(a)
\r
576 // Get the parameters
\r
577 param_tbl = qs->param_tbl;
\r
583 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
\r
584 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
\r
585 // Ensure that any refs to interface params have been split away.
\r
586 int count_ifp_refs(std::set<std::string> &ifpnames);
\r
587 int resolve_if_params(ifq_t *ifdb, std::string &err);
\r
589 // the following method is used for distributed query optimization
\r
590 double get_rate_estimate();
\r
593 qp_node* make_copy(std::string suffix){
\r
594 sgah_qpn *ret = new sgah_qpn();
\r
596 ret->param_tbl = new param_table();
\r
597 std::vector<std::string> param_names = param_tbl->get_param_names();
\r
599 for(pi=0;pi<param_names.size();pi++){
\r
600 data_type *dt = param_tbl->get_data_type(param_names[pi]);
\r
601 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
\r
602 param_tbl->handle_access(param_names[pi]));
\r
604 ret->definitions = definitions;
\r
606 ret->node_name = node_name + suffix;
\r
608 // make shallow copy of all fields
\r
609 ret->where = where;
\r
610 ret->having = having;
\r
611 ret->select_list = select_list;
\r
612 ret->gb_tbl = gb_tbl;
\r
613 ret->aggr_tbl = aggr_tbl;
\r
618 // Split aggregation into two HFTA components - sub and superaggregation
\r
619 // If unable to split the aggregates, split into selection and aggregation
\r
620 // If resulting low-level query is empty (e.g. when aggregates cannot be split and
\r
621 // where clause is empty) empty vector will be returned
\r
622 virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
\r
624 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
\r
631 // Select, group-by, aggregate. with running aggregates
\r
633 // Select SE_1, ..., SE_k
\r
636 // Group By gb1, ..., gb_n
\r
637 // Closing When predicate
\r
638 // Having predicate
\r
640 // NOTE : the sampling operator is sgahcwcb_qpn.
\r
642 // For now, must have group-by variables and aggregates.
\r
643 // The scalar expressions which are output must be a function
\r
644 // of the group-by variables and the aggregates.
\r
645 // The group-by variables can be references to columns of T,
\r
646 // or they can be scalar expressions.
\r
647 class rsgah_qpn: public qp_node{
\r
649 tablevar_t *table_name; // source table
\r
650 std::vector<cnf_elem *> where; // selection predicate
\r
651 std::vector<cnf_elem *> having; // post-aggregation predicate
\r
652 std::vector<cnf_elem *> closing_when; // group closing predicate
\r
653 std::vector<select_element *> select_list; // se's of output
\r
654 gb_table gb_tbl; // Table of all group-by attributes.
\r
655 aggregate_table aggr_tbl; // Table of all referenced aggregates.
\r
657 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
\r
659 int lfta_disorder; // maximum disorder allowed in stream between lfta, hfta
\r
660 int hfta_disorder; // maximum disorder allowed in hfta
\r
662 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
\r
// Type tag of this plan node: always the string "rsgah_qpn".
std::string node_type(){
	static const char kind_tag[] = "rsgah_qpn";
	return std::string(kind_tag);
}
\r
// For code generation: this operator does transform its input.
bool makes_transform(){
	return true;
}
\r
667 std::vector<std::string> external_libs(){
\r
668 std::vector<std::string> ret;
\r
672 void bind_to_schema(table_list *Schema);
\r
673 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
\r
674 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
\r
678 std::string to_query_string();
\r
679 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
\r
680 std::string generate_functor_name();
\r
682 std::string generate_operator(int i, std::string params);
\r
// The newline-terminated #include line naming running_gb_operator.h.
std::string get_include_file(){
	static const char include_line[] = "#include <running_gb_operator.h>\n";
	return std::string(include_line);
}
\r
685 std::vector<select_element *> get_select_list(){return select_list;};
\r
686 std::vector<scalarexp_t *> get_select_se_list(){
\r
687 std::vector<scalarexp_t *> ret;
\r
689 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
\r
692 std::vector<cnf_elem *> get_where_clause(){return where;};
\r
693 std::vector<cnf_elem *> get_filter_clause(){return where;};
\r
694 std::vector<cnf_elem *> get_having_clause(){return having;};
\r
695 std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};
\r
696 gb_table *get_gb_tbl(){return &gb_tbl;};
\r
697 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
\r
698 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
\r
699 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
\r
701 // table which represents output tuple.
\r
702 table_def *get_fields();
\r
703 std::vector<tablevar_t *> get_input_tbls();
\r
704 std::vector<tablevar_t *> get_output_tbls();
\r
711 rsgah_qpn(query_summary_class *qs,table_list *Schema){
\r
715 // Get the table name.
\r
716 // NOTE the colrefs have the tablevar ref (an int)
\r
717 // embedded in them. Would it make sense
\r
718 // to grab the whole table list?
\r
719 tablevar_list_t *fm = qs->fta_tree->get_from();
\r
720 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
\r
721 if(tbl_vec.size() != 1){
\r
723 sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
\r
727 table_name = (tbl_vec[0]);
\r
729 // Get the select list.
\r
730 select_list = qs->fta_tree->get_sl_vec();
\r
732 // Get the selection and having predicates.
\r
733 where = qs->wh_cnf;
\r
734 having = qs->hav_cnf;
\r
735 closing_when = qs->closew_cnf;
\r
737 // Build a new GB var table (don't share, might need to modify)
\r
739 for(g=0;g<qs->gb_tbl->size();g++){
\r
740 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
\r
741 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
\r
742 qs->gb_tbl->get_reftype(g)
\r
746 // Build a new aggregate table. (don't share, might need
\r
749 for(a=0;a<qs->aggr_tbl->size();a++){
\r
751 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
\r
752 qs->aggr_tbl->duplicate(a)
\r
757 // Get the parameters
\r
758 param_tbl = qs->param_tbl;
\r
764 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
\r
765 std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
\r
766 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
\r
767 // Ensure that any refs to interface params have been split away.
\r
768 int count_ifp_refs(std::set<std::string> &ifpnames);
\r
769 int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;}
\r
771 // the following method is used for distributed query optimization
\r
772 double get_rate_estimate();
\r
774 qp_node* make_copy(std::string suffix){
\r
775 rsgah_qpn *ret = new rsgah_qpn();
\r
777 ret->param_tbl = new param_table();
\r
778 std::vector<std::string> param_names = param_tbl->get_param_names();
\r
780 for(pi=0;pi<param_names.size();pi++){
\r
781 data_type *dt = param_tbl->get_data_type(param_names[pi]);
\r
782 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
\r
783 param_tbl->handle_access(param_names[pi]));
\r
785 ret->definitions = definitions;
\r
787 ret->node_name = node_name + suffix;
\r
789 // make shallow copy of all fields
\r
790 ret->where = where;
\r
791 ret->having = having;
\r
792 ret->closing_when = closing_when;
\r
793 ret->select_list = select_list;
\r
794 ret->gb_tbl = gb_tbl;
\r
795 ret->aggr_tbl = aggr_tbl;
\r
799 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
\r
803 // forward reference
\r
804 class filter_join_qpn;
\r
807 // (temporal) Merge query plan node.
\r
808 // represent the following query fragment
\r
810 // from T1 _t1, T2 _t2
\r
812 // T1 and T2 must have compatible schemas,
\r
813 // that is the same types in the same slots.
\r
814 // c1 and c2 must be colrefs from T1 and T2,
\r
815 // both ref'ing the same slot. Their types
\r
816 // must be temporal and the same kind of temporal.
\r
817 // in the output, no other field is temporal.
\r
818 // the field names of the output are drawn from T1.
\r
819 class mrg_qpn: public qp_node{
\r
821 std::vector<tablevar_t *> fm; // Source table
\r
822 std::vector<colref_t *> mvars; // the merge-by columns.
\r
823 scalarexp_t *slack;
\r
825 table_def *table_layout; // the output schema
\r
826 int merge_fieldpos; // position of merge field,
\r
827 // convenience for manipulation.
\r
829 int disorder; // max disorder seen in the input / allowed in the output
\r
832 // partition definition for merges that combine streams partitioned over multiple interfaces
\r
833 partn_def_t* partn_def;
\r
// Type tag of this plan node: always the string "mrg_qpn".
std::string node_type(){
	static const char kind_tag[] = "mrg_qpn";
	return std::string(kind_tag);
}
\r
// For code generation: this operator does not transform its input.
bool makes_transform(){
	return false;
}
\r
839 std::vector<std::string> external_libs(){
\r
840 std::vector<std::string> ret;
\r
844 void bind_to_schema(table_list *Schema);
\r
845 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
\r
846 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
\r
850 std::string to_query_string();
\r
851 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
\r
852 std::string generate_functor_name();
\r
853 std::string generate_operator(int i, std::string params);
\r
854 std::string get_include_file(){
\r
856 return("#include <merge_operator_oop.h>\n");
\r
857 return("#include <merge_operator.h>\n");
\r
860 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
\r
861 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
\r
863 table_def *get_fields();
\r
864 std::vector<tablevar_t *> get_input_tbls();
\r
865 std::vector<tablevar_t *> get_output_tbls();
\r
867 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
\r
868 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
\r
869 // Ensure that any refs to interface params have been split away.
\r
870 int count_ifp_refs(std::set<std::string> &ifpnames);
\r
872 // No predicates, return an empty clause
\r
873 std::vector<cnf_elem *> get_where_clause(){
\r
874 std::vector<cnf_elem *> t;
\r
877 std::vector<cnf_elem *> get_filter_clause(){
\r
878 return get_where_clause();
\r
885 void set_disorder(int d){
\r
889 mrg_qpn(query_summary_class *qs,table_list *Schema){
\r
892 // Grab the elements of the query node.
\r
893 fm = qs->fta_tree->get_from()->get_table_list();
\r
898 if(fm.size() != mvars.size()){
\r
899 fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn. fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
\r
903 // Get the parameters
\r
904 param_tbl = qs->param_tbl;
\r
906 // Need to set the node name now, so that the
\r
907 // schema (table_layout) can be properly named.
\r
908 // TODO: Setting the name of the table might best be done
\r
909 // via the set_node_name method, because presumably
\r
910 // that's when the node name is really known.
\r
911 // This should propagate to the table_def table_layout
\r
912 node_name=qs->query_name;
\r
916 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
\r
917 for(ff=0;ff<fm.size();++ff){
\r
918 printf("%s ",fm[ff]->to_string().c_str());
\r
924 // Create the output schema.
\r
925 // strip temporal properties from all fields except the merge field.
\r
926 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
\r
927 field_entry_list *fel = new field_entry_list();
\r
929 for(f=0;f<flva.size();++f){
\r
931 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
\r
932 if(flva[f]->get_name() == mvars[0]->get_field()){
\r
933 merge_fieldpos = f;
\r
934 // if(slack != NULL) dt.reset_temporal();
\r
936 dt.reset_temporal();
\r
939 param_list *plist = new param_list();
\r
940 std::vector<std::string> param_strings = dt.get_param_keys();
\r
942 for(p=0;p<param_strings.size();++p){
\r
943 std::string v = dt.get_param_val(param_strings[p]);
\r
945 plist->append(param_strings[p].c_str(),v.c_str());
\r
947 plist->append(param_strings[p].c_str());
\r
951 fe=new field_entry(
\r
952 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
\r
953 fel->append_field(fe);
\r
959 table_layout = new table_def(
\r
960 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
\r
967 /////////////////////////////////////////////
\r
968 /// Created for de-siloing. to be removed? or is it otherwise useful?
\r
969 // Merge existing set of sources (de-siloing)
\r
970 mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
\r
975 // Construct the fm list
\r
976 for(f=0;f<src_names.size();++f){
\r
977 int tbl_ref = Schema->get_table_ref(src_names[f]);
\r
979 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
\r
982 table_def *src_tbl = Schema->get_table(tbl_ref);
\r
983 tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
\r
984 string range_name = "_t" + int_to_string(f);
\r
985 fm_t->set_range_var(range_name);
\r
986 fm_t->set_schema_ref(tbl_ref);
\r
987 fm.push_back(fm_t);
\r
990 // Create the output schema.
\r
991 // strip temporal properties from all fields except the merge field.
\r
992 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
\r
993 field_entry_list *fel = new field_entry_list();
\r
994 bool temporal_found = false;
\r
995 for(f=0;f<flva.size();++f){
\r
997 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
\r
998 if(dt.is_temporal() && !temporal_found){
\r
999 merge_fieldpos = f;
\r
1000 temporal_found = true;
\r
1002 dt.reset_temporal();
\r
1005 param_list *plist = new param_list();
\r
1006 std::vector<std::string> param_strings = dt.get_param_keys();
\r
1008 for(p=0;p<param_strings.size();++p){
\r
1009 std::string v = dt.get_param_val(param_strings[p]);
\r
1011 plist->append(param_strings[p].c_str(),v.c_str());
\r
1013 plist->append(param_strings[p].c_str());
\r
1016 fe=new field_entry(
\r
1017 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
\r
1018 flva[f]->get_unpack_fcns()
\r
1020 fel->append_field(fe);
\r
1023 if(! temporal_found){
\r
1024 fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
\r
1029 table_layout = new table_def(
\r
1030 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
\r
1034 param_tbl = new param_table();
\r
1036 // Construct mvars
\r
1037 for(f=0;f<fm.size();++f){
\r
1038 std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
\r
1039 data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
\r
1040 flva[merge_fieldpos]->get_modifier_list());
\r
1042 colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
\r
1043 flv_f[merge_fieldpos]->get_name().c_str());
\r
1044 mvars.push_back(mcr);
\r
1047 // literal_t *s_lit = new literal_t("5",LITERAL_INT);
\r
1048 // slack = new scalarexp_t(s_lit);
\r
1053 ////////////////////////////////////////
\r
1055 void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
\r
1058 // Merge filter_join LFTAs.
\r
1060 mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
\r
1062 // Merge selection LFTAs.
\r
1064 mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
\r
1068 param_tbl = spx->param_tbl;
\r
1070 node_name = n_name;
\r
1071 field_entry_list *fel = new field_entry_list();
\r
1072 merge_fieldpos = -1;
\r
1077 for(i=0;i<spx->select_list.size();++i){
\r
1078 data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
\r
1079 if(dt->is_temporal()){
\r
1080 if(merge_fieldpos < 0){
\r
1081 merge_fieldpos = i;
\r
1083 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
\r
1084 dt->reset_temporal();
\r
1088 field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
\r
1089 fel->append_field(fe);
\r
1092 if(merge_fieldpos<0){
\r
1093 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
\r
1096 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
\r
1098 // NEED TO HANDLE USER_SPECIFIED SLACK
\r
1099 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
\r
1100 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
\r
1101 // if(this->slack == NULL)
\r
1102 // fprintf(stderr,"Zero slack.\n");
\r
1104 // fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
\r
1106 for(i=0;i<sources.size();i++){
\r
1107 std::string rvar = "_m"+int_to_string(i);
\r
1108 mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
\r
1109 mvars[i]->set_tablevar_ref(i);
\r
1110 fm.push_back(new tablevar_t(sources[i].c_str()));
\r
1111 fm[i]->set_range_var(rvar);
\r
1114 param_tbl = new param_table();
\r
1115 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
\r
1117 for(pi=0;pi<param_names.size();pi++){
\r
1118 data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
\r
1119 param_tbl->add_param(param_names[pi],dt->duplicate(),
\r
1120 spx->param_tbl->handle_access(param_names[pi]));
\r
1122 definitions = spx->definitions;
\r
1126 // Merge aggregation LFTAs
\r
1128 mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
\r
1132 param_tbl = sgah->param_tbl;
\r
1134 node_name = n_name;
\r
1135 field_entry_list *fel = new field_entry_list();
\r
1136 merge_fieldpos = -1;
\r
1137 for(i=0;i<sgah->select_list.size();++i){
\r
1138 data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
\r
1139 if(dt->is_temporal()){
\r
1140 if(merge_fieldpos < 0){
\r
1141 merge_fieldpos = i;
\r
1143 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
\r
1144 dt->reset_temporal();
\r
1148 field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
\r
1149 fel->append_field(fe);
\r
1152 if(merge_fieldpos<0){
\r
1153 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
\r
1156 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
\r
1158 // NEED TO HANDLE USER_SPECIFIED SLACK
\r
1159 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
\r
1160 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
\r
1162 if(this->slack == NULL)
\r
1163 fprintf(stderr,"Zero slack.\n");
\r
1165 fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
\r
1168 for(i=0;i<sources.size();i++){
\r
1169 std::string rvar = "_m"+int_to_string(i);
\r
1170 mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
\r
1171 mvars[i]->set_tablevar_ref(i);
\r
1172 fm.push_back(new tablevar_t(sources[i].c_str()));
\r
1173 fm[i]->set_range_var(rvar);
\r
1176 param_tbl = new param_table();
\r
1177 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
\r
1179 for(pi=0;pi<param_names.size();pi++){
\r
1180 data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
\r
1181 param_tbl->add_param(param_names[pi],dt->duplicate(),
\r
1182 sgah->param_tbl->handle_access(param_names[pi]));
\r
1184 definitions = sgah->definitions;
\r
1188 qp_node *make_copy(std::string suffix){
\r
1189 mrg_qpn *ret = new mrg_qpn();
\r
1190 ret->slack = slack;
\r
1191 ret->disorder = disorder;
\r
1193 ret->param_tbl = new param_table();
\r
1194 std::vector<std::string> param_names = param_tbl->get_param_names();
\r
1196 for(pi=0;pi<param_names.size();pi++){
\r
1197 data_type *dt = param_tbl->get_data_type(param_names[pi]);
\r
1198 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
\r
1199 param_tbl->handle_access(param_names[pi]));
\r
1201 ret->definitions = definitions;
\r
1203 ret->node_name = node_name + suffix;
\r
1204 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
\r
1205 ret->merge_fieldpos = merge_fieldpos;
\r
1210 std::vector<mrg_qpn *> split_sources();
\r
1212 // the following method is used for distributed query optimization
\r
1213 double get_rate_estimate();
\r
1216 // get partition definition for merges that combine streams partitioned over multiple interfaces
\r
1217 // return NULL for regular merges
\r
1218 partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
\r
1224 string partn_name;
\r
1226 vector<tablevar_t *> input_tables = get_input_tbls();
\r
1227 for (int i = 0; i < input_tables.size(); ++i) {
\r
1228 tablevar_t * table = input_tables[i];
\r
1230 vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
\r
1231 if (partn_names.size() != 1) // can't have more than one value of partition attribute
\r
1233 string new_partn_name = partn_names[0];
\r
1235 // need to make sure that all ifaces belong to the same partition
\r
1237 partn_name = new_partn_name;
\r
1238 else if (new_partn_name != partn_name)
\r
1242 // now find partition definition corresponding to partn_name
\r
1243 partn_def = partn_parse_result->get_partn_def(partn_name);
\r
1247 void set_partn_definition(partn_def_t* def) {
\r
1251 bool is_multihost_merge() {
\r
1253 bool is_multihost = false;
\r
1255 // each input table must have a non-empty machine attribute
\r
1256 // and there should be at least 2 different values of machine attributes
\r
1257 vector<tablevar_t *> input_tables = get_input_tbls();
\r
1258 string host = input_tables[0]->get_machine();
\r
1259 for (int i = 1; i < input_tables.size(); ++i) {
\r
1260 string new_host = input_tables[i]->get_machine();
\r
1261 if (new_host == "")
\r
1263 if (new_host != host)
\r
1264 is_multihost = true;
\r
1266 return is_multihost;
\r
1269 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
\r
1273 // eq_temporal, hash join query plan node.
\r
1274 // represent the following query fragment
\r
1275 // select scalar_expression_1, ..., scalar_expression_k
\r
1276 // from T0 t0, T1 t1
\r
1277 // where predicate
\r
1279 // the predicates and the scalar expressions can reference
\r
1280 // attributes of t0 and t1 and also functions.
\r
1281 // The predicate must contain CNF elements to enable the
\r
1282 // efficient evaluation of the query.
\r
1283 // 1) at least one predicate of the form
\r
1284 // (temporal se in t0) = (temporal se in t1)
\r
1285 // 2) at least one predicate of the form
\r
1286 // (non-temporal se in t0) = (non-temporal se in t1)
\r
1288 class join_eq_hash_qpn: public qp_node{
\r
1290 std::vector<tablevar_t *> from; // Source tables
\r
1291 std::vector<select_element *> select_list; // Select list
\r
1292 std::vector<cnf_elem *> prefilter[2]; // source prefilters
\r
1293 std::vector<cnf_elem *> temporal_eq; // define temporal window
\r
1294 std::vector<cnf_elem *> hash_eq; // define hash key
\r
1295 std::vector<cnf_elem *> postfilter; // final filter on hash matches.
\r
1297 std::vector<cnf_elem *> where; // all the filters
\r
1298 // useful for summary analysis
\r
1300 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
\r
// Accessor for hash_src_r. Returns the vector by value, so the caller gets its own
// copy of the pointer list (the scalarexp_t objects themselves are not duplicated).
1302 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
\r
// Accessor for hash_src_l. Returns the vector by value, so the caller gets its own
// copy of the pointer list (the scalarexp_t objects themselves are not duplicated).
1303 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
\r
// Type tag of this plan node: always the string "join_eq_hash_qpn".
1305 std::string node_type(){return("join_eq_hash_qpn"); };
\r
// Unconditionally returns true for this node type.
1306 bool makes_transform(){return true;};
\r
1307 std::vector<std::string> external_libs(){
\r
1308 std::vector<std::string> ret;
\r
1312 void bind_to_schema(table_list *Schema);
\r
1313 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
\r
1314 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
\r
1318 std::string to_query_string();
\r
1319 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
\r
1320 std::string generate_functor_name();
\r
1321 std::string generate_operator(int i, std::string params);
\r
// Returns the newline-terminated #include directive for join_eq_hash_operator.h.
// Presumably emitted into generated code for this operator -- confirm against callers.
1322 std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
\r
// Returns a copy of select_list; the select_element objects pointed to are shared,
// not duplicated.
1324 std::vector<select_element *> get_select_list(){return select_list;};
\r
1325 std::vector<scalarexp_t *> get_select_se_list(){
\r
1326 std::vector<scalarexp_t *> ret;
\r
1328 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
\r
1331 // Used for LFTA only
\r
1332 std::vector<cnf_elem *> get_where_clause(){
\r
1333 std::vector<cnf_elem *> t;
\r
1336 std::vector<cnf_elem *> get_filter_clause(){
\r
1337 return get_where_clause();
\r
1340 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
\r
1341 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
\r
1343 table_def *get_fields();
\r
1344 std::vector<tablevar_t *> get_input_tbls();
\r
1345 std::vector<tablevar_t *> get_output_tbls();
\r
1347 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
\r
1348 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
\r
1349 // Ensure that any refs to interface params have been split away.
\r
1350 int count_ifp_refs(std::set<std::string> &ifpnames);
\r
1352 join_eq_hash_qpn(){
\r
1354 join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
\r
1356 // Get the table name.
\r
1357 // NOTE the colrefs have the table ref (an int)
\r
1358 // embedded in them. Would it make sense
\r
1359 // to grab the whole table list?
\r
1360 from = qs->fta_tree->get_from()->get_table_list();
\r
1361 if(from.size() != 2){
\r
1363 sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
\r
1368 // Get the select list.
\r
1369 select_list = qs->fta_tree->get_sl_vec();
\r
1371 // Get the selection predicate.
\r
1372 where = qs->wh_cnf;
\r
1373 for(w=0;w<where.size();++w){
\r
1374 analyze_cnf(where[w]);
\r
1375 std::vector<int> pred_tbls;
\r
1376 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
\r
1377 // Prefilter if refs only one tablevar
\r
1378 if(pred_tbls.size()==1){
\r
1379 prefilter[pred_tbls[0]].push_back(where[w]);
\r
1382 // refs nothing -- might be sampling, do it as postfilter.
\r
1383 if(pred_tbls.size()==0){
\r
1384 postfilter.push_back(where[w]);
\r
1387 // See if it can be a hash or temporal predicate.
\r
1388 // NOTE: synchronize with the temporality checking
\r
1389 // done at join_eq_hash_qpn::get_fields
\r
1390 if(where[w]->is_atom && where[w]->eq_pred){
\r
1391 std::vector<int> sel_tbls, ser_tbls;
\r
1392 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
\r
1393 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
\r
1394 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
\r
1395 // make channel 0 SE on LHS.
\r
1396 if(sel_tbls[0] != 0)
\r
1397 where[w]->pr->swap_scalar_operands();
\r
1399 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
\r
1400 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
\r
1401 if( (dtl->is_increasing() && dtr->is_increasing()) ||
\r
1402 (dtl->is_decreasing() && dtr->is_decreasing()) )
\r
1403 temporal_eq.push_back(where[w]);
\r
1405 hash_eq.push_back(where[w]);
\r
1410 // All tests failed, fallback is postfilter.
\r
1411 postfilter.push_back(where[w]);
\r
1414 if(temporal_eq.size()==0){
\r
1415 err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
\r
1419 // Get the parameters
\r
1420 param_tbl = qs->param_tbl;
\r
1424 // the following method is used for distributed query optimization
\r
1425 double get_rate_estimate();
\r
1428 qp_node* make_copy(std::string suffix){
\r
1429 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
\r
1431 ret->param_tbl = new param_table();
\r
1432 std::vector<std::string> param_names = param_tbl->get_param_names();
\r
1434 for(pi=0;pi<param_names.size();pi++){
\r
1435 data_type *dt = param_tbl->get_data_type(param_names[pi]);
\r
1436 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
\r
1437 param_tbl->handle_access(param_names[pi]));
\r
1439 ret->definitions = definitions;
\r
1441 ret->node_name = node_name + suffix;
\r
1443 // make shallow copy of all fields
\r
1444 ret->where = where;
\r
1446 ret->select_list = select_list;
\r
1447 ret->prefilter[0] = prefilter[0];
\r
1448 ret->prefilter[1] = prefilter[1];
\r
1449 ret->postfilter = postfilter;
\r
1450 ret->temporal_eq = temporal_eq;
\r
1451 ret->hash_eq = hash_eq;
\r
1455 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
\r
1460 // ---------------------------------------------
\r
1461 // eq_temporal, hash join query plan node.
\r
1462 // represent the following query fragment
\r
1463 // select scalar_expression_1, ..., scalar_expression_k
\r
1464 // FILTER_JOIN(col, range) from T0 t0, T1 t1
\r
1465 // where predicate
\r
1467 // t0 is the output range variable, t1 is the filtering range
\r
1468 // variable. Both must alias a PROTOCOL.
\r
1469 // The scalar expressions in the select clause may
\r
1470 // reference t0 only.
\r
1471 // The predicates are classified as follows
\r
1472 // prefilter predicates:
\r
1473 // a cheap predicate in t0 such that there is an equivalent
\r
1474 // predicate in t1. Cost decisions about pushing to
\r
1475 // lfta prefilter made later.
\r
1476 // t0 predicates (other than prefilter predicates)
\r
1477 // -- cheap vs. expensive sorted out at generate time,
\r
1478 // the constructor isn't called with the function list.
\r
1479 // t1 predicates (other than prefilter predicates).
\r
1480 // equi-join predicates of the form:
\r
1481 // (se in t0) = (se in t1)
\r
1483 // There must be at least one equi-join predicate.
\r
1484 // No join predicates other than equi-join predicates
\r
1486 // Warn on temporal equi-join predicates.
\r
1487 // t1 predicates should not be expensive ... warn?
\r
1489 class filter_join_qpn: public qp_node{
\r
1491 std::vector<tablevar_t *> from; // Source tables
\r
1492 colref_t *temporal_var; // join window in FROM
\r
1493 unsigned int temporal_range; // metadata.
\r
1494 std::vector<select_element *> select_list; // Select list
\r
1495 std::vector<cnf_elem *> shared_pred; // prefilter preds
\r
1496 std::vector<cnf_elem *> pred_t0; // main (R) preds
\r
1497 std::vector<cnf_elem *> pred_t1; // filtering (S) preds
\r
1498 std::vector<cnf_elem *> hash_eq; // define hash key
\r
1499 std::vector<cnf_elem *> postfilter; // ref's no table.
\r
1501 std::vector<cnf_elem *> where; // all the filters
\r
1502 // useful for summary analysis
\r
1504 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
\r
// Accessor for hash_src_r. Returns the vector by value, so the caller gets its own
// copy of the pointer list (the scalarexp_t objects themselves are not duplicated).
1505 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
\r
// Accessor for hash_src_l. Returns the vector by value, so the caller gets its own
// copy of the pointer list (the scalarexp_t objects themselves are not duplicated).
1506 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
\r
1509 bool use_bloom; // true => bloom filter, false => limited hash
\r
// Type tag of this plan node: always the string "filter_join" (note: no "_qpn" suffix,
// unlike the class name).
1511 std::string node_type(){return("filter_join"); };
\r
// Unconditionally returns true for this node type.
1512 bool makes_transform(){return true;};
\r
1513 std::vector<std::string> external_libs(){
\r
1514 std::vector<std::string> ret;
\r
1518 void bind_to_schema(table_list *Schema);
\r
1519 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
\r
1521 std::string to_query_string();
\r
1522 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
\r
1523 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
\r
1526 std::string generate_functor_name(){
\r
1527 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
\r
1530 std::string generate_operator(int i, std::string params){
\r
1531 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
\r
// Returns the newline-terminated #include directive for join_eq_hash_operator.h.
// NOTE(review): this is the same header join_eq_hash_qpn names; the generate_* methods
// of this class only report INTERNAL ERROR, so presumably this value is never used -- confirm.
1534 std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
\r
// Returns a copy of select_list; the select_element objects pointed to are shared,
// not duplicated.
1536 std::vector<select_element *> get_select_list(){return select_list;};
\r
1537 std::vector<scalarexp_t *> get_select_se_list(){
\r
1538 std::vector<scalarexp_t *> ret;
\r
1540 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
\r
1543 // Used for LFTA only
\r
// Returns a copy of the `where` vector, i.e. the complete list of filter predicates
// held by this node.
1544 std::vector<cnf_elem *> get_where_clause(){return where;}
\r
// Returns a copy of shared_pred (the prefilter predicates) -- not the full `where` list.
1545 std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
\r
1547 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
\r
1548 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
\r
1550 table_def *get_fields();
\r
1551 std::vector<tablevar_t *> get_input_tbls();
\r
1552 std::vector<tablevar_t *> get_output_tbls();
\r
1554 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
\r
1555 int resolve_if_params(ifq_t *ifdb, std::string &err);
\r
1557 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
\r
1558 // Ensure that any refs to interface params have been split away.
\r
1559 int count_ifp_refs(std::set<std::string> &ifpnames);
\r
1562 filter_join_qpn(){
\r
1564 filter_join_qpn(query_summary_class *qs,table_list *Schema){
\r
1566 // Get the table name.
\r
1567 // NOTE the colrefs have the table ref (an int)
\r
1568 // embedded in them. Would it make sense
\r
1569 // to grab the whole table list?
\r
1570 from = qs->fta_tree->get_from()->get_table_list();
\r
1571 temporal_var = qs->fta_tree->get_from()->get_colref();
\r
1572 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
\r
1573 if(from.size() != 2){
\r
1575 sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
\r
1576 err_str += tmpstr;
\r
1580 // Get the select list.
\r
1581 select_list = qs->fta_tree->get_sl_vec();
\r
1582 // Verify that only t0 is referenced.
\r
1583 bool bad_ref = false;
\r
1584 for(i=0;i<select_list.size();i++){
\r
1585 vector<int> sel_tbls;
\r
1586 get_tablevar_ref_se(select_list[i]->se,sel_tbls);
\r
1587 if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
\r
1591 err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
\r
1596 // Get the selection predicate.
\r
1597 where = qs->wh_cnf;
\r
1598 std::vector<cnf_elem *> t0_only, t1_only;
\r
1599 for(w=0;w<where.size();++w){
\r
1600 analyze_cnf(where[w]);
\r
1601 std::vector<int> pred_tbls;
\r
1602 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
\r
1603 // Collect the list of preds by src var,
\r
1604 // extract the shared preds later.
\r
1605 if(pred_tbls.size()==1){
\r
1606 if(pred_tbls[0] == 0){
\r
1607 t0_only.push_back(where[w]);
\r
1609 t1_only.push_back(where[w]);
\r
1613 // refs nothing -- might be sampling, do it as postfilter.
\r
1614 if(pred_tbls.size()==0){
\r
1615 postfilter.push_back(where[w]);
\r
1618 // See if it can be a hash or temporal predicate.
\r
1619 // NOTE: synchronize with the temporality checking
\r
1620 // done at join_eq_hash_qpn::get_fields
\r
1621 if(where[w]->is_atom && where[w]->eq_pred){
\r
1622 std::vector<int> sel_tbls, ser_tbls;
\r
1623 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
\r
1624 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
\r
1625 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
\r
1626 // make channel 0 SE on LHS.
\r
1627 if(sel_tbls[0] != 0)
\r
1628 where[w]->pr->swap_scalar_operands();
\r
1630 hash_eq.push_back(where[w]);
\r
1632 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
\r
1633 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
\r
1634 if( (dtl->is_increasing() && dtr->is_increasing()) ||
\r
1635 (dtl->is_decreasing() && dtr->is_decreasing()) )
\r
1636 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
\r
1641 // All tests failed, fallback is postfilter.
\r
1642 err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
\r
1645 // Classify the t0_only and t1_only preds.
\r
1646 set<int> matched_pred;
\r
1648 for(w=0;w<t0_only.size();w++){
\r
1649 for(v=0;v<t1_only.size();++v)
\r
1650 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
\r
1652 if(v<t1_only.size()){
\r
1653 shared_pred.push_back(t0_only[w]);
\r
1654 matched_pred.insert(v);
\r
1656 pred_t0.push_back(t0_only[w]);
\r
1659 for(v=0;v<t1_only.size();++v){
\r
1660 if(matched_pred.count(v) == 0)
\r
1661 pred_t1.push_back(t1_only[v]);
\r
1665 // Get the parameters
\r
1666 param_tbl = qs->param_tbl;
\r
1667 definitions = qs->definitions;
\r
1669 // Determine the algorithm
\r
1670 if(this->get_val_of_def("algorithm") == "hash"){
\r
1671 use_bloom = false;
\r
1677 // the following method is used for distributed query optimization
\r
1678 double get_rate_estimate();
\r
1681 qp_node* make_copy(std::string suffix){
\r
1682 filter_join_qpn *ret = new filter_join_qpn();
\r
1684 ret->param_tbl = new param_table();
\r
1685 std::vector<std::string> param_names = param_tbl->get_param_names();
\r
1687 for(pi=0;pi<param_names.size();pi++){
\r
1688 data_type *dt = param_tbl->get_data_type(param_names[pi]);
\r
1689 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
\r
1690 param_tbl->handle_access(param_names[pi]));
\r
1692 ret->definitions = definitions;
\r
1694 ret->node_name = node_name + suffix;
\r
1696 // make shallow copy of all fields
\r
1697 ret->where = where;
\r
1699 ret->temporal_range = temporal_range;
\r
1700 ret->temporal_var = temporal_var;
\r
1701 ret->select_list = select_list;
\r
1702 ret->shared_pred = shared_pred;
\r
1703 ret->pred_t0 = pred_t0;
\r
1704 ret->pred_t1 = pred_t1;
\r
1705 ret->postfilter = postfilter;
\r
1706 ret->hash_eq = hash_eq;
\r
1710 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
\r
// Kind of compression used for an output file; stored in output_file_qpn::compression_type.
// Plain (unscoped) enum: regular, gzip and bzip are injected into the enclosing scope.
1715 enum output_file_type_enum {regular, gzip, bzip};
\r
1717 class output_file_qpn: public qp_node{
\r
1719 std::string source_op_name; // Source table
\r
1720 std::vector<field_entry *> fields;
\r
1721 ospec_str *output_spec;
\r
1722 vector<tablevar_t *> fm;
\r
1723 std::string hfta_query_name;
\r
1724 std::string filestream_id;
\r
1726 std::vector<std::string> params;
\r
1728 output_file_type_enum compression_type;
\r
1730 int n_streams; // Number of output streams
\r
1731 int n_hfta_clones; // number of hfta clones
\r
1732 int parallel_idx; // which clone this produces output for.
\r
1733 std::vector<int> hash_flds; // fields used to hash the output.
\r
// Type tag of this plan node: always the string "output_file_qpn".
1735 std::string node_type(){return("output_file_qpn"); };
\r
// Unconditionally returns false for this node type.
1736 bool makes_transform(){return false;};
\r
1737 std::vector<std::string> external_libs(){
\r
1738 std::vector<std::string> ret;
\r
1739 switch(compression_type){
\r
1741 ret.push_back("-lz");
\r
1744 ret.push_back("-lbz2");
\r
// Intentionally empty: this node does nothing when asked to bind to a schema.
1752 void bind_to_schema(table_list *Schema){}
\r
1753 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
\r
// Returns a fixed one-line placeholder (a "//" comment followed by a newline) instead of
// reconstructing any query text.
1758 std::string to_query_string(){return "// output_file_operator \n";}
\r
1759 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
\r
1760 std::string generate_functor_name();
\r
1761 std::string generate_operator(int i, std::string params);
\r
1762 std::string get_include_file(){
\r
1763 switch(compression_type){
\r
1765 return("#include <zfile_output_operator.h>\n");
\r
1767 return("#include <file_output_operator.h>\n");
\r
1769 return("#include <file_output_operator.h>\n");
\r
// Always returns an empty predicate list.
1772 std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
\r
// Always returns an empty predicate list.
1773 std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
\r
// Returns a newly heap-allocated, default-constructed cplx_lit_table on every call;
// Ext_fcns is not consulted. Nothing here releases the table -- presumably the caller
// takes ownership (confirm).
1774 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
\r
// Always returns an empty handle-parameter list; Ext_fcns is not consulted.
1775 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
\r
1777 table_def *get_fields(){
\r
1778 field_entry_list *fel = new field_entry_list();
\r
1780 for(i=0;i<fields.size();++i)
\r
1781 fel->append_field(fields[i]);
\r
1782 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
\r
1784 std::vector<tablevar_t *> get_input_tbls();
\r
1785 std::vector<tablevar_t *> get_output_tbls();
\r
1787 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
\r
1788 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
\r
1790 std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
\r
1791 std::vector<table_exp_t *> ret; return ret;
\r
1793 // Ensure that any refs to interface params have been split away.
\r
// Always returns 0 and leaves ifpnames untouched.
1794 int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
\r
// Always returns 0; neither ifdb nor err is read or written.
1795 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
\r
1798 output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
\r
1799 source_op_name = src_op;
\r
1800 node_name = source_op_name + "_output";
\r
1801 filestream_id = fs_id;
\r
1802 fields = src_tbl_def->get_fields();
\r
1803 output_spec = ospec;
\r
1804 fm.push_back(new tablevar_t(source_op_name.c_str()));
\r
1805 hfta_query_name = qn;
\r
1809 compression_type = regular;
\r
1810 if(ospec->operator_type == "zfile")
\r
1811 compression_type = gzip;
\r
1815 n_hfta_clones = 1;
\r
1818 strncpy(buf, output_spec->operator_param.c_str(),1000);
\r
1821 int nwords = split_string(buf, ':', words,100);
\r
1823 for(i=0;i<nwords;i++){
\r
1824 params.push_back(words[i]);
\r
1826 for(i=0;i<params.size();i++){
\r
1827 if(params[i] == "gzip")
\r
1832 // Set output splitting parameters
\r
1833 bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
\r
1835 n_hfta_clones = np;
\r
1836 parallel_idx = ix;
\r
1838 if(split_flds != ""){
\r
1839 string err_flds = "";
\r
1840 char *tmpstr = strdup(split_flds.c_str());
\r
1842 int nwords = split_string(tmpstr,':',words,100);
\r
1844 for(i=0;i<nwords;++i){
\r
1845 string target = words[i];
\r
1846 for(j=0;j<fields.size();++j){
\r
1847 if(fields[j]->get_name() == target){
\r
1848 hash_flds.push_back(j);
\r
1852 if(j==fields.size()){
\r
1853 err_flds += " "+target;
\r
1856 if(err_flds != ""){
\r
1857 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
\r
1864 // the following method is used for distributed query optimization
\r
// Returns the constant 1.0; no estimate is actually computed for this node type.
1865 double get_rate_estimate(){return 1.0;}
\r
1868 qp_node* make_copy(std::string suffix){
\r
1869 // output_file_qpn *ret = new output_file_qpn();
\r
1870 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
\r
// Intentionally empty: both arguments are ignored and no state is changed.
1874 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
\r
1882 // ---------------------------------------------
\r
1885 // Select, group-by, aggregate, sampling.
\r
1887 // Select SE_1, ..., SE_k
\r
1889 // Where predicate
\r
1890 // Group By gb1, ..., gb_n
\r
1891 // [Subgroup gb_i1, .., gb_ik]
\r
1892 // Cleaning_when predicate
\r
1893 // Cleaning_by predicate
\r
1894 // Having predicate
\r
1896 // For now, must have group-by variables and aggregates.
\r
1897 // The scalar expressions which are output must be a function
\r
1898 // of the group-by variables and the aggregates.
\r
1899 // The group-by variables can be references to columns of T,
\r
1900 // or they can be scalar expressions.
\r
1901 class sgahcwcb_qpn: public qp_node{
\r
1903 tablevar_t *table_name; // source table
\r
1904 std::vector<cnf_elem *> where; // selection predicate
\r
1905 std::vector<cnf_elem *> having; // post-aggregation predicate
\r
1906 std::vector<select_element *> select_list; // se's of output
\r
1907 gb_table gb_tbl; // Table of all group-by attributes.
\r
1908 std::set<int> sg_tbl; // Names of the superGB attributes
\r
1909 aggregate_table aggr_tbl; // Table of all referenced aggregates.
\r
1910 std::set<std::string> states_refd; // states ref'd by stateful fcns.
\r
1911 std::vector<cnf_elem *> cleanby;
\r
1912 std::vector<cnf_elem *> cleanwhen;
\r
1914 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
\r
// Returns a copy of gb_sources; the scalarexp_t objects pointed to are shared,
// not duplicated.
1916 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
\r
// Type tag of this plan node: always the string "sgahcwcb_qpn".
1918 std::string node_type(){return("sgahcwcb_qpn"); };
\r
// Unconditionally returns true for this node type.
1919 bool makes_transform(){return true;};
\r
1920 std::vector<std::string> external_libs(){
\r
1921 std::vector<std::string> ret;
\r
1925 void bind_to_schema(table_list *Schema);
\r
// The visible part of the body writes an INTERNAL ERROR message to stderr,
// i.e. calling this method on sgahcwcb_qpn is treated as a programming error.
// Neither parameter is used in the visible lines. What follows the fprintf
// (exit or return value) is missing from this listing.
1926 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
\r
1927 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
\r
// Four string-returning members declared without inline bodies.
1931 std::string to_query_string();
\r
// needs_xform is taken by non-const reference.
1932 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
\r
1933 std::string generate_functor_name();
\r
1935 std::string generate_operator(int i, std::string params);
\r
// Returns the text of an #include directive for clean_operator.h,
// terminated by a newline.
1936 std::string get_include_file(){return("#include <clean_operator.h>\n");};
\r
// Accessor for select_list; returned by value (a copy of the pointer vector).
1938 std::vector<select_element *> get_select_list(){return select_list;};
\r
// Collects the se member of every select_list entry, in list order, into a
// vector of scalarexp_t pointers. The declaration of the loop index and the
// return statement are missing from this listing.
1939 std::vector<scalarexp_t *> get_select_se_list(){
\r
1940 std::vector<scalarexp_t *> ret;
\r
1942 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
\r
// Accessor for the where predicate list; returned by value.
1945 std::vector<cnf_elem *> get_where_clause(){return where;};
\r
// Also returns the where predicate list (by value): for this node the
// filter clause and the where clause are the same member.
1946 std::vector<cnf_elem *> get_filter_clause(){return where;};
\r
// Accessor for the having predicate list; returned by value.
1947 std::vector<cnf_elem *> get_having_clause(){return having;};
\r
// Returns a pointer to the gb_tbl member itself (not a copy).
1948 gb_table *get_gb_tbl(){return &gb_tbl;};
\r
// Returns a pointer to the aggr_tbl member itself (not a copy).
1949 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
\r
// Declarations only (no inline bodies). Both take the external-function
// list; the first returns a cplx_lit_table pointer, the second a vector of
// handle_param_tbl_entry pointers.
1950 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
\r
1951 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
\r
1953 // Table which represents the output tuple.
\r
1954 table_def *get_fields();
\r
// Declarations only; each returns a vector of tablevar_t pointers.
1955 std::vector<tablevar_t *> get_input_tbls();
\r
1956 std::vector<tablevar_t *> get_output_tbls();
\r
// Construct the node from a query summary.
//   qs     - supplies the parse tree (qs->fta_tree), the CNF predicate lists,
//            the group-by and aggregate tables, sg_tbl, states_refd and
//            param_tbl.
//   Schema - not referenced in the lines visible here.
// The FROM clause is expected to name exactly one table. Several short lines
// of this constructor (local declarations, closing braces, the remainder of
// the error branch) are missing from this listing.
1961 sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
\r
1962 // Get the table name.
\r
1963 // NOTE the colrefs have the tablevar ref (an int)
\r
1964 // embedded in them. Would it make sense
\r
1965 // to grab the whole table list?
\r
1966 tablevar_list_t *fm = qs->fta_tree->get_from();
\r
1967 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
\r
1968 if(tbl_vec.size() != 1){
\r
// NOTE(review): the format uses %lu for a size_t argument; %zu is the
// matching conversion. tmpstr is declared on a line not shown here, so
// confirm that it is large enough for this message.
1970 sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
\r
// Keep the pointer to the first table variable (the tablevar_t is not copied).
1974 table_name = (tbl_vec[0]);
\r
1976 // Get the select list.
\r
1977 select_list = qs->fta_tree->get_sl_vec();
\r
1979 // Get the selection, having, cleaning_by and cleaning_when predicates.
\r
1980 where = qs->wh_cnf;
\r
1981 having = qs->hav_cnf;
\r
1982 cleanby = qs->cb_cnf;
\r
1983 cleanwhen = qs->cw_cnf;
\r
1985 // Build a new GB var table (don't share, might need to modify)
\r
1987 for(g=0;g<qs->gb_tbl->size();g++){
\r
1988 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
\r
1989 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
\r
1990 qs->gb_tbl->get_reftype(g)
\r
1994 // Build a new aggregate table. (don't share, might need
\r
1997 for(a=0;a<qs->aggr_tbl->size();a++){
\r
1998 aggr_tbl.add_aggr(
\r
1999 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
\r
2000 qs->aggr_tbl->duplicate(a)
\r
// Copy the superGB set and the set of referenced states.
2004 sg_tbl = qs->sg_tbl;
\r
2005 states_refd = qs->states_refd;
\r
2008 // Get the parameters
\r
// Pointer assignment: this node refers to the same param_table object as qs.
2009 param_tbl = qs->param_tbl;
\r
// Declaration only. Returns a vector of qp_node pointers; hfta_returned is
// taken by non-const reference.
2015 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
\r
// Declaration only (virtual). qnodes and opviews are taken by non-const
// reference.
2016 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
\r
2017 // Ensure that any refs to interface params have been split away.
\r
2018 // CURRENTLY not allowed by split_node_for_fta
\r
// Always returns 0; ifpnames is neither read nor modified.
2019 int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
\r
// Always returns 0; neither ifdb nor err is touched.
2020 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
\r
2022 // The following method is used for distributed query optimization.
\r
// Declaration only; returns a double.
2023 double get_rate_estimate();
\r
// Create a new sgahcwcb_qpn whose node_name is this node's name followed by
// suffix. The new node is built with a no-argument constructor that is not
// visible in this listing.
// The parameter table is rebuilt entry by entry: a fresh param_table is
// allocated and each data_type is duplicate()d into it. definitions and the
// remaining members are copied by plain assignment, so the pointers held in
// the predicate and select-list vectors are shared with this node.
// NOTE(review): no assignment to ret->table_name or ret->gb_sources appears
// in the visible lines; this listing drops some lines, so confirm against
// the full file. The return statement is likewise not visible here.
2025 qp_node* make_copy(std::string suffix){
\r
2026 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
\r
2028 ret->param_tbl = new param_table();
\r
2029 std::vector<std::string> param_names = param_tbl->get_param_names();
\r
2031 for(pi=0;pi<param_names.size();pi++){
\r
2032 data_type *dt = param_tbl->get_data_type(param_names[pi]);
\r
2033 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
\r
2034 param_tbl->handle_access(param_names[pi]));
\r
2036 ret->definitions = definitions;
\r
2038 ret->node_name = node_name + suffix;
\r
2040 // make shallow copy of all fields
\r
2041 ret->where = where;
\r
2042 ret->having = having;
\r
2043 ret->select_list = select_list;
\r
2044 ret->gb_tbl = gb_tbl;
\r
2045 ret->aggr_tbl = aggr_tbl;
\r
2046 ret->sg_tbl = sg_tbl;
\r
2047 ret->states_refd = states_refd;
\r
2048 ret->cleanby = cleanby;
\r
2049 ret->cleanwhen = cleanwhen;
\r
// Declaration only; no inline body is given here.
2054 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
\r
// Declaration only: takes a query summary and a schema and returns a vector
// of qp_node pointers.
2058 std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);
\r
// Declaration only: s is taken by non-const reference, so the callee is able
// to modify the string it is given.
2062 void untaboo(string &s);
\r
// Declaration only: returns a table_def pointer for the given table name and
// select list (the list is passed by non-const reference).
2064 table_def *create_attributes(string tname, vector<select_element *> &select_list);
\r