1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
15 #ifndef __QUERY_PLAN_H__
16 #define __QUERY_PLAN_H__
23 #include"analyze_fta.h"
25 #include"parse_partn.h"
26 #include"generate_utils.h"
28 // Identify the format of the input, output streams.
29 #define UNKNOWNFORMAT 0
33 ///////////////////////////////////////////////////
34 // representation of an output operator specification
39 string operator_param;
40 string output_directory;
42 string partitioning_flds;
47 ////////////////////////////////////////////////////
48 // Input representation of a query
52 std::set<int> reads_from;
53 std::set<int> sources_to;
54 std::vector<std::string> refd_tbls;
55 std::vector<var_pair_t *> params;
58 std::string mangler; // for UDOPs
60 table_exp_t *parse_tree;
63 bool is_externally_visible;
64 bool inferred_visible_node;
66 set<int> subtree_roots;
73 is_externally_visible = false;
74 inferred_visible_node = false;
77 query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
85 is_externally_visible = pt->get_visible();
86 inferred_visible_node = false;
89 tablevar_list_t *fm = parse_tree->get_from();
90 refd_tbls = fm->get_table_names();
92 params = pt->query_params;
94 query_node(int ix, std::string udop_name,table_list *Schema){
102 is_externally_visible = true;
103 inferred_visible_node = false;
106 int sid = Schema->find_tbl(udop_name);
107 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
109 for(i=0;i<subq.size();++i){
110 refd_tbls.push_back(subq[i]->name);
117 std::string source_name;
118 std::vector<int> query_node_indices;
119 std::set<int> reads_from;
120 std::set<int> sources_to;
122 bool inferred_visible_node;
125 bool do_generation; // false means, ignore it.
129 inferred_visible_node = false;
132 do_generation = true;
144 // the following selectivity estimates are used by our primitive rate estimators
145 #define SPX_SELECTIVITY 1.0
146 #define SGAH_SELECTIVITY 0.1
147 #define RSGAH_SELECTIVITY 0.1
148 #define SGAHCWCB_SELECTIVITY 0.1
149 #define MRG_SELECTIVITY 1.0
150 #define JOIN_EQ_HASH_SELECTIVITY 1.0
// If the output rate of the interface is not given, we use
// this default value.
154 #define DEFAULT_INTERFACE_RATE 100
157 // Define query plan nodes
158 // These nodes are intended for query modeling
159 // and transformation rather than for code generation.
162 // Query plan node base class.
163 // It has an ID, can return its type,
164 // and can be linked into lists with the predecessors
166 // To add : serialize, unserialize?
171 std::vector<int> predecessors;
172 std::vector<int> successors;
173 std::string node_name;
175 // For error reporting without exiting the program.
179 // These should be moved to the containing stream_query object.
180 std::map<std::string, std::string> definitions;
181 param_table *param_tbl;
183 // The value of a field in terms of protocol fields (if any).
184 std::map<std::string, scalarexp_t *> protocol_map;
189 param_tbl = new param_table();
194 param_tbl = new param_table();
197 int get_id(){return(id);};
198 void set_id(int i){id = i; };
200 int get_error_code(){return error_code;};
201 std::string get_error_str(){return err_str;};
203 virtual std::string node_type() = 0;
205 // For code generation, does the operator xform its input.
206 virtual bool makes_transform() = 0;
208 // For linking, what external libraries does the operator depend on?
209 virtual std::vector<std::string> external_libs() = 0;
211 void set_node_name(std::string n){node_name = n;};
212 std::string get_node_name(){return node_name;};
214 void set_definitions(std::map<std::string, std::string> &def){
217 std::map<std::string, std::string> get_definitions(){return definitions;};
220 // call to create the mapping from field name to se in protocol fields.
221 // Pass in qp_node of data sources, in order.
222 virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
223 // get the protocol map. the parameter is the return value.
224 std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
226 // Each qp node must be able to return a description
227 // of the tuples it creates.
// TODO: the get_output_tbls method should subsume the get_fields
229 // method, but in fact it really just returns the
231 virtual table_def *get_fields() = 0; // Should be vector?
232 // Get the from clause
233 virtual std::vector<tablevar_t *> get_input_tbls() = 0;
// this is a confused function, it actually returns the output
236 virtual std::vector<tablevar_t *> get_output_tbls() = 0;
238 std::string get_val_of_def(std::string def){
239 if(definitions.count(def) > 0) return definitions[def];
242 void set_definition(std::string def, std::string val){
243 definitions[def]=val;
246 // Associate colrefs in SEs with tables
247 // at code generation time.
248 virtual void bind_to_schema(table_list *Schema) = 0;
250 // Get colrefs of the operator, currently only meaningful for lfta
251 // operators, and only interested in colrefs with extraction fcns
252 virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
254 virtual std::string to_query_string() = 0;
255 virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;
256 virtual std::string generate_functor_name() = 0;
258 virtual std::string generate_operator(int i, std::string params) = 0;
259 virtual std::string get_include_file() = 0;
261 virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
262 virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
264 // Split this node into LFTA and HFTA nodes.
265 // Four possible outcomes:
266 // 1) the qp_node reads from a protocol, but does not need to
267 // split (can be evaluated as an LFTA).
268 // The lfta node is the only element in the return vector,
269 // and hfta_returned is false.
270 // 2) the qp_node reads from no protocol, and therefore cannot be split.
// The hfta node is the only element in the return vector,
272 // and hfta_returned is true.
273 // 3) reads from at least one protocol, but cannot be split : failure.
274 // return vector is empty, the error conditions are written
276 // 4) The qp_node splits into an hfta node and one or more LFTA nodes.
277 // the return vector has two or more elements, and hfta_returned
278 // is true. The last element is the HFTA.
279 virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;
282 // Ensure that any refs to interface params have been split away.
283 virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
287 // Tag the data sources which are views,
288 // return the (optimized) source queries and
289 // record the view access in opview_set
290 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
292 param_table *get_param_tbl(){return param_tbl;};
294 // The "where" clause is a pre-filter
295 virtual std::vector<cnf_elem *> get_where_clause() = 0;
// To be more explicit, use get_filter_clause
297 virtual std::vector<cnf_elem *> get_filter_clause() = 0;
299 void add_predecessor(int i){predecessors.push_back(i);};
300 void remove_predecessor(int i){
301 std::vector<int>::iterator vi;
302 for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
304 predecessors.erase(vi);
309 void add_successor(int i){successors.push_back(i);};
310 std::vector<int> get_predecessors(){return predecessors;};
311 int n_predecessors(){return predecessors.size();};
312 std::vector<int> get_successors(){return successors;};
313 int n_successors(){return successors.size();};
314 void clear_predecessors(){predecessors.clear();};
315 void clear_successors(){successors.clear();};
317 // the following method is used for distributed query optimization
318 double get_rate_estimate();
321 // used for cloning query nodes
322 virtual qp_node* make_copy(std::string suffix) = 0;
327 // Select, project, transform (xform) query plan node.
// represents the following query fragment
329 // select scalar_expression_1, ..., scalar_expression_k
333 // the predicates and the scalar expressions can reference
334 // attributes of S and also functions.
335 class spx_qpn: public qp_node{
337 tablevar_t *table_name; // Source table
338 std::vector<cnf_elem *> where; // selection predicate
339 std::vector<select_element *> select_list; // Select list
// Type tag for select/project/transform nodes.
std::string node_type(){
	return std::string("spx_qpn");
}
// For code generation: spx nodes transform their input.
bool makes_transform(){
	return true;
}
345 std::vector<std::string> external_libs(){
346 std::vector<std::string> ret;
350 void bind_to_schema(table_list *Schema);
351 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
353 std::string to_query_string();
354 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
355 std::string generate_functor_name();
356 std::string generate_operator(int i, std::string params);
// Include directive emitted for spx nodes.
std::string get_include_file(){
	std::string hdr = "#include <selection_operator.h>\n";
	return hdr;
}
359 std::vector<select_element *> get_select_list(){return select_list;};
360 std::vector<scalarexp_t *> get_select_se_list(){
361 std::vector<scalarexp_t *> ret;
363 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
366 std::vector<cnf_elem *> get_where_clause(){return where;};
367 std::vector<cnf_elem *> get_filter_clause(){return where;};
368 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
369 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
371 table_def *get_fields();
372 std::vector<tablevar_t *> get_input_tbls();
373 std::vector<tablevar_t *> get_output_tbls();
375 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
376 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
377 // Ensure that any refs to interface params have been split away.
378 int count_ifp_refs(std::set<std::string> &ifpnames);
379 int resolve_if_params(ifq_t *ifdb, std::string &err);
383 spx_qpn(query_summary_class *qs,table_list *Schema){
384 // Get the table name.
385 // NOTE the colrefs have the table ref (an int)
386 // embedded in them. Would it make sense
387 // to grab the whole table list?
388 tablevar_list_t *fm = qs->fta_tree->get_from();
389 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
390 if(tbl_vec.size() != 1){
392 sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
396 table_name = (tbl_vec[0]);
398 // Get the select list.
399 select_list = qs->fta_tree->get_sl_vec();
401 // Get the selection predicate.
405 // Get the parameters
406 param_tbl = qs->param_tbl;
412 // the following method is used for distributed query optimization
413 double get_rate_estimate();
416 qp_node* make_copy(std::string suffix){
417 spx_qpn *ret = new spx_qpn();
419 ret->param_tbl = new param_table();
420 std::vector<std::string> param_names = param_tbl->get_param_names();
422 for(pi=0;pi<param_names.size();pi++){
423 data_type *dt = param_tbl->get_data_type(param_names[pi]);
424 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
425 param_tbl->handle_access(param_names[pi]));
427 ret->definitions = definitions;
428 ret->node_name = node_name + suffix;
430 // make shallow copy of all fields
432 ret->select_list = select_list;
436 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
442 // Select, group-by, aggregate.
444 // Select SE_1, ..., SE_k
447 // Group By gb1, ..., gb_n
// NOTE : the sampling operator is sgahcwcb_qpn.
452 // For now, must have group-by variables and aggregates.
453 // The scalar expressions which are output must be a function
// of the group-by variables and the aggregates.
// The group-by variables can be references to columns of T,
456 // or they can be scalar expressions.
457 class sgah_qpn: public qp_node{
459 tablevar_t *table_name; // source table
460 std::vector<cnf_elem *> where; // selection predicate
461 std::vector<cnf_elem *> having; // post-aggregation predicate
462 std::vector<select_element *> select_list; // se's of output
463 gb_table gb_tbl; // Table of all group-by attributes.
464 aggregate_table aggr_tbl; // Table of all referenced aggregates.
466 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
468 int lfta_disorder; // maximum disorder in the steam between lfta, hfta
469 int hfta_disorder; // maximum disorder in the hfta
471 // rollup, cube, and grouping_sets cannot be readily reconstructed by
472 // analyzing the patterns, so explicitly record them here.
473 // used only so that to_query_string produces something meaningful.
474 std::vector<std::string> gb_entry_type;
475 std::vector<int> gb_entry_count;
477 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
// Type tag for select/group-by/aggregate nodes.
std::string node_type(){
	const char *tag = "sgah_qpn";
	return std::string(tag);
}
// For code generation: aggregation transforms its input.
bool makes_transform(){
	return !false;
}
481 std::vector<std::string> external_libs(){
482 std::vector<std::string> ret;
486 void bind_to_schema(table_list *Schema);
487 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
489 std::string to_query_string();
490 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
491 std::string generate_functor_name();
493 std::string generate_operator(int i, std::string params);
494 std::string get_include_file(){
495 if(hfta_disorder <= 1){
496 return("#include <groupby_operator.h>\n");
498 return("#include <groupby_operator_oop.h>\n");
502 std::vector<select_element *> get_select_list(){return select_list;};
503 std::vector<scalarexp_t *> get_select_se_list(){
504 std::vector<scalarexp_t *> ret;
506 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
509 std::vector<cnf_elem *> get_where_clause(){return where;};
510 std::vector<cnf_elem *> get_filter_clause(){return where;};
511 std::vector<cnf_elem *> get_having_clause(){return having;};
512 gb_table *get_gb_tbl(){return &gb_tbl;};
513 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
514 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
515 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
517 // table which represents output tuple.
518 table_def *get_fields();
519 std::vector<tablevar_t *> get_input_tbls();
520 std::vector<tablevar_t *> get_output_tbls();
527 sgah_qpn(query_summary_class *qs,table_list *Schema){
531 // Get the table name.
532 // NOTE the colrefs have the tablevar ref (an int)
533 // embedded in them. Would it make sense
534 // to grab the whole table list?
535 tablevar_list_t *fm = qs->fta_tree->get_from();
536 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
537 if(tbl_vec.size() != 1){
539 sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
543 table_name = (tbl_vec[0]);
545 // Get the select list.
546 select_list = qs->fta_tree->get_sl_vec();
548 // Get the selection and having predicates.
550 having = qs->hav_cnf;
552 // Build a new GB var table (don't share, might need to modify)
554 for(g=0;g<qs->gb_tbl->size();g++){
555 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
556 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
557 qs->gb_tbl->get_reftype(g)
560 gb_tbl.set_pattern_info(qs->gb_tbl);
561 // gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
562 // gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
563 // gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
565 // Build a new aggregate table. (don't share, might need
568 for(a=0;a<qs->aggr_tbl->size();a++){
570 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
571 qs->aggr_tbl->duplicate(a)
576 // Get the parameters
577 param_tbl = qs->param_tbl;
583 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
584 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
585 // Ensure that any refs to interface params have been split away.
586 int count_ifp_refs(std::set<std::string> &ifpnames);
587 int resolve_if_params(ifq_t *ifdb, std::string &err);
589 // the following method is used for distributed query optimization
590 double get_rate_estimate();
593 qp_node* make_copy(std::string suffix){
594 sgah_qpn *ret = new sgah_qpn();
596 ret->param_tbl = new param_table();
597 std::vector<std::string> param_names = param_tbl->get_param_names();
599 for(pi=0;pi<param_names.size();pi++){
600 data_type *dt = param_tbl->get_data_type(param_names[pi]);
601 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
602 param_tbl->handle_access(param_names[pi]));
604 ret->definitions = definitions;
606 ret->node_name = node_name + suffix;
608 // make shallow copy of all fields
610 ret->having = having;
611 ret->select_list = select_list;
612 ret->gb_tbl = gb_tbl;
613 ret->aggr_tbl = aggr_tbl;
618 // Split aggregation into two HFTA components - sub and superaggregation
// If unable to split the aggregates, split into selection and aggregation.
// If the resulting low-level query is empty (e.g. when aggregates cannot be split and
// the where clause is empty), an empty vector will be returned.
622 virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
624 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
631 // Select, group-by, aggregate. with running aggregates
633 // Select SE_1, ..., SE_k
636 // Group By gb1, ..., gb_n
637 // Closing When predicate
640 // NOTE : the sampling operator is sgahcwcb_qpn.
642 // For now, must have group-by variables and aggregates.
643 // The scalar expressions which are output must be a function
// of the group-by variables and the aggregates.
// The group-by variables can be references to columns of T,
646 // or they can be scalar expressions.
647 class rsgah_qpn: public qp_node{
649 tablevar_t *table_name; // source table
650 std::vector<cnf_elem *> where; // selection predicate
651 std::vector<cnf_elem *> having; // post-aggregation predicate
652 std::vector<cnf_elem *> closing_when; // group closing predicate
653 std::vector<select_element *> select_list; // se's of output
654 gb_table gb_tbl; // Table of all group-by attributes.
655 aggregate_table aggr_tbl; // Table of all referenced aggregates.
657 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
659 int lfta_disorder; // maximum disorder allowed in stream between lfta, hfta
660 int hfta_disorder; // maximum disorder allowed in hfta
662 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
// Type tag for running (closing-when) group-by/aggregate nodes.
std::string node_type(){
	std::string tag("rsgah_qpn");
	return tag;
}
// For code generation: running aggregation transforms its input.
bool makes_transform(){
	bool transforms = true;
	return transforms;
}
667 std::vector<std::string> external_libs(){
668 std::vector<std::string> ret;
672 void bind_to_schema(table_list *Schema);
673 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
674 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
678 std::string to_query_string();
679 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
680 std::string generate_functor_name();
682 std::string generate_operator(int i, std::string params);
// Include directive emitted for running group-by nodes.
std::string get_include_file(){
	return std::string("#include <running_gb_operator.h>\n");
}
685 std::vector<select_element *> get_select_list(){return select_list;};
686 std::vector<scalarexp_t *> get_select_se_list(){
687 std::vector<scalarexp_t *> ret;
689 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
692 std::vector<cnf_elem *> get_where_clause(){return where;};
693 std::vector<cnf_elem *> get_filter_clause(){return where;};
694 std::vector<cnf_elem *> get_having_clause(){return having;};
695 std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};
696 gb_table *get_gb_tbl(){return &gb_tbl;};
697 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
698 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
699 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
701 // table which represents output tuple.
702 table_def *get_fields();
703 std::vector<tablevar_t *> get_input_tbls();
704 std::vector<tablevar_t *> get_output_tbls();
711 rsgah_qpn(query_summary_class *qs,table_list *Schema){
715 // Get the table name.
716 // NOTE the colrefs have the tablevar ref (an int)
717 // embedded in them. Would it make sense
718 // to grab the whole table list?
719 tablevar_list_t *fm = qs->fta_tree->get_from();
720 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
721 if(tbl_vec.size() != 1){
723 sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
727 table_name = (tbl_vec[0]);
729 // Get the select list.
730 select_list = qs->fta_tree->get_sl_vec();
732 // Get the selection and having predicates.
734 having = qs->hav_cnf;
735 closing_when = qs->closew_cnf;
737 // Build a new GB var table (don't share, might need to modify)
739 for(g=0;g<qs->gb_tbl->size();g++){
740 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
741 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
742 qs->gb_tbl->get_reftype(g)
746 // Build a new aggregate table. (don't share, might need
749 for(a=0;a<qs->aggr_tbl->size();a++){
751 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
752 qs->aggr_tbl->duplicate(a)
757 // Get the parameters
758 param_tbl = qs->param_tbl;
764 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
765 std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
766 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
767 // Ensure that any refs to interface params have been split away.
768 int count_ifp_refs(std::set<std::string> &ifpnames);
769 int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;}
771 // the following method is used for distributed query optimization
772 double get_rate_estimate();
774 qp_node* make_copy(std::string suffix){
775 rsgah_qpn *ret = new rsgah_qpn();
777 ret->param_tbl = new param_table();
778 std::vector<std::string> param_names = param_tbl->get_param_names();
780 for(pi=0;pi<param_names.size();pi++){
781 data_type *dt = param_tbl->get_data_type(param_names[pi]);
782 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
783 param_tbl->handle_access(param_names[pi]));
785 ret->definitions = definitions;
787 ret->node_name = node_name + suffix;
789 // make shallow copy of all fields
791 ret->having = having;
792 ret->closing_when = closing_when;
793 ret->select_list = select_list;
794 ret->gb_tbl = gb_tbl;
795 ret->aggr_tbl = aggr_tbl;
799 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
804 class filter_join_qpn;
807 // (temporal) Merge query plan node.
// represents the following query fragment
810 // from T1 _t1, T2 _t2
812 // T1 and T2 must have compatible schemas,
813 // that is the same types in the same slots.
814 // c1 and c2 must be colrefs from T1 and T2,
815 // both ref'ing the same slot. Their types
816 // must be temporal and the same kind of temporal.
817 // in the output, no other field is temporal.
// the field names of the output are drawn from T1.
819 class mrg_qpn: public qp_node{
821 std::vector<tablevar_t *> fm; // Source table
822 std::vector<colref_t *> mvars; // the merge-by columns.
825 table_def *table_layout; // the output schema
826 int merge_fieldpos; // position of merge field,
827 // convenience for manipulation.
829 int disorder; // max disorder seen in the input / allowed in the output
832 // partition definition for merges that combine streams partitioned over multiple interfaces
833 partn_def_t* partn_def;
// Type tag for (temporal) merge nodes.
std::string node_type(){
	return std::string("mrg_qpn");
}
// For code generation: a merge does not transform its input.
bool makes_transform(){
	bool transforms = false;
	return transforms;
}
839 std::vector<std::string> external_libs(){
840 std::vector<std::string> ret;
844 void bind_to_schema(table_list *Schema);
845 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
846 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
850 std::string to_query_string();
851 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
852 std::string generate_functor_name();
853 std::string generate_operator(int i, std::string params);
854 std::string get_include_file(){
856 return("#include <merge_operator_oop.h>\n");
857 return("#include <merge_operator.h>\n");
860 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
861 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
863 table_def *get_fields();
864 std::vector<tablevar_t *> get_input_tbls();
865 std::vector<tablevar_t *> get_output_tbls();
867 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
868 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
869 // Ensure that any refs to interface params have been split away.
870 int count_ifp_refs(std::set<std::string> &ifpnames);
872 // No predicates, return an empty clause
873 std::vector<cnf_elem *> get_where_clause(){
874 std::vector<cnf_elem *> t;
877 std::vector<cnf_elem *> get_filter_clause(){
878 return get_where_clause();
885 void set_disorder(int d){
889 mrg_qpn(query_summary_class *qs,table_list *Schema){
892 // Grab the elements of the query node.
893 fm = qs->fta_tree->get_from()->get_table_list();
898 if(fm.size() != mvars.size()){
899 fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn. fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
903 // Get the parameters
904 param_tbl = qs->param_tbl;
906 // Need to set the node name now, so that the
907 // schema (table_layout) can be properly named.
908 // TODO: Setting the name of the table might best be done
909 // via the set_node_name method, because presumably
// that's when the node name is really known.
// This should propagate to the table_def table_layout
912 node_name=qs->query_name;
916 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
917 for(ff=0;ff<fm.size();++ff){
918 printf("%s ",fm[ff]->to_string().c_str());
924 // Create the output schema.
// Strip temporal properties from all fields except the merge field.
926 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
927 field_entry_list *fel = new field_entry_list();
929 for(f=0;f<flva.size();++f){
931 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
932 if(flva[f]->get_name() == mvars[0]->get_field()){
934 // if(slack != NULL) dt.reset_temporal();
939 param_list *plist = new param_list();
940 std::vector<std::string> param_strings = dt.get_param_keys();
942 for(p=0;p<param_strings.size();++p){
943 std::string v = dt.get_param_val(param_strings[p]);
945 plist->append(param_strings[p].c_str(),v.c_str());
947 plist->append(param_strings[p].c_str());
952 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
953 fel->append_field(fe);
959 table_layout = new table_def(
960 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
967 /////////////////////////////////////////////
968 /// Created for de-siloing. to be removed? or is it otherwise useful?
969 // Merge existing set of sources (de-siloing)
970 mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
975 // Construct the fm list
976 for(f=0;f<src_names.size();++f){
977 int tbl_ref = Schema->get_table_ref(src_names[f]);
979 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
982 table_def *src_tbl = Schema->get_table(tbl_ref);
983 tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
984 string range_name = "_t" + int_to_string(f);
985 fm_t->set_range_var(range_name);
986 fm_t->set_schema_ref(tbl_ref);
990 // Create the output schema.
// Strip temporal properties from all fields except the merge field.
992 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
993 field_entry_list *fel = new field_entry_list();
994 bool temporal_found = false;
995 for(f=0;f<flva.size();++f){
997 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
998 if(dt.is_temporal() && !temporal_found){
1000 temporal_found = true;
1002 dt.reset_temporal();
1005 param_list *plist = new param_list();
1006 std::vector<std::string> param_strings = dt.get_param_keys();
1008 for(p=0;p<param_strings.size();++p){
1009 std::string v = dt.get_param_val(param_strings[p]);
1011 plist->append(param_strings[p].c_str(),v.c_str());
1013 plist->append(param_strings[p].c_str());
1017 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
1018 flva[f]->get_unpack_fcns()
1020 fel->append_field(fe);
1023 if(! temporal_found){
1024 fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
1029 table_layout = new table_def(
1030 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1034 param_tbl = new param_table();
1037 for(f=0;f<fm.size();++f){
1038 std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
1039 data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
1040 flva[merge_fieldpos]->get_modifier_list());
1042 colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
1043 flv_f[merge_fieldpos]->get_name().c_str());
1044 mvars.push_back(mcr);
1047 // literal_t *s_lit = new literal_t("5",LITERAL_INT);
1048 // slack = new scalarexp_t(s_lit);
1053 ////////////////////////////////////////
1055 void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
1058 // Merge filter_join LFTAs.
1060 mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1062 // Merge selection LFTAs.
1064 mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
1068 param_tbl = spx->param_tbl;
1071 field_entry_list *fel = new field_entry_list();
1072 merge_fieldpos = -1;
1077 for(i=0;i<spx->select_list.size();++i){
1078 data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
1079 if(dt->is_temporal()){
1080 if(merge_fieldpos < 0){
1083 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
1084 dt->reset_temporal();
1088 field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
1089 fel->append_field(fe);
1092 if(merge_fieldpos<0){
1093 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1096 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1098 // NEED TO HANDLE USER_SPECIFIED SLACK
1099 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
1100 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
1101 // if(this->slack == NULL)
1102 // fprintf(stderr,"Zero slack.\n");
1104 // fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1106 for(i=0;i<sources.size();i++){
1107 std::string rvar = "_m"+int_to_string(i);
1108 mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
1109 mvars[i]->set_tablevar_ref(i);
1110 fm.push_back(new tablevar_t(sources[i].c_str()));
1111 fm[i]->set_range_var(rvar);
1114 param_tbl = new param_table();
1115 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
1117 for(pi=0;pi<param_names.size();pi++){
1118 data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
1119 param_tbl->add_param(param_names[pi],dt->duplicate(),
1120 spx->param_tbl->handle_access(param_names[pi]));
1122 definitions = spx->definitions;
1126 // Merge aggregation LFTAs
1128 mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
1132 param_tbl = sgah->param_tbl;
1135 field_entry_list *fel = new field_entry_list();
1136 merge_fieldpos = -1;
1137 for(i=0;i<sgah->select_list.size();++i){
1138 data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
1139 if(dt->is_temporal()){
1140 if(merge_fieldpos < 0){
1143 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
1144 dt->reset_temporal();
1148 field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
1149 fel->append_field(fe);
1152 if(merge_fieldpos<0){
1153 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1156 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1158 // NEED TO HANDLE USER_SPECIFIED SLACK
1159 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
1160 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
1162 if(this->slack == NULL)
1163 fprintf(stderr,"Zero slack.\n");
1165 fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1168 for(i=0;i<sources.size();i++){
1169 std::string rvar = "_m"+int_to_string(i);
1170 mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
1171 mvars[i]->set_tablevar_ref(i);
1172 fm.push_back(new tablevar_t(sources[i].c_str()));
1173 fm[i]->set_range_var(rvar);
1176 param_tbl = new param_table();
1177 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
1179 for(pi=0;pi<param_names.size();pi++){
1180 data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
1181 param_tbl->add_param(param_names[pi],dt->duplicate(),
1182 sgah->param_tbl->handle_access(param_names[pi]));
1184 definitions = sgah->definitions;
1188 qp_node *make_copy(std::string suffix){
1189 mrg_qpn *ret = new mrg_qpn();
1191 ret->disorder = disorder;
1193 ret->param_tbl = new param_table();
1194 std::vector<std::string> param_names = param_tbl->get_param_names();
1196 for(pi=0;pi<param_names.size();pi++){
1197 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1198 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1199 param_tbl->handle_access(param_names[pi]));
1201 ret->definitions = definitions;
1203 ret->node_name = node_name + suffix;
1204 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
1205 ret->merge_fieldpos = merge_fieldpos;
1210 std::vector<mrg_qpn *> split_sources();
// Rate estimate consumed by distributed query optimization (defined out of line).
double
get_rate_estimate();
1216 // get partition definition for merges that combine streams partitioned over multiple interfaces
1217 // return NULL for regular merges
1218 partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1226 vector<tablevar_t *> input_tables = get_input_tbls();
1227 for (int i = 0; i < input_tables.size(); ++i) {
1228 tablevar_t * table = input_tables[i];
1230 vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
1231 if (partn_names.size() != 1) // can't have more than one value of partition attribute
1233 string new_partn_name = partn_names[0];
1235 // need to make sure that all ifaces belong to the same partition
1237 partn_name = new_partn_name;
1238 else if (new_partn_name != partn_name)
1242 // now find partition definition corresponding to partn_name
1243 partn_def = partn_parse_result->get_partn_def(partn_name);
1247 void set_partn_definition(partn_def_t* def) {
1251 bool is_multihost_merge() {
1253 bool is_multihost = false;
1255 // each input table must be have machine attribute be non-empty
1256 // and there should be at least 2 different values of machine attributes
1257 vector<tablevar_t *> input_tables = get_input_tbls();
1258 string host = input_tables[0]->get_machine();
1259 for (int i = 1; i < input_tables.size(); ++i) {
1260 string new_host = input_tables[i]->get_machine();
1263 if (new_host != host)
1264 is_multihost = true;
1266 return is_multihost;
1269 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1273 // eq_temporal, hash join query plan node.
1274 // represent the following query fragment
1275 // select scalar_expression_1, ..., scalar_expression_k
1276 // from T0 t0, T1 t1
1279 // the predicates and the scalar expressions can reference
1280 // attributes of t0 and t1 and also functions.
1281 // The predicate must contain CNF elements to enable the
1282 // efficient evaluation of the query.
1283 // 1) at least one predicate of the form
1284 // (temporal se in t0) = (temporal se in t1)
1285 // 2) at least one predicate of the form
1286 // (non-temporal se in t0) = (non-temporal se in t1)
1288 class join_eq_hash_qpn: public qp_node{
1290 std::vector<tablevar_t *> from; // Source tables
1291 std::vector<select_element *> select_list; // Select list
1292 std::vector<cnf_elem *> prefilter[2]; // source prefilters
1293 std::vector<cnf_elem *> temporal_eq; // define temporal window
1294 std::vector<cnf_elem *> hash_eq; // define hash key
1295 std::vector<cnf_elem *> postfilter; // final filter on hash matches.
1297 std::vector<cnf_elem *> where; // all the filters
1298 // useful for summary analysis
1300 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1302 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1303 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
// Name identifying this plan-node kind.
std::string node_type(){
	return std::string("join_eq_hash_qpn");
}
// The join node always reports that it transforms its input.
bool makes_transform(){
	return true;
}
1307 std::vector<std::string> external_libs(){
1308 std::vector<std::string> ret;
1312 void bind_to_schema(table_list *Schema);
1313 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1314 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
1318 std::string to_query_string();
1319 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1320 std::string generate_functor_name();
1321 std::string generate_operator(int i, std::string params);
// Header that the generated operator code must include.
std::string get_include_file(){
	const char *header_line = "#include <join_eq_hash_operator.h>\n";
	return std::string(header_line);
}
1324 std::vector<select_element *> get_select_list(){return select_list;};
1325 std::vector<scalarexp_t *> get_select_se_list(){
1326 std::vector<scalarexp_t *> ret;
1328 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1331 // Used for LFTA only
1332 std::vector<cnf_elem *> get_where_clause(){
1333 std::vector<cnf_elem *> t;
1336 std::vector<cnf_elem *> get_filter_clause(){
1337 return get_where_clause();
1340 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1341 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1343 table_def *get_fields();
1344 std::vector<tablevar_t *> get_input_tbls();
1345 std::vector<tablevar_t *> get_output_tbls();
1347 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1348 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1349 // Ensure that any refs to interface params have been split away.
1350 int count_ifp_refs(std::set<std::string> &ifpnames);
1354 join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
1356 // Get the table name.
1357 // NOTE the colrefs have the table ref (an int)
1358 // embedded in them. Would it make sense
1359 // to grab the whole table list?
1360 from = qs->fta_tree->get_from()->get_table_list();
1361 if(from.size() != 2){
1363 sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1368 // Get the select list.
1369 select_list = qs->fta_tree->get_sl_vec();
1371 // Get the selection predicate.
1373 for(w=0;w<where.size();++w){
1374 analyze_cnf(where[w]);
1375 std::vector<int> pred_tbls;
1376 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1377 // Prefilter if refs only one tablevar
1378 if(pred_tbls.size()==1){
1379 prefilter[pred_tbls[0]].push_back(where[w]);
1382 // refs nothing -- might be sampling, do it as postfilter.
1383 if(pred_tbls.size()==0){
1384 postfilter.push_back(where[w]);
1387 // See if it can be a hash or temporal predicate.
1388 // NOTE: synchronize with the temporality checking
1389 // done at join_eq_hash_qpn::get_fields
1390 if(where[w]->is_atom && where[w]->eq_pred){
1391 std::vector<int> sel_tbls, ser_tbls;
1392 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1393 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1394 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1395 // make channel 0 SE on LHS.
1396 if(sel_tbls[0] != 0)
1397 where[w]->pr->swap_scalar_operands();
1399 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1400 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1401 if( (dtl->is_increasing() && dtr->is_increasing()) ||
1402 (dtl->is_decreasing() && dtr->is_decreasing()) )
1403 temporal_eq.push_back(where[w]);
1405 hash_eq.push_back(where[w]);
1410 // All tests failed, fallback is postfilter.
1411 postfilter.push_back(where[w]);
1414 if(temporal_eq.size()==0){
1415 err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
1419 // Get the parameters
1420 param_tbl = qs->param_tbl;
// Rate estimate consumed by distributed query optimization (defined out of line).
double
get_rate_estimate();
1428 qp_node* make_copy(std::string suffix){
1429 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
1431 ret->param_tbl = new param_table();
1432 std::vector<std::string> param_names = param_tbl->get_param_names();
1434 for(pi=0;pi<param_names.size();pi++){
1435 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1436 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1437 param_tbl->handle_access(param_names[pi]));
1439 ret->definitions = definitions;
1441 ret->node_name = node_name + suffix;
1443 // make shallow copy of all fields
1446 ret->select_list = select_list;
1447 ret->prefilter[0] = prefilter[0];
1448 ret->prefilter[1] = prefilter[1];
1449 ret->postfilter = postfilter;
1450 ret->temporal_eq = temporal_eq;
1451 ret->hash_eq = hash_eq;
1455 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1460 // ---------------------------------------------
// filter join query plan node.
1462 // represent the following query fragment
1463 // select scalar_expression_1, ..., scalar_expression_k
1464 // FILTER_JOIN(col, range) from T0 t0, T1 t1
1467 // t0 is the output range variable, t1 is the filtering range
1468 // variable. Both must alias a PROTOCOL.
1469 // The scalar expressions in the select clause may
1470 // reference t0 only.
1471 // The predicates are classified as follows
1472 // prefilter predicates:
1473 // a cheap predicate in t0 such that there is an equivalent
1474 // predicate in t1. Cost decisions about pushing to
1475 // lfta prefilter made later.
1476 // t0 predicates (other than prefilter predicates)
//		-- cheap vs. expensive sorted out at generate time,
1478 // the constructor isn't called with the function list.
//	t1 predicates (other than prefilter predicates).
1480 // equi-join predicates of the form:
1481 // (se in t0) = (se in t1)
1483 // There must be at least one equi-join predicate.
1484 // No join predicates other than equi-join predicates
1486 // Warn on temporal equi-join predicates.
1487 // t1 predicates should not be expensive ... warn?
1489 class filter_join_qpn: public qp_node{
1491 std::vector<tablevar_t *> from; // Source tables
1492 colref_t *temporal_var; // join window in FROM
1493 unsigned int temporal_range; // metadata.
1494 std::vector<select_element *> select_list; // Select list
1495 std::vector<cnf_elem *> shared_pred; // prefilter preds
1496 std::vector<cnf_elem *> pred_t0; // main (R) preds
1497 std::vector<cnf_elem *> pred_t1; // filtering (S) preds
1498 std::vector<cnf_elem *> hash_eq; // define hash key
1499 std::vector<cnf_elem *> postfilter; // ref's no table.
1501 std::vector<cnf_elem *> where; // all the filters
1502 // useful for summary analysis
1504 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1505 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1506 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1509 bool use_bloom; // true => bloom filter, false => limited hash
// Name identifying this plan-node kind.
std::string node_type(){
	return std::string("filter_join");
}
// The filter join always reports that it transforms its input.
bool makes_transform(){
	return true;
}
1513 std::vector<std::string> external_libs(){
1514 std::vector<std::string> ret;
1518 void bind_to_schema(table_list *Schema);
1519 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
// Defined out of line.
std::string
to_query_string();
1522 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
1523 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
1526 std::string generate_functor_name(){
1527 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
1530 std::string generate_operator(int i, std::string params){
1531 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
// Reuses the join_eq_hash operator header name; this class's generate_*
// methods only report internal errors.
std::string get_include_file(){
	const char *header_line = "#include <join_eq_hash_operator.h>\n";
	return std::string(header_line);
}
1536 std::vector<select_element *> get_select_list(){return select_list;};
1537 std::vector<scalarexp_t *> get_select_se_list(){
1538 std::vector<scalarexp_t *> ret;
1540 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1543 // Used for LFTA only
1544 std::vector<cnf_elem *> get_where_clause(){return where;}
1545 std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
1547 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1548 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1550 table_def *get_fields();
1551 std::vector<tablevar_t *> get_input_tbls();
1552 std::vector<tablevar_t *> get_output_tbls();
1554 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1555 int resolve_if_params(ifq_t *ifdb, std::string &err);
1557 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1558 // Ensure that any refs to interface params have been split away.
1559 int count_ifp_refs(std::set<std::string> &ifpnames);
1564 filter_join_qpn(query_summary_class *qs,table_list *Schema){
1566 // Get the table name.
1567 // NOTE the colrefs have the table ref (an int)
1568 // embedded in them. Would it make sense
1569 // to grab the whole table list?
1570 from = qs->fta_tree->get_from()->get_table_list();
1571 temporal_var = qs->fta_tree->get_from()->get_colref();
1572 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
1573 if(from.size() != 2){
1575 sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1580 // Get the select list.
1581 select_list = qs->fta_tree->get_sl_vec();
1582 // Verify that only t0 is referenced.
1583 bool bad_ref = false;
1584 for(i=0;i<select_list.size();i++){
1585 vector<int> sel_tbls;
1586 get_tablevar_ref_se(select_list[i]->se,sel_tbls);
1587 if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
1591 err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
1596 // Get the selection predicate.
1598 std::vector<cnf_elem *> t0_only, t1_only;
1599 for(w=0;w<where.size();++w){
1600 analyze_cnf(where[w]);
1601 std::vector<int> pred_tbls;
1602 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1603 // Collect the list of preds by src var,
1604 // extract the shared preds later.
1605 if(pred_tbls.size()==1){
1606 if(pred_tbls[0] == 0){
1607 t0_only.push_back(where[w]);
1609 t1_only.push_back(where[w]);
1613 // refs nothing -- might be sampling, do it as postfilter.
1614 if(pred_tbls.size()==0){
1615 postfilter.push_back(where[w]);
1618 // See if it can be a hash or temporal predicate.
1619 // NOTE: synchronize with the temporality checking
1620 // done at join_eq_hash_qpn::get_fields
1621 if(where[w]->is_atom && where[w]->eq_pred){
1622 std::vector<int> sel_tbls, ser_tbls;
1623 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1624 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1625 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1626 // make channel 0 SE on LHS.
1627 if(sel_tbls[0] != 0)
1628 where[w]->pr->swap_scalar_operands();
1630 hash_eq.push_back(where[w]);
1632 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1633 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1634 if( (dtl->is_increasing() && dtr->is_increasing()) ||
1635 (dtl->is_decreasing() && dtr->is_decreasing()) )
1636 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
1641 // All tests failed, fallback is postfilter.
1642 err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
1645 // Classify the t0_only and t1_only preds.
1646 set<int> matched_pred;
1648 for(w=0;w<t0_only.size();w++){
1649 for(v=0;v<t1_only.size();++v)
1650 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
1652 if(v<t1_only.size()){
1653 shared_pred.push_back(t0_only[w]);
1654 matched_pred.insert(v);
1656 pred_t0.push_back(t0_only[w]);
1659 for(v=0;v<t1_only.size();++v){
1660 if(matched_pred.count(v) == 0)
1661 pred_t1.push_back(t1_only[v]);
1665 // Get the parameters
1666 param_tbl = qs->param_tbl;
1667 definitions = qs->definitions;
1669 // Determine the algorithm
1670 if(this->get_val_of_def("algorithm") == "hash"){
// Rate estimate consumed by distributed query optimization (defined out of line).
double
get_rate_estimate();
1681 qp_node* make_copy(std::string suffix){
1682 filter_join_qpn *ret = new filter_join_qpn();
1684 ret->param_tbl = new param_table();
1685 std::vector<std::string> param_names = param_tbl->get_param_names();
1687 for(pi=0;pi<param_names.size();pi++){
1688 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1689 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1690 param_tbl->handle_access(param_names[pi]));
1692 ret->definitions = definitions;
1694 ret->node_name = node_name + suffix;
1696 // make shallow copy of all fields
1699 ret->temporal_range = temporal_range;
1700 ret->temporal_var = temporal_var;
1701 ret->select_list = select_list;
1702 ret->shared_pred = shared_pred;
1703 ret->pred_t0 = pred_t0;
1704 ret->pred_t1 = pred_t1;
1705 ret->postfilter = postfilter;
1706 ret->hash_eq = hash_eq;
1710 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
// Compression applied by an output_file_qpn. Enumerators keep their
// implicit values 0, 1, 2.
enum output_file_type_enum {
	regular,	// default: no compression library linked
	gzip,		// links -lz
	bzip		// links -lbz2
};
1717 class output_file_qpn: public qp_node{
1719 std::string source_op_name; // Source table
1720 std::vector<field_entry *> fields;
1721 ospec_str *output_spec;
1722 vector<tablevar_t *> fm;
1723 std::string hfta_query_name;
1724 std::string filestream_id;
1726 std::vector<std::string> params;
1728 output_file_type_enum compression_type;
1730 int n_streams; // Number of output streams
1731 int n_hfta_clones; // number of hfta clones
1732 int parallel_idx; // which close this produces output for.
1733 std::vector<int> hash_flds; // fields used to hash the output.
1735 std::string node_type(){return("output_file_qpn"); };
1736 bool makes_transform(){return false;};
1737 std::vector<std::string> external_libs(){
1738 std::vector<std::string> ret;
1739 switch(compression_type){
1741 ret.push_back("-lz");
1744 ret.push_back("-lbz2");
1752 void bind_to_schema(table_list *Schema){}
1753 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
// Fixed placeholder text: an output-file node has no query body to print.
std::string to_query_string(){
	std::string text("// output_file_operator \n");
	return text;
}
1759 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1760 std::string generate_functor_name();
1761 std::string generate_operator(int i, std::string params);
1762 std::string get_include_file(){
1763 switch(compression_type){
1765 return("#include <zfile_output_operator.h>\n");
1767 return("#include <file_output_operator.h>\n");
1769 return("#include <file_output_operator.h>\n");
1772 std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
1773 std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
1774 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
1775 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
1777 table_def *get_fields(){
1778 field_entry_list *fel = new field_entry_list();
1780 for(i=0;i<fields.size();++i)
1781 fel->append_field(fields[i]);
1782 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1784 std::vector<tablevar_t *> get_input_tbls();
1785 std::vector<tablevar_t *> get_output_tbls();
1787 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
1788 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
1790 std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
1791 std::vector<table_exp_t *> ret; return ret;
// Ensure that any refs to interface params have been split away.
// An output-file node references none, so the count is always zero and
// ifpnames is left untouched.
int count_ifp_refs(std::set<std::string> &ifpnames){
	(void)ifpnames;
	return 0;
}
1795 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
1798 output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
1799 source_op_name = src_op;
1800 node_name = source_op_name + "_output";
1801 filestream_id = fs_id;
1802 fields = src_tbl_def->get_fields();
1803 output_spec = ospec;
1804 fm.push_back(new tablevar_t(source_op_name.c_str()));
1805 hfta_query_name = qn;
1809 compression_type = regular;
1810 if(ospec->operator_type == "zfile")
1811 compression_type = gzip;
1818 strncpy(buf, output_spec->operator_param.c_str(),1000);
1821 int nwords = split_string(buf, ':', words,100);
1823 for(i=0;i<nwords;i++){
1824 params.push_back(words[i]);
1826 for(i=0;i<params.size();i++){
1827 if(params[i] == "gzip")
1832 // Set output splitting parameters
1833 bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
1838 if(split_flds != ""){
1839 string err_flds = "";
1840 char *tmpstr = strdup(split_flds.c_str());
1842 int nwords = split_string(tmpstr,':',words,100);
1844 for(i=0;i<nwords;++i){
1845 string target = words[i];
1846 for(j=0;j<fields.size();++j){
1847 if(fields[j]->get_name() == target){
1848 hash_flds.push_back(j);
1852 if(j==fields.size()){
1853 err_flds += " "+target;
1857 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
// Rate estimate consumed by distributed query optimization.
// Output-file nodes report a fixed value of 1.0.
double get_rate_estimate(){
	const double fixed_rate = 1.0;
	return fixed_rate;
}
1868 qp_node* make_copy(std::string suffix){
1869 // output_file_qpn *ret = new output_file_qpn();
1870 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
1874 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
1882 // ---------------------------------------------
1885 // Select, group-by, aggregate, sampling.
1887 // Select SE_1, ..., SE_k
1890 // Group By gb1, ..., gb_n
1891 // [Subgroup gb_i1, .., gb_ik]
1892 // Cleaning_when predicate
1893 // Cleaning_by predicate
1896 // For now, must have group-by variables and aggregates.
1897 // The scalar expressions which are output must be a function
// of the group-by variables and the aggregates.
// The group-by variables can be references to columns of T,
1900 // or they can be scalar expressions.
1901 class sgahcwcb_qpn: public qp_node{
1903 tablevar_t *table_name; // source table
1904 std::vector<cnf_elem *> where; // selection predicate
1905 std::vector<cnf_elem *> having; // post-aggregation predicate
1906 std::vector<select_element *> select_list; // se's of output
1907 gb_table gb_tbl; // Table of all group-by attributes.
1908 std::set<int> sg_tbl; // Names of the superGB attributes
1909 aggregate_table aggr_tbl; // Table of all referenced aggregates.
1910 std::set<std::string> states_refd; // states ref'd by stateful fcns.
1911 std::vector<cnf_elem *> cleanby;
1912 std::vector<cnf_elem *> cleanwhen;
1914 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
1916 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
// Name identifying this plan-node kind.
std::string node_type(){
	return std::string("sgahcwcb_qpn");
}
1919 bool makes_transform(){return true;};
1920 std::vector<std::string> external_libs(){
1921 std::vector<std::string> ret;
1925 void bind_to_schema(table_list *Schema);
1926 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1927 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
1931 std::string to_query_string();
1932 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1933 std::string generate_functor_name();
// Defined out of line.
std::string generate_operator(int i,
		std::string params);
// Header that the generated cleaning operator code must include.
std::string get_include_file(){
	const char *header_line = "#include <clean_operator.h>\n";
	return std::string(header_line);
}
1938 std::vector<select_element *> get_select_list(){return select_list;};
1939 std::vector<scalarexp_t *> get_select_se_list(){
1940 std::vector<scalarexp_t *> ret;
1942 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1945 std::vector<cnf_elem *> get_where_clause(){return where;};
1946 std::vector<cnf_elem *> get_filter_clause(){return where;};
1947 std::vector<cnf_elem *> get_having_clause(){return having;};
1948 gb_table *get_gb_tbl(){return &gb_tbl;};
1949 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
1950 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1951 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1953 // table which represents output tuple.
1954 table_def *get_fields();
1955 std::vector<tablevar_t *> get_input_tbls();
1956 std::vector<tablevar_t *> get_output_tbls();
1961 sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
1962 // Get the table name.
1963 // NOTE the colrefs have the tablevar ref (an int)
1964 // embedded in them. Would it make sense
1965 // to grab the whole table list?
1966 tablevar_list_t *fm = qs->fta_tree->get_from();
1967 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
1968 if(tbl_vec.size() != 1){
1970 sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
1974 table_name = (tbl_vec[0]);
1976 // Get the select list.
1977 select_list = qs->fta_tree->get_sl_vec();
1979 // Get the selection and having predicates.
1981 having = qs->hav_cnf;
1982 cleanby = qs->cb_cnf;
1983 cleanwhen = qs->cw_cnf;
1985 // Build a new GB var table (don't share, might need to modify)
1987 for(g=0;g<qs->gb_tbl->size();g++){
1988 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
1989 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
1990 qs->gb_tbl->get_reftype(g)
1994 // Build a new aggregate table. (don't share, might need
1997 for(a=0;a<qs->aggr_tbl->size();a++){
1999 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
2000 qs->aggr_tbl->duplicate(a)
2004 sg_tbl = qs->sg_tbl;
2005 states_refd = qs->states_refd;
2008 // Get the parameters
2009 param_tbl = qs->param_tbl;
2015 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2016 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2017 // Ensure that any refs to interface params have been split away.
2018 // CURRENTLY not allowed by split_node_for_fta
2019 int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2020 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
// Rate estimate consumed by distributed query optimization (defined out of line).
double
get_rate_estimate();
2025 qp_node* make_copy(std::string suffix){
2026 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
2028 ret->param_tbl = new param_table();
2029 std::vector<std::string> param_names = param_tbl->get_param_names();
2031 for(pi=0;pi<param_names.size();pi++){
2032 data_type *dt = param_tbl->get_data_type(param_names[pi]);
2033 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2034 param_tbl->handle_access(param_names[pi]));
2036 ret->definitions = definitions;
2038 ret->node_name = node_name + suffix;
2040 // make shallow copy of all fields
2042 ret->having = having;
2043 ret->select_list = select_list;
2044 ret->gb_tbl = gb_tbl;
2045 ret->aggr_tbl = aggr_tbl;
2046 ret->sg_tbl = sg_tbl;
2047 ret->states_refd = states_refd;
2048 ret->cleanby = cleanby;
2049 ret->cleanwhen = cleanwhen;
2054 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2058 std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);
// Defined out of line; rewrites s in place.
void untaboo(std::string &s);
2064 table_def *create_attributes(string tname, vector<select_element *> &select_list);