1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
15 #ifndef __QUERY_PLAN_H__
16 #define __QUERY_PLAN_H__
23 #include"analyze_fta.h"
25 #include"parse_partn.h"
26 #include"generate_utils.h"
28 // Identify the format of the input, output streams.
29 #define UNKNOWNFORMAT 0
33 ///////////////////////////////////////////////////
34 // representation of an output operator specification
39 string operator_param;
40 string output_directory;
42 string partitioning_flds;
47 ////////////////////////////////////////////////////
48 // Input representation of a query
// Links to other nodes, kept as sets of ints
// (presumably query-node indices -- confirm against the users of these sets).
52 std::set<int> reads_from;
53 std::set<int> sources_to;
// Names of the tables the query refers to; filled in by the constructors.
54 std::vector<std::string> refd_tbls;
// Query parameters; the parse-tree constructor copies them from pt->query_params.
55 std::vector<var_pair_t *> params;
58 std::string mangler; // for UDOPs
// Parse tree of the query; the parse-tree constructor reads its FROM clause.
60 table_exp_t *parse_tree;
// Visibility flags. inferred_visible_node is set to false by every
// initialization visible in this type.
63 bool is_externally_visible;
64 bool inferred_visible_node;
66 set<int> subtree_roots;
// Initial state: not externally visible, visibility not inferred.
73 is_externally_visible = false;
74 inferred_visible_node = false;
// Construct a query node from a parsed query (pt).
// Visibility is taken from pt->get_visible(), the referenced tables are the
// table names of the FROM clause, and the parameters are pt->query_params.
77 query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
85 is_externally_visible = pt->get_visible();
86 inferred_visible_node = false;
// Record the name of every table in the FROM clause.
89 tablevar_list_t *fm = parse_tree->get_from();
90 refd_tbls = fm->get_table_names();
92 params = pt->query_params;
// Construct a query node for the UDOP called udop_name.
// The node is marked externally visible, and refd_tbls receives the name of
// every subquery spec that Schema returns for the UDOP's table.
94 query_node(int ix, std::string udop_name,table_list *Schema){
102 is_externally_visible = true;
103 inferred_visible_node = false;
// Find the UDOP in the schema and fetch its subquery specs.
106 int sid = Schema->find_tbl(udop_name);
107 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
// Each subquery spec contributes one referenced-table name.
109 for(i=0;i<subq.size();++i){
110 refd_tbls.push_back(subq[i]->name);
117 std::string source_name;
118 std::vector<int> query_node_indices;
119 std::set<int> reads_from;
120 std::set<int> sources_to;
122 bool inferred_visible_node;
125 bool do_generation; // false means, ignore it.
129 inferred_visible_node = false;
132 do_generation = true;
144 // the following selectivity estimates are used by our primitive rate estimators
145 #define SPX_SELECTIVITY 1.0
146 #define SGAH_SELECTIVITY 0.1
147 #define RSGAH_SELECTIVITY 0.1
148 #define SGAHCWCB_SELECTIVITY 0.1
149 #define MRG_SELECTIVITY 1.0
150 #define JOIN_EQ_HASH_SELECTIVITY 1.0
152 // If the output rate of the interface is not given, we are going to use
153 // this default value.
154 #define DEFAULT_INTERFACE_RATE 100
157 // Define query plan nodes
158 // These nodes are intended for query modeling
159 // and transformation rather than for code generation.
162 // Query plan node base class.
163 // It has an ID, can return its type,
164 // and can be linked into lists with the predecessors
166 // To add : serialize, unserialize?
// Plan-graph links, kept as int vectors
// (presumably ids of other plan nodes -- confirm against the callers).
171 std::vector<int> predecessors;
172 std::vector<int> successors;
// Name of this node; the make_copy overrides derive the clone's name from it.
173 std::string node_name;
175 // For error reporting without exiting the program.
179 // These should be moved to the containing stream_query object.
// Name -> value string map, accessed through get_val_of_def / set_definition.
180 std::map<std::string, std::string> definitions;
// Parameter table; held by raw pointer.
181 param_table *param_tbl;
183 // The value of a field in terms of protocol fields (if any).
184 std::map<std::string, scalarexp_t *> protocol_map;
// param_tbl is heap-allocated at both initialization sites below.
189 param_tbl = new param_table();
194 param_tbl = new param_table();
// Node id accessors.
197 int get_id(){return(id);};
198 void set_id(int i){id = i; };
// Error state accessors (error_code / err_str).
200 int get_error_code(){return error_code;};
201 std::string get_error_str(){return err_str;};
// Type tag of the concrete node; the subclasses in this file return their
// own class name as a string.
203 virtual std::string node_type() = 0;
205 // For code generation, does the operator xform its input.
206 virtual bool makes_transform() = 0;
208 // For linking, what external libraries does the operator depend on?
209 virtual std::vector<std::string> external_libs() = 0;
// Node name accessors.
211 void set_node_name(std::string n){node_name = n;};
212 std::string get_node_name(){return node_name;};
// Definitions accessors; the getter returns the map by value (a copy).
214 void set_definitions(std::map<std::string, std::string> &def){
217 std::map<std::string, std::string> get_definitions(){return definitions;};
220 // Call to create the mapping from field name to se in protocol fields.
221 // Pass in qp_node of data sources, in order.
222 virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
223 // Get the protocol map: returns a pointer to this node's protocol_map.
224 std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
226 // Each qp node must be able to return a description
227 // of the tuples it creates.
228 // TODO: the get_output_tbls method should subsume the get_fields
229 // method, but in fact it really just returns the
231 virtual table_def *get_fields() = 0; // Should be vector?
232 // Get keys from the operator. Currently, only works on group-by queries.
233 // partial_keys set to true if there is a suspicion that the list is partial.
// NOTE(review): partial_keys is declared as a vector of strings, not a flag;
// the sentence above does not match the signature -- confirm the intent.
234 virtual std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys) = 0;
235 // Get the from clause
236 virtual std::vector<tablevar_t *> get_input_tbls() = 0;
237 // This is a confused function, it actually returns the output
239 virtual std::vector<tablevar_t *> get_output_tbls() = 0;
// Look up def in the definitions map; when an entry exists its value is returned.
241 std::string get_val_of_def(std::string def){
242 if(definitions.count(def) > 0) return definitions[def];
// Insert the value for def, or overwrite the value already stored for it.
245 void set_definition(std::string def, std::string val){
246 definitions[def]=val;
249 // Associate colrefs in SEs with tables
250 // at code generation time.
251 virtual void bind_to_schema(table_list *Schema) = 0;
253 // Get colrefs of the operator, currently only meaningful for lfta
254 // operators, and only interested in colrefs with extraction fcns
255 virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
// Text-producing hooks; every one of them returns a std::string.
257 virtual std::string to_query_string() = 0;
258 virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;
259 virtual std::string generate_functor_name() = 0;
261 virtual std::string generate_operator(int i, std::string params) = 0;
// The overrides in this file return a complete "#include <...>\n" line.
262 virtual std::string get_include_file() = 0;
// Tables derived from the external function list.
264 virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
265 virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
267 // Split this node into LFTA and HFTA nodes.
268 // Four possible outcomes:
269 // 1) the qp_node reads from a protocol, but does not need to
270 // split (can be evaluated as an LFTA).
271 // The lfta node is the only element in the return vector,
272 // and hfta_returned is false.
273 // 2) the qp_node reads from no protocol, and therefore cannot be split.
274 // The hfta node is the only element in the return vector,
275 // and hfta_returned is true.
276 // 3) reads from at least one protocol, but cannot be split : failure.
277 // return vector is empty, the error conditions are written
279 // 4) The qp_node splits into an hfta node and one or more LFTA nodes.
280 // the return vector has two or more elements, and hfta_returned
281 // is true. The last element is the HFTA.
// Note: hfta_returned is an int passed by reference; the true/false wording
// above refers to the value written into it.
282 virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;
285 // Ensure that any refs to interface params have been split away.
286 virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
290 // Tag the data sources which are views,
291 // return the (optimized) source queries and
292 // record the view access in opview_set
293 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
// Direct access to this node's parameter table pointer.
295 param_table *get_param_tbl(){return param_tbl;};
297 // The "where" clause is a pre-filter
298 virtual std::vector<cnf_elem *> get_where_clause() = 0;
299 // To be more explicit, use get_filter_clause
300 virtual std::vector<cnf_elem *> get_filter_clause() = 0;
302 // Add an extra predicate. Currently only used for LFTAs.
303 virtual void append_to_where(cnf_elem *c) = 0;
// Append i to the predecessor list; no duplicate check is made.
305 void add_predecessor(int i){predecessors.push_back(i);};
// Walk the predecessor list and erase an entry from it.
// NOTE(review): vector::erase invalidates vi, so the loop must not touch vi
// again after the erase -- confirm that the loop is left immediately.
306 void remove_predecessor(int i){
307 std::vector<int>::iterator vi;
308 for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
310 predecessors.erase(vi);
// Append i to the successor list; no duplicate check is made.
315 void add_successor(int i){successors.push_back(i);};
// The getters hand back copies of the link vectors; the counts are
// narrowed from size_t to int.
316 std::vector<int> get_predecessors(){return predecessors;};
317 int n_predecessors(){return predecessors.size();};
318 std::vector<int> get_successors(){return successors;};
319 int n_successors(){return successors.size();};
// Drop all links of the given kind.
320 void clear_predecessors(){predecessors.clear();};
321 void clear_successors(){successors.clear();};
323 // the following method is used for distributed query optimization
324 double get_rate_estimate();
327 // used for cloning query nodes
// The overrides in this file allocate the clone with new and name it
// node_name + suffix.
328 virtual qp_node* make_copy(std::string suffix) = 0;
333 // Select, project, transform (xform) query plan node.
334 // represent the following query fragment
335 // select scalar_expression_1, ..., scalar_expression_k
339 // the predicates and the scalar expressions can reference
340 // attributes of S and also functions.
341 class spx_qpn: public qp_node{
343 tablevar_t *table_name; // Source table
344 std::vector<cnf_elem *> where; // selection predicate
345 std::vector<select_element *> select_list; // Select list
// Type tag for this node class.
349 std::string node_type(){return("spx_qpn"); };
// This operator reports that it transforms its input.
350 bool makes_transform(){return true;};
// List of external libraries; built from an initially empty vector.
351 std::vector<std::string> external_libs(){
352 std::vector<std::string> ret;
// Adds the predicate c to this node's where clause.
356 void append_to_where(cnf_elem *c){
361 void bind_to_schema(table_list *Schema);
362 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
// Code and query text generation.
364 std::string to_query_string();
365 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
366 std::string generate_functor_name();
367 std::string generate_operator(int i, std::string params);
// Include line for the selection operator header.
368 std::string get_include_file(){return("#include <selection_operator.h>\n");};
// Returns a copy of the select list (the pointed-to elements are shared).
370 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the scalar expression (se) of every select-list entry, in order.
371 std::vector<scalarexp_t *> get_select_se_list(){
372 std::vector<scalarexp_t *> ret;
374 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
// For this node the where clause and the filter clause are the same vector.
377 std::vector<cnf_elem *> get_where_clause(){return where;};
378 std::vector<cnf_elem *> get_filter_clause(){return where;};
379 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
380 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
382 table_def *get_fields();
// Key list for this node; built from an initially empty vector.
383 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
384 std::vector<string> ret;
388 std::vector<tablevar_t *> get_input_tbls();
389 std::vector<tablevar_t *> get_output_tbls();
391 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
392 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
393 // Ensure that any refs to interface params have been split away.
394 int count_ifp_refs(std::set<std::string> &ifpnames);
// Returns an int status and reports problems through err.
395 int resolve_if_params(ifq_t *ifdb, std::string &err);
// Build a selection/projection node from an analyzed query summary (qs).
// The source table and the select list come from qs->fta_tree; the parameter
// table pointer is taken over from qs.
399 spx_qpn(query_summary_class *qs,table_list *Schema){
400 // Get the table name.
401 // NOTE the colrefs have the table ref (an int)
402 // embedded in them. Would it make sense
403 // to grab the whole table list?
404 tablevar_list_t *fm = qs->fta_tree->get_from();
405 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
// An SPX node reads exactly one table; any other count is an internal error.
406 if(tbl_vec.size() != 1){
// NOTE(review): %lu is paired with a size_t argument; %zu is the portable
// conversion for size_t.
408 sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
412 table_name = (tbl_vec[0]);
414 // Get the select list.
415 select_list = qs->fta_tree->get_sl_vec();
417 // Get the selection predicate.
421 // Get the parameters
// Pointer assignment only: the table is shared with qs, not copied.
422 param_tbl = qs->param_tbl;
428 // the following method is used for distributed query optimization
429 double get_rate_estimate();
// Clone this node. The clone is heap-allocated and named node_name + suffix.
432 qp_node* make_copy(std::string suffix){
433 spx_qpn *ret = new spx_qpn();
// The clone gets its own parameter table: every parameter is re-added with a
// duplicated data type and the handle_access value of the original.
// NOTE(review): if spx_qpn() already allocates a param_table through the
// base-class initialization, that earlier table is dropped here -- confirm.
435 ret->param_tbl = new param_table();
436 std::vector<std::string> param_names = param_tbl->get_param_names();
438 for(pi=0;pi<param_names.size();pi++){
439 data_type *dt = param_tbl->get_data_type(param_names[pi]);
440 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
441 param_tbl->handle_access(param_names[pi]));
443 ret->definitions = definitions;
444 ret->node_name = node_name + suffix;
446 // make shallow copy of all fields
// select_list is a vector of pointers, so the select_element objects are
// shared between the original and the clone.
448 ret->select_list = select_list;
452 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
458 // Select, group-by, aggregate.
460 // Select SE_1, ..., SE_k
463 // Group By gb1, ..., gb_n
466 // NOTE : the sampling operator is sgahcwcb_qpn.
468 // For now, must have group-by variables and aggregates.
469 // The scalar expressions which are output must be a function
470 // of the group-by variables and the aggregates.
471 // The group-by variables can be references to columns of T,
472 // or they can be scalar expressions.
473 class sgah_qpn: public qp_node{
475 tablevar_t *table_name; // source table
476 std::vector<cnf_elem *> where; // selection predicate
477 std::vector<cnf_elem *> having; // post-aggregation predicate
478 std::vector<select_element *> select_list; // se's of output
479 gb_table gb_tbl; // Table of all group-by attributes.
480 aggregate_table aggr_tbl; // Table of all referenced aggregates.
482 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
484 int lfta_disorder; // maximum disorder in the stream between lfta, hfta
// hfta_disorder also selects the header returned by get_include_file().
485 int hfta_disorder; // maximum disorder in the hfta
487 // rollup, cube, and grouping_sets cannot be readily reconstructed by
488 // analyzing the patterns, so explicitly record them here.
489 // used only so that to_query_string produces something meaningful.
490 std::vector<std::string> gb_entry_type;
491 std::vector<int> gb_entry_count;
// Returns a copy of the gb_sources vector.
493 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
// Type tag for this node class.
495 std::string node_type(){return("sgah_qpn"); };
// This operator reports that it transforms its input.
496 bool makes_transform(){return true;};
// List of external libraries; built from an initially empty vector.
497 std::vector<std::string> external_libs(){
498 std::vector<std::string> ret;
502 void bind_to_schema(table_list *Schema);
503 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
// Code and query text generation.
505 std::string to_query_string();
506 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
507 std::string generate_functor_name();
509 std::string generate_operator(int i, std::string params);
// hfta_disorder <= 1 selects groupby_operator.h; the other return names
// groupby_operator_oop.h.
510 std::string get_include_file(){
511 if(hfta_disorder <= 1){
512 return("#include <groupby_operator.h>\n");
514 return("#include <groupby_operator_oop.h>\n");
// Returns a copy of the select list (the pointed-to elements are shared).
518 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the scalar expression (se) of every select-list entry, in order.
519 std::vector<scalarexp_t *> get_select_se_list(){
520 std::vector<scalarexp_t *> ret;
522 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
525 std::vector<cnf_elem *> get_where_clause(){return where;};
// Adds the predicate c to this node's where clause.
527 void append_to_where(cnf_elem *c){
// The filter clause is the where clause; having is exposed separately.
531 std::vector<cnf_elem *> get_filter_clause(){return where;};
532 std::vector<cnf_elem *> get_having_clause(){return having;};
// These return pointers to the member tables, so callers can modify them
// in place.
533 gb_table *get_gb_tbl(){return &gb_tbl;};
534 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
535 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
536 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
538 // table which represents output tuple.
539 table_def *get_fields();
540 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
541 std::vector<tablevar_t *> get_input_tbls();
542 std::vector<tablevar_t *> get_output_tbls();
// Build a group-by/aggregation node from an analyzed query summary (qs).
// The group-by and aggregate tables are rebuilt entry by entry rather than
// shared with qs; the parameter table pointer is taken over from qs.
549 sgah_qpn(query_summary_class *qs,table_list *Schema){
553 // Get the table name.
554 // NOTE the colrefs have the tablevar ref (an int)
555 // embedded in them. Would it make sense
556 // to grab the whole table list?
557 tablevar_list_t *fm = qs->fta_tree->get_from();
558 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
// The node reads exactly one table; any other count is an internal error.
559 if(tbl_vec.size() != 1){
// NOTE(review): %lu is paired with a size_t argument; %zu is the portable
// conversion for size_t.
561 sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
565 table_name = (tbl_vec[0]);
567 // Get the select list.
568 select_list = qs->fta_tree->get_sl_vec();
570 // Get the selection and having predicates.
572 having = qs->hav_cnf;
574 // Build a new GB var table (don't share, might need to modify)
// Each entry is re-added from its name, tablevar ref, definition and reftype.
576 for(g=0;g<qs->gb_tbl->size();g++){
577 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
578 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
579 qs->gb_tbl->get_reftype(g)
// Pattern information is taken over through set_pattern_info; the
// commented-out assignments below are the field-by-field form it replaces.
582 gb_tbl.set_pattern_info(qs->gb_tbl);
583 // gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
584 // gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
585 // gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
587 // Build a new aggregate table. (don't share, might need
590 for(a=0;a<qs->aggr_tbl->size();a++){
592 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
593 qs->aggr_tbl->duplicate(a)
598 // Get the parameters
// Pointer assignment only: the table is shared with qs, not copied.
599 param_tbl = qs->param_tbl;
605 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
606 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
607 // Ensure that any refs to interface params have been split away.
608 int count_ifp_refs(std::set<std::string> &ifpnames);
609 int resolve_if_params(ifq_t *ifdb, std::string &err);
611 // the following method is used for distributed query optimization
612 double get_rate_estimate();
// Clone this node. The clone is heap-allocated and named node_name + suffix.
615 qp_node* make_copy(std::string suffix){
616 sgah_qpn *ret = new sgah_qpn();
// The clone gets its own parameter table: every parameter is re-added with a
// duplicated data type and the handle_access value of the original.
618 ret->param_tbl = new param_table();
619 std::vector<std::string> param_names = param_tbl->get_param_names();
621 for(pi=0;pi<param_names.size();pi++){
622 data_type *dt = param_tbl->get_data_type(param_names[pi]);
623 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
624 param_tbl->handle_access(param_names[pi]));
626 ret->definitions = definitions;
628 ret->node_name = node_name + suffix;
630 // make shallow copy of all fields
// The predicate and select vectors hold pointers, so their elements are
// shared with the original; gb_tbl and aggr_tbl are copied by assignment.
632 ret->having = having;
633 ret->select_list = select_list;
634 ret->gb_tbl = gb_tbl;
635 ret->aggr_tbl = aggr_tbl;
640 // Split aggregation into two HFTA components - sub and superaggregation
641 // If unable to split the aggregates, split into selection and aggregation
642 // If resulting low-level query is empty (e.g. when aggregates cannot be split and
643 // where clause is empty) empty vector will be returned
644 virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
646 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
653 // Select, group-by, aggregate, with running aggregates.
655 // Select SE_1, ..., SE_k
658 // Group By gb1, ..., gb_n
659 // Closing When predicate
662 // NOTE : the sampling operator is sgahcwcb_qpn.
664 // For now, must have group-by variables and aggregates.
665 // The scalar expressions which are output must be a function
666 // of the group-by variables and the aggregates.
667 // The group-by variables can be references to columns of T,
668 // or they can be scalar expressions.
669 class rsgah_qpn: public qp_node{
671 tablevar_t *table_name; // source table
672 std::vector<cnf_elem *> where; // selection predicate
673 std::vector<cnf_elem *> having; // post-aggregation predicate
674 std::vector<cnf_elem *> closing_when; // group closing predicate
675 std::vector<select_element *> select_list; // se's of output
676 gb_table gb_tbl; // Table of all group-by attributes.
677 aggregate_table aggr_tbl; // Table of all referenced aggregates.
679 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
681 int lfta_disorder; // maximum disorder allowed in stream between lfta, hfta
682 int hfta_disorder; // maximum disorder allowed in hfta
// Returns a copy of the gb_sources vector.
684 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
// Type tag for this node class.
687 std::string node_type(){return("rsgah_qpn"); };
// This operator reports that it transforms its input.
688 bool makes_transform(){return true;};
// List of external libraries; built from an initially empty vector.
689 std::vector<std::string> external_libs(){
690 std::vector<std::string> ret;
694 void bind_to_schema(table_list *Schema);
// Not expected to be called on this node type: a call is reported on stderr
// as an internal error.
695 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
696 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
// Code and query text generation.
700 std::string to_query_string();
701 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
702 std::string generate_functor_name();
704 std::string generate_operator(int i, std::string params);
// Include line for the running group-by operator header.
705 std::string get_include_file(){return("#include <running_gb_operator.h>\n");};
// Returns a copy of the select list (the pointed-to elements are shared).
707 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the scalar expression (se) of every select-list entry, in order.
708 std::vector<scalarexp_t *> get_select_se_list(){
709 std::vector<scalarexp_t *> ret;
711 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
714 std::vector<cnf_elem *> get_where_clause(){return where;};
// Adds the predicate c to this node's where clause.
715 void append_to_where(cnf_elem *c){
// The filter clause is the where clause; having and closing_when are
// exposed separately.
719 std::vector<cnf_elem *> get_filter_clause(){return where;};
720 std::vector<cnf_elem *> get_having_clause(){return having;};
721 std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};
// These return pointers to the member tables, so callers can modify them
// in place.
722 gb_table *get_gb_tbl(){return &gb_tbl;};
723 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
724 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
725 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
727 // table which represents output tuple.
728 table_def *get_fields();
729 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
731 std::vector<tablevar_t *> get_input_tbls();
732 std::vector<tablevar_t *> get_output_tbls();
// Build a running group-by/aggregation node from an analyzed query summary.
// In addition to the having predicate, the closing-when predicate is taken
// from qs; the group-by and aggregate tables are rebuilt entry by entry.
739 rsgah_qpn(query_summary_class *qs,table_list *Schema){
743 // Get the table name.
744 // NOTE the colrefs have the tablevar ref (an int)
745 // embedded in them. Would it make sense
746 // to grab the whole table list?
747 tablevar_list_t *fm = qs->fta_tree->get_from();
748 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
// The node reads exactly one table; any other count is an internal error.
749 if(tbl_vec.size() != 1){
// NOTE(review): %lu is paired with a size_t argument; %zu is the portable
// conversion for size_t.
751 sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
755 table_name = (tbl_vec[0]);
757 // Get the select list.
758 select_list = qs->fta_tree->get_sl_vec();
760 // Get the selection and having predicates.
762 having = qs->hav_cnf;
763 closing_when = qs->closew_cnf;
765 // Build a new GB var table (don't share, might need to modify)
// Each entry is re-added from its name, tablevar ref, definition and reftype.
767 for(g=0;g<qs->gb_tbl->size();g++){
768 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
769 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
770 qs->gb_tbl->get_reftype(g)
774 // Build a new aggregate table. (don't share, might need
777 for(a=0;a<qs->aggr_tbl->size();a++){
779 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
780 qs->aggr_tbl->duplicate(a)
785 // Get the parameters
// Pointer assignment only: the table is shared with qs, not copied.
786 param_tbl = qs->param_tbl;
792 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
793 std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
794 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
795 // Ensure that any refs to interface params have been split away.
796 int count_ifp_refs(std::set<std::string> &ifpnames);
// No-op for this node type: always returns 0 and leaves err untouched.
797 int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;}
799 // the following method is used for distributed query optimization
800 double get_rate_estimate();
// Clone this node. The clone is heap-allocated and named node_name + suffix.
802 qp_node* make_copy(std::string suffix){
803 rsgah_qpn *ret = new rsgah_qpn();
// The clone gets its own parameter table: every parameter is re-added with a
// duplicated data type and the handle_access value of the original.
805 ret->param_tbl = new param_table();
806 std::vector<std::string> param_names = param_tbl->get_param_names();
808 for(pi=0;pi<param_names.size();pi++){
809 data_type *dt = param_tbl->get_data_type(param_names[pi]);
810 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
811 param_tbl->handle_access(param_names[pi]));
813 ret->definitions = definitions;
815 ret->node_name = node_name + suffix;
817 // make shallow copy of all fields
// The predicate and select vectors hold pointers, so their elements are
// shared with the original; gb_tbl and aggr_tbl are copied by assignment.
819 ret->having = having;
820 ret->closing_when = closing_when;
821 ret->select_list = select_list;
822 ret->gb_tbl = gb_tbl;
823 ret->aggr_tbl = aggr_tbl;
827 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
832 class filter_join_qpn;
835 // (temporal) Merge query plan node.
836 // represent the following query fragment
838 // from T1 _t1, T2 _t2
840 // T1 and T2 must have compatible schemas,
841 // that is the same types in the same slots.
842 // c1 and c2 must be colrefs from T1 and T2,
843 // both ref'ing the same slot. Their types
844 // must be temporal and the same kind of temporal.
845 // in the output, no other field is temporal.
846 // the field names of the output are drawn from T1.
847 class mrg_qpn: public qp_node{
// One entry per merged source; the constructors keep mvars the same
// length as fm (one merge column per source).
849 std::vector<tablevar_t *> fm; // Source table
850 std::vector<colref_t *> mvars; // the merge-by columns.
853 table_def *table_layout; // the output schema
854 int merge_fieldpos; // position of merge field,
855 // convenience for manipulation.
857 int disorder; // max disorder seen in the input / allowed in the output
860 // partition definition for merges that combine streams partitioned over multiple interfaces
861 partn_def_t* partn_def;
// Not supported on merge nodes: a call is reported on stderr together with
// the node name.
864 void append_to_where(cnf_elem *c){
865 fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n", node_name.c_str());
// Type tag for this node class.
871 std::string node_type(){return("mrg_qpn"); };
// A merge reports that it does not transform its input.
872 bool makes_transform(){return false;};
// List of external libraries; built from an initially empty vector.
873 std::vector<std::string> external_libs(){
874 std::vector<std::string> ret;
878 void bind_to_schema(table_list *Schema);
// Not expected to be called on this node type: a call is reported on stderr
// as an internal error.
879 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
880 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
// Code and query text generation.
884 std::string to_query_string();
885 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
886 std::string generate_functor_name();
887 std::string generate_operator(int i, std::string params);
// Returns one of two merge-operator include lines (the _oop variant is
// listed first).
888 std::string get_include_file(){
890 return("#include <merge_operator_oop.h>\n");
891 return("#include <merge_operator.h>\n");
894 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
895 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
897 table_def *get_fields();
// Key list for this node; built from an initially empty vector.
898 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
899 std::vector<string> ret;
903 std::vector<tablevar_t *> get_input_tbls();
904 std::vector<tablevar_t *> get_output_tbls();
906 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
907 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
908 // Ensure that any refs to interface params have been split away.
909 int count_ifp_refs(std::set<std::string> &ifpnames);
911 // No predicates, return an empty clause
912 std::vector<cnf_elem *> get_where_clause(){
913 std::vector<cnf_elem *> t;
// Same result as get_where_clause().
916 std::vector<cnf_elem *> get_filter_clause(){
917 return get_where_clause();
// Setter taking the disorder value d.
924 void set_disorder(int d){
// Build a merge node from an analyzed query summary (qs).
// The sources come from the FROM clause, the node takes its name from
// qs->query_name, and the output schema is derived from the fields of the
// first source (fm[0]).
928 mrg_qpn(query_summary_class *qs,table_list *Schema){
931 // Grab the elements of the query node.
932 fm = qs->fta_tree->get_from()->get_table_list();
// There must be exactly one merge column per source.
937 if(fm.size() != mvars.size()){
// NOTE(review): %lu is paired with size_t arguments; %zu is the portable
// conversion for size_t.
938 fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn. fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
942 // Get the parameters
// Pointer assignment only: the table is shared with qs, not copied.
943 param_tbl = qs->param_tbl;
945 // Need to set the node name now, so that the
946 // schema (table_layout) can be properly named.
947 // TODO: Setting the name of the table might best be done
948 // via the set_node_name method, because presumably
949 // that's when the node name is really known.
950 // This should propagate to the table_def table_layout
951 node_name=qs->query_name;
// NOTE(review): if this trace output is live code, %d does not match the
// size_t value fm.size().
955 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
956 for(ff=0;ff<fm.size();++ff){
957 printf("%s ",fm[ff]->to_string().c_str());
963 // Create the output schema.
964 // strip temporal properties from all fields except the merge field.
965 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
966 field_entry_list *fel = new field_entry_list();
968 for(f=0;f<flva.size();++f){
// Rebuild the field's data type from its type string and modifier list.
970 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
// The merge field is recognized by name, using the first merge column.
971 if(flva[f]->get_name() == mvars[0]->get_field()){
973 // if(slack != NULL) dt.reset_temporal();
// Carry the data type's parameters over into a fresh param_list; a key is
// appended either with its value or on its own.
978 param_list *plist = new param_list();
979 std::vector<std::string> param_strings = dt.get_param_keys();
981 for(p=0;p<param_strings.size();++p){
982 std::string v = dt.get_param_val(param_strings[p]);
984 plist->append(param_strings[p].c_str(),v.c_str());
986 plist->append(param_strings[p].c_str());
991 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
992 fel->append_field(fe);
// The output schema is named after the node and built from the field list.
998 table_layout = new table_def(
999 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1006 /////////////////////////////////////////////
1007 /// Created for de-siloing. to be removed? or is it otherwise useful?
1008 // Merge existing set of sources (de-siloing)
// Builds a merge over the named sources: one tablevar per source with range
// variable "_t<index>", an output schema derived from the first source, and
// one merge column per source taken at merge_fieldpos.
1009 mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
1014 // Construct the fm list
1015 for(f=0;f<src_names.size();++f){
1016 int tbl_ref = Schema->get_table_ref(src_names[f]);
// A source that is missing from the schema is reported as an internal error.
1018 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
1021 table_def *src_tbl = Schema->get_table(tbl_ref);
1022 tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
1023 string range_name = "_t" + int_to_string(f);
1024 fm_t->set_range_var(range_name);
1025 fm_t->set_schema_ref(tbl_ref);
1029 // Create the output schema.
1030 // strip temporal properties from all fields except the merge field.
1031 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1032 field_entry_list *fel = new field_entry_list();
1033 bool temporal_found = false;
1034 for(f=0;f<flva.size();++f){
1036 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
// The first temporal field found is treated specially (temporal_found is
// set); reset_temporal() is applied on the other path.
1037 if(dt.is_temporal() && !temporal_found){
1039 temporal_found = true;
1041 dt.reset_temporal();
// Carry the data type's parameters over into a fresh param_list; a key is
// appended either with its value or on its own.
1044 param_list *plist = new param_list();
1045 std::vector<std::string> param_strings = dt.get_param_keys();
1047 for(p=0;p<param_strings.size();++p){
1048 std::string v = dt.get_param_val(param_strings[p]);
1050 plist->append(param_strings[p].c_str(),v.c_str());
1052 plist->append(param_strings[p].c_str());
1056 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
1057 flva[f]->get_unpack_fcns()
1059 fel->append_field(fe);
// Without a temporal field there is nothing to merge on.
1062 if(! temporal_found){
1063 fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
1068 table_layout = new table_def(
1069 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
// This constructor allocates its own, empty parameter table.
1073 param_tbl = new param_table();
// One merge column per source: the field at merge_fieldpos of that source,
// qualified with the source's variable name.
1076 for(f=0;f<fm.size();++f){
1077 std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
// NOTE(review): the type string comes from flv_f (source f) but the modifier
// list comes from flva (source 0) -- confirm this mix is intended.
1078 data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
1079 flva[merge_fieldpos]->get_modifier_list());
1081 colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
1082 flv_f[merge_fieldpos]->get_name().c_str());
1083 mvars.push_back(mcr);
1086 // literal_t *s_lit = new literal_t("5",LITERAL_INT);
1087 // slack = new scalarexp_t(s_lit);
1092 ////////////////////////////////////////
// Declaration only; the definition is not part of this header section.
// The merge constructors in this class call it with the merge field's scalar
// expression and name, the interface list, the interface database and a
// gb_table pointer (NULL when merging selection LFTAs).
// NOTE(review): presumably sets this->slack, which the aggregation merge
// constructor inspects right after the call -- confirm in the definition.
1094 void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
1097 // Merge filter_join LFTAs.
// Declaration only; the body is defined elsewhere. The argument list mirrors
// the selection and aggregation merge constructors: the node being merged,
// the merge node name, the source names, the interface pairs and the
// interface database.
1099 mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1101 // Merge selection LFTAs.
// Builds a merge node over the LFTAs named in `sources`, deriving the output
// schema, the parameters and the definitions from the selection node `spx`.
// n_name names the output table_def and appears in diagnostics; ifaces and
// ifdb are forwarded to resolve_slack().
1103 mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
// Initially alias the source node's parameter table; it is replaced by a
// freshly built table further down in this constructor.
1107 param_tbl = spx->param_tbl;
// Build the output field list from the select list while looking for the
// temporal field that becomes the merge field (merge_fieldpos).
1110 field_entry_list *fel = new field_entry_list();
1111 merge_fieldpos = -1;
1116 for(i=0;i<spx->select_list.size();++i){
// Work on a duplicate so the select list's own data type is left untouched.
1117 data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
1118 if(dt->is_temporal()){
1119 if(merge_fieldpos < 0){
// Diagnostic for an additional temporal field; reset_temporal() is then
// applied to this field's duplicated type.
1122 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
1123 dt->reset_temporal();
1127 field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
1128 fel->append_field(fe);
// No temporal field was found in the select list: report it.
1131 if(merge_fieldpos<0){
1132 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1135 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1137 // NEED TO HANDLE USER_SPECIFIED SLACK
// Selection merges pass NULL for the gb_table argument.
1138 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
1139 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
1140 // if(this->slack == NULL)
1141 // fprintf(stderr,"Zero slack.\n");
1143 // fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
// One merge column reference and one FROM entry per source, using the range
// variables _m0, _m1, ...
1145 for(i=0;i<sources.size();i++){
1146 std::string rvar = "_m"+int_to_string(i);
1147 mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
1148 mvars[i]->set_tablevar_ref(i);
1149 fm.push_back(new tablevar_t(sources[i].c_str()));
1150 fm[i]->set_range_var(rvar);
// Rebuild the parameter table entry by entry, duplicating each data type.
1153 param_tbl = new param_table();
1154 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
1156 for(pi=0;pi<param_names.size();pi++){
1157 data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
1158 param_tbl->add_param(param_names[pi],dt->duplicate(),
1159 spx->param_tbl->handle_access(param_names[pi]));
// The definitions are copied from the source node by assignment.
1161 definitions = spx->definitions;
1165 // Merge aggregation LFTAs
// Builds a merge node over the LFTAs named in `sources`, deriving the output
// schema, the parameters and the definitions from the aggregation node `sgah`.
// n_name names the output table_def and appears in diagnostics; ifaces and
// ifdb are forwarded to resolve_slack().
1167 mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
// Initially alias the source node's parameter table; it is replaced by a
// freshly built table further down in this constructor.
1171 param_tbl = sgah->param_tbl;
// Build the output field list from the select list while looking for the
// temporal field that becomes the merge field (merge_fieldpos).
1174 field_entry_list *fel = new field_entry_list();
1175 merge_fieldpos = -1;
1176 for(i=0;i<sgah->select_list.size();++i){
// Work on a duplicate so the select list's own data type is left untouched.
1177 data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
1178 if(dt->is_temporal()){
1179 if(merge_fieldpos < 0){
// Diagnostic for an additional temporal field; reset_temporal() is then
// applied to this field's duplicated type.
1182 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
1183 dt->reset_temporal();
1187 field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
1188 fel->append_field(fe);
// No temporal field was found in the select list: report it.
1191 if(merge_fieldpos<0){
1192 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1195 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1197 // NEED TO HANDLE USER_SPECIFIED SLACK
1198 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
1199 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
// Unlike the selection merge constructor, the resolved slack is reported on
// stderr here.
1201 if(this->slack == NULL)
1202 fprintf(stderr,"Zero slack.\n");
1204 fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
// One merge column reference and one FROM entry per source, using the range
// variables _m0, _m1, ...
1207 for(i=0;i<sources.size();i++){
1208 std::string rvar = "_m"+int_to_string(i);
1209 mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
1210 mvars[i]->set_tablevar_ref(i);
1211 fm.push_back(new tablevar_t(sources[i].c_str()));
1212 fm[i]->set_range_var(rvar);
// Rebuild the parameter table entry by entry, duplicating each data type.
1215 param_tbl = new param_table();
1216 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
1218 for(pi=0;pi<param_names.size();pi++){
1219 data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
1220 param_tbl->add_param(param_names[pi],dt->duplicate(),
1221 sgah->param_tbl->handle_access(param_names[pi]));
// The definitions are copied from the source node by assignment.
1223 definitions = sgah->definitions;
// Creates a new mrg_qpn named node_name + suffix.
// The parameter table is rebuilt with duplicated data types, while disorder,
// definitions and merge_fieldpos are copied by assignment. table_layout is
// copied with make_shallow_copy() under the new node name.
1227 qp_node *make_copy(std::string suffix){
1228 mrg_qpn *ret = new mrg_qpn();
1230 ret->disorder = disorder;
// Fresh parameter table: one entry per existing parameter.
1232 ret->param_tbl = new param_table();
1233 std::vector<std::string> param_names = param_tbl->get_param_names();
1235 for(pi=0;pi<param_names.size();pi++){
1236 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1237 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1238 param_tbl->handle_access(param_names[pi]));
1240 ret->definitions = definitions;
// The copy's name must be set before the layout is copied, because the
// layout copy is created under ret->node_name.
1242 ret->node_name = node_name + suffix;
1243 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
1244 ret->merge_fieldpos = merge_fieldpos;
// Declaration only; returns a vector of mrg_qpn pointers.
// NOTE(review): the name suggests the sources are divided among several merge
// nodes -- confirm against the definition.
1249 std::vector<mrg_qpn *> split_sources();
1251 // the following method is used for distributed query optimization
// Declaration only; the estimate is returned as a double.
1252 double get_rate_estimate();
1255 // get partition definition for merges that combine streams partitioned over multiple interfaces
1256 // return NULL for regular merges
// For every input table, the "iface_partition" attribute of its
// (machine, interface) pair is looked up in ifaces_db. The partition name
// that results is resolved through partn_parse_result.
1257 partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1265 vector<tablevar_t *> input_tables = get_input_tbls();
1266 for (int i = 0; i < input_tables.size(); ++i) {
1267 tablevar_t * table = input_tables[i];
1269 vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
1270 if (partn_names.size() != 1) // can't have more than one value of partition attribute
1272 string new_partn_name = partn_names[0];
1274 // need to make sure that all ifaces belong to the same partition
1276 partn_name = new_partn_name;
1277 else if (new_partn_name != partn_name)
1281 // now find partition definition corresponding to partn_name
// The result of the lookup is stored in partn_def.
1282 partn_def = partn_parse_result->get_partn_def(partn_name);
// Setter taking a partition definition.
// NOTE(review): presumably stores def in partn_def, the variable assigned in
// get_partn_definition -- confirm.
1286 void set_partn_definition(partn_def_t* def) {
// Reports whether the inputs of this merge come from more than one machine.
// The machine of the first input table is the reference; any later input
// with a different machine sets the result to true.
1290 bool is_multihost_merge() {
1292 bool is_multihost = false;
1294 // each input table must have a non-empty machine attribute
1295 // and there should be at least 2 different values of machine attributes
1296 vector<tablevar_t *> input_tables = get_input_tbls();
// NOTE(review): input_tables[0] is read without a visible emptiness check --
// confirm that a merge node always has at least one input table.
1297 string host = input_tables[0]->get_machine();
1298 for (int i = 1; i < input_tables.size(); ++i) {
1299 string new_host = input_tables[i]->get_machine();
1302 if (new_host != host)
1303 is_multihost = true;
1305 return is_multihost;
// Declaration only; takes the source query plan nodes and the schema.
// The definition is not part of this header section.
1308 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1312 // eq_temporal, hash join query plan node.
1313 // represent the following query fragment
1314 // select scalar_expression_1, ..., scalar_expression_k
1315 // from T0 t0, T1 t1
1318 // the predicates and the scalar expressions can reference
1319 // attributes of t0 and t1 and also functions.
1320 // The predicate must contain CNF elements to enable the
1321 // efficient evaluation of the query.
1322 // 1) at least one predicate of the form
1323 // (temporal se in t0) = (temporal se in t1)
1324 // 2) at least one predicate of the form
1325 // (non-temporal se in t0) = (non-temporal se in t1)
1327 class join_eq_hash_qpn: public qp_node{
1329 std::vector<tablevar_t *> from; // Source tables
1330 std::vector<select_element *> select_list; // Select list
1331 std::vector<cnf_elem *> prefilter[2]; // source prefilters
1332 std::vector<cnf_elem *> temporal_eq; // define temporal window
1333 std::vector<cnf_elem *> hash_eq; // define hash key
1334 std::vector<cnf_elem *> postfilter; // final filter on hash matches.
1336 std::vector<cnf_elem *> where; // all the filters
1337 // useful for summary analysis
// Scalar expressions for the two sides of the hash key. They are not filled
// in by the constructor in this header.
1339 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1341 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1342 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1344 std::string node_type(){return("join_eq_hash_qpn"); };
1345 bool makes_transform(){return true;};
1346 std::vector<std::string> external_libs(){
1347 std::vector<std::string> ret;
1351 void bind_to_schema(table_list *Schema);
// Not supported on this node type: reports an internal error on stderr.
1352 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1353 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
// Not supported on this node type: reports an error naming the query.
1357 void append_to_where(cnf_elem *c){
1358 fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str());
1363 std::string to_query_string();
1364 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1365 std::string generate_functor_name();
1366 std::string generate_operator(int i, std::string params);
1367 std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1369 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the scalar expression of every select-list element, in order.
1370 std::vector<scalarexp_t *> get_select_se_list(){
1371 std::vector<scalarexp_t *> ret;
1373 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1376 // Used for LFTA only
1377 std::vector<cnf_elem *> get_where_clause(){
1378 std::vector<cnf_elem *> t;
// The filter clause is defined as whatever get_where_clause() returns.
1381 std::vector<cnf_elem *> get_filter_clause(){
1382 return get_where_clause();
1385 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1386 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1388 table_def *get_fields();
1390 // It might be feasible to find keys in an equijoin expression.
1391 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1392 std::vector<string> ret;
1396 std::vector<tablevar_t *> get_input_tbls();
1397 std::vector<tablevar_t *> get_output_tbls();
1399 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1400 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1401 // Ensure that any refs to interface params have been split away.
1402 int count_ifp_refs(std::set<std::string> &ifpnames);
// Builds the join node from a query summary: takes the FROM list and the
// select list, then sorts each WHERE element into prefilter, temporal_eq,
// hash_eq or postfilter.
1406 join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
1408 // Get the table name.
1409 // NOTE the colrefs have the table ref (an int)
1410 // embedded in them. Would it make sense
1411 // to grab the whole table list?
1412 from = qs->fta_tree->get_from()->get_table_list();
// A join must be over exactly two sources; otherwise an error message is
// formatted.
1413 if(from.size() != 2){
1415 sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1420 // Get the select list.
1421 select_list = qs->fta_tree->get_sl_vec();
1423 // Get the selection predicate.
1425 for(w=0;w<where.size();++w){
1426 analyze_cnf(where[w]);
// pred_tbls receives the table variables referenced by this predicate.
1427 std::vector<int> pred_tbls;
1428 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1429 // Prefilter if refs only one tablevar
1430 if(pred_tbls.size()==1){
1431 prefilter[pred_tbls[0]].push_back(where[w]);
1434 // refs nothing -- might be sampling, do it as postfilter.
1435 if(pred_tbls.size()==0){
1436 postfilter.push_back(where[w]);
1439 // See if it can be a hash or temporal predicate.
1440 // NOTE: synchronize with the temporality checking
1441 // done at join_eq_hash_qpn::get_fields
1442 if(where[w]->is_atom && where[w]->eq_pred){
1443 std::vector<int> sel_tbls, ser_tbls;
1444 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1445 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
// Each side must reference exactly one table variable, and the two sides must
// reference different ones.
1446 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1447 // make channel 0 SE on LHS.
1448 if(sel_tbls[0] != 0)
1449 where[w]->pr->swap_scalar_operands();
1451 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1452 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
// Both sides increasing, or both decreasing: a temporal equality predicate.
1453 if( (dtl->is_increasing() && dtr->is_increasing()) ||
1454 (dtl->is_decreasing() && dtr->is_decreasing()) )
1455 temporal_eq.push_back(where[w]);
1457 hash_eq.push_back(where[w]);
1462 // All tests failed, fallback is postfilter.
1463 postfilter.push_back(where[w]);
// Without a temporal equality predicate there is no join window.
1466 if(temporal_eq.size()==0){
1467 err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
1471 // Get the parameters
1472 param_tbl = qs->param_tbl;
1476 // the following method is used for distributed query optimization
1477 double get_rate_estimate();
// Creates a new join_eq_hash_qpn named node_name + suffix. The parameter
// table is rebuilt with duplicated data types. The predicate and select-list
// vectors are copied by assignment, so the copy holds the same element
// pointers as this node.
1480 qp_node* make_copy(std::string suffix){
1481 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
1483 ret->param_tbl = new param_table();
1484 std::vector<std::string> param_names = param_tbl->get_param_names();
1486 for(pi=0;pi<param_names.size();pi++){
1487 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1488 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1489 param_tbl->handle_access(param_names[pi]));
1491 ret->definitions = definitions;
1493 ret->node_name = node_name + suffix;
1495 // make shallow copy of all fields
1498 ret->select_list = select_list;
1499 ret->prefilter[0] = prefilter[0];
1500 ret->prefilter[1] = prefilter[1];
1501 ret->postfilter = postfilter;
1502 ret->temporal_eq = temporal_eq;
1503 ret->hash_eq = hash_eq;
1507 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1512 // ---------------------------------------------
1513 // eq_temporal, hash join query plan node.
1514 // represent the following query fragment
1515 // select scalar_expression_1, ..., scalar_expression_k
1516 // FILTER_JOIN(col, range) from T0 t0, T1 t1
1519 // t0 is the output range variable, t1 is the filtering range
1520 // variable. Both must alias a PROTOCOL.
1521 // The scalar expressions in the select clause may
1522 // reference t0 only.
1523 // The predicates are classified as follows
1524 // prefilter predicates:
1525 // a cheap predicate in t0 such that there is an equivalent
1526 // predicate in t1. Cost decisions about pushing to
1527 // lfta prefilter made later.
1528 // t0 predicates (other than prefilter predicates)
1529 // -- cheap vs. expensive sorted out at generate time,
1530 // the constructor isn't called with the function list.
1531 // t1 predicates (other than prefilter predicates).
1532 // equi-join predicates of the form:
1533 // (se in t0) = (se in t1)
1535 // There must be at least one equi-join predicate.
1536 // No join predicates other than equi-join predicates
1538 // Warn on temporal equi-join predicates.
1539 // t1 predicates should not be expensive ... warn?
1541 class filter_join_qpn: public qp_node{
1543 std::vector<tablevar_t *> from; // Source tables
1544 colref_t *temporal_var; // join window in FROM
1545 unsigned int temporal_range; // metadata.
1546 std::vector<select_element *> select_list; // Select list
1547 std::vector<cnf_elem *> shared_pred; // prefilter preds
1548 std::vector<cnf_elem *> pred_t0; // main (R) preds
1549 std::vector<cnf_elem *> pred_t1; // filtering (S) preds
1550 std::vector<cnf_elem *> hash_eq; // define hash key
1551 std::vector<cnf_elem *> postfilter; // ref's no table.
1553 std::vector<cnf_elem *> where; // all the filters
1554 // useful for summary analysis
// Scalar expressions for the two sides of the hash key. They are not filled
// in by the constructor in this header.
1556 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1557 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1558 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1561 bool use_bloom; // true => bloom filter, false => limited hash
1563 std::string node_type(){return("filter_join"); };
1564 bool makes_transform(){return true;};
1565 std::vector<std::string> external_libs(){
1566 std::vector<std::string> ret;
1570 void bind_to_schema(table_list *Schema);
1571 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
1573 std::string to_query_string();
// The three code-generation entry points below report an internal error on
// stderr when called on this node type.
1574 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
1575 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
1578 std::string generate_functor_name(){
1579 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
1582 std::string generate_operator(int i, std::string params){
1583 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
// NOTE(review): returns the same include line as join_eq_hash_qpn -- confirm
// this is intended for a filter join.
1586 std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1588 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the scalar expression of every select-list element, in order.
1589 std::vector<scalarexp_t *> get_select_se_list(){
1590 std::vector<scalarexp_t *> ret;
1592 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1595 // Used for LFTA only
1596 void append_to_where(cnf_elem *c){
// The WHERE clause is every filter; the filter clause is only the shared
// (prefilter) predicates.
1600 std::vector<cnf_elem *> get_where_clause(){return where;}
1601 std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
1603 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1604 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1606 table_def *get_fields();
1607 // It should be feasible to find keys in a filter join
1608 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1609 std::vector<string> ret;
1613 std::vector<tablevar_t *> get_input_tbls();
1614 std::vector<tablevar_t *> get_output_tbls();
1616 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1617 int resolve_if_params(ifq_t *ifdb, std::string &err);
1619 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1620 // Ensure that any refs to interface params have been split away.
1621 int count_ifp_refs(std::set<std::string> &ifpnames);
// Builds the filter join node from a query summary. It takes the FROM list
// with its temporal column and range, validates the sources and the select
// list, classifies the WHERE predicates, then copies the parameters and
// definitions and selects the algorithm.
1626 filter_join_qpn(query_summary_class *qs,table_list *Schema){
1628 // Get the table name.
1629 // NOTE the colrefs have the table ref (an int)
1630 // embedded in them. Would it make sense
1631 // to grab the whole table list?
1632 from = qs->fta_tree->get_from()->get_table_list();
1633 temporal_var = qs->fta_tree->get_from()->get_colref();
1634 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
// A join must be over exactly two sources; otherwise an error message is
// formatted.
1635 if(from.size() != 2){
1637 sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
// Both range variables must name the same interface.
1641 if(from[0]->get_interface() != from[1]->get_interface()){
1642 err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n";
1646 // Get the select list.
1647 select_list = qs->fta_tree->get_sl_vec();
1648 // Verify that only t0 is referenced.
1649 bool bad_ref = false;
1650 for(i=0;i<select_list.size();i++){
1651 vector<int> sel_tbls;
1652 get_tablevar_ref_se(select_list[i]->se,sel_tbls);
// Detects a select expression that references both table variables, or only
// table variable 1.
1653 if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
1657 err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
1662 // Get the selection predicate.
1664 std::vector<cnf_elem *> t0_only, t1_only;
1665 for(w=0;w<where.size();++w){
1666 analyze_cnf(where[w]);
// pred_tbls receives the table variables referenced by this predicate.
1667 std::vector<int> pred_tbls;
1668 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1669 // Collect the list of preds by src var,
1670 // extract the shared preds later.
1671 if(pred_tbls.size()==1){
1672 if(pred_tbls[0] == 0){
1673 t0_only.push_back(where[w]);
1675 t1_only.push_back(where[w]);
1679 // refs nothing -- might be sampling, do it as postfilter.
1680 if(pred_tbls.size()==0){
1681 postfilter.push_back(where[w]);
1684 // See if it can be a hash or temporal predicate.
1685 // NOTE: synchronize with the temporality checking
1686 // done at join_eq_hash_qpn::get_fields
1687 if(where[w]->is_atom && where[w]->eq_pred){
1688 std::vector<int> sel_tbls, ser_tbls;
1689 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1690 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
// Each side must reference exactly one table variable, and the two sides must
// reference different ones.
1691 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1692 // make channel 0 SE on LHS.
1693 if(sel_tbls[0] != 0)
1694 where[w]->pr->swap_scalar_operands();
1696 hash_eq.push_back(where[w]);
1698 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1699 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
// The predicate is kept as a hash key even when both sides are temporal; only
// a warning is appended to err_str.
1700 if( (dtl->is_increasing() && dtr->is_increasing()) ||
1701 (dtl->is_decreasing() && dtr->is_decreasing()) )
1702 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
1707 // All tests failed, fallback is postfilter.
1708 err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
1711 // Classify the t0_only and t1_only preds.
// For each t0-only predicate, look for an equivalent t1-only predicate. A
// match goes to shared_pred and the index of the matched t1 predicate is
// recorded; unmatched t0 predicates go to pred_t0.
1712 set<int> matched_pred;
1714 for(w=0;w<t0_only.size();w++){
1715 for(v=0;v<t1_only.size();++v)
1716 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
1718 if(v<t1_only.size()){
1719 shared_pred.push_back(t0_only[w]);
1720 matched_pred.insert(v);
1722 pred_t0.push_back(t0_only[w]);
// The t1-only predicates that matched nothing become pred_t1.
1725 for(v=0;v<t1_only.size();++v){
1726 if(matched_pred.count(v) == 0)
1727 pred_t1.push_back(t1_only[v]);
1731 // Get the parameters
1732 param_tbl = qs->param_tbl;
1733 definitions = qs->definitions;
1735 // Determine the algorithm
// The choice is driven by the "algorithm" definition; the value "hash" is
// tested here.
1736 if(this->get_val_of_def("algorithm") == "hash"){
1743 // the following method is used for distributed query optimization
1744 double get_rate_estimate();
// Creates a new filter_join_qpn named node_name + suffix. The parameter table
// is rebuilt with duplicated data types. The predicate and select-list
// vectors are copied by assignment, so the copy holds the same element
// pointers as this node.
1747 qp_node* make_copy(std::string suffix){
1748 filter_join_qpn *ret = new filter_join_qpn();
1750 ret->param_tbl = new param_table();
1751 std::vector<std::string> param_names = param_tbl->get_param_names();
1753 for(pi=0;pi<param_names.size();pi++){
1754 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1755 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1756 param_tbl->handle_access(param_names[pi]));
1758 ret->definitions = definitions;
1760 ret->node_name = node_name + suffix;
1762 // make shallow copy of all fields
1765 ret->temporal_range = temporal_range;
1766 ret->temporal_var = temporal_var;
1767 ret->select_list = select_list;
1768 ret->shared_pred = shared_pred;
1769 ret->pred_t0 = pred_t0;
1770 ret->pred_t1 = pred_t1;
1771 ret->postfilter = postfilter;
1772 ret->hash_eq = hash_eq;
1776 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
// Kind of output file written by output_file_qpn (see its compression_type
// member).
// NOTE(review): presumably gzip and bzip correspond to the "-lz" and "-lbz2"
// libraries pushed in output_file_qpn::external_libs -- confirm.
1781 enum output_file_type_enum {regular, gzip, bzip};
// Query plan node for a file output operator attached to a source operator.
// It applies no transform (makes_transform() returns false) and reports no
// WHERE or filter predicates.
1783 class output_file_qpn: public qp_node{
1785 std::string source_op_name; // Source table
1786 std::vector<field_entry *> fields;
1787 ospec_str *output_spec;
1788 vector<tablevar_t *> fm;
1789 std::string hfta_query_name;
1790 std::string filestream_id;
// The colon-separated words of output_spec->operator_param, filled in by the
// constructor.
1792 std::vector<std::string> params;
1794 output_file_type_enum compression_type;
1796 int n_streams; // Number of output streams
1797 int n_hfta_clones; // number of hfta clones
1798 int parallel_idx; // which clone this produces output for.
1799 std::vector<int> hash_flds; // fields used to hash the output.
1801 std::string node_type(){return("output_file_qpn"); };
1802 bool makes_transform(){return false;};
// The linker flags depend on compression_type.
1803 std::vector<std::string> external_libs(){
1804 std::vector<std::string> ret;
1805 switch(compression_type){
1807 ret.push_back("-lz");
1810 ret.push_back("-lbz2");
// Not supported on this node type: reports an error naming the query.
1818 void append_to_where(cnf_elem *c){
1819 fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n", node_name.c_str());
// Nothing to bind: the body is empty.
1825 void bind_to_schema(table_list *Schema){}
1826 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1831 std::string to_query_string(){return "// output_file_operator \n";}
1832 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1833 std::string generate_functor_name();
1834 std::string generate_operator(int i, std::string params);
// The operator header to include depends on compression_type.
1835 std::string get_include_file(){
1836 switch(compression_type){
1838 return("#include <zfile_output_operator.h>\n");
1840 return("#include <file_output_operator.h>\n");
1842 return("#include <file_output_operator.h>\n");
1845 std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
1846 std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
// Returns a newly allocated, empty literal table on every call.
1847 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
1848 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
// Builds a new table_def named after this node from the stored field entries.
// Each call allocates a new field_entry_list and a new table_def.
1850 table_def *get_fields(){
1851 field_entry_list *fel = new field_entry_list();
1853 for(i=0;i<fields.size();++i)
1854 fel->append_field(fields[i]);
1855 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1858 // TODO! either bypass the output operator in stream_query,
1859 // or propagate key information when the output operator is constructed.
1860 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1861 std::vector<string> ret;
1865 std::vector<tablevar_t *> get_input_tbls();
1866 std::vector<tablevar_t *> get_output_tbls();
// Never split: returns this node alone and sets hfta_returned to true.
1868 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
1869 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
1871 std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
1872 std::vector<table_exp_t *> ret; return ret;
1874 // Ensure that any refs to interface params have been split away.
1875 int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
1876 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
// Builds the output node for source operator src_op. The node is named
// src_op + "_output", its fields come from src_tbl_def, and its single FROM
// entry is the source operator. Compression defaults to regular and becomes
// gzip when the spec's operator_type is "zfile".
1879 output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
1880 source_op_name = src_op;
1881 node_name = source_op_name + "_output";
1882 filestream_id = fs_id;
1883 fields = src_tbl_def->get_fields();
1884 output_spec = ospec;
1885 fm.push_back(new tablevar_t(source_op_name.c_str()));
1886 hfta_query_name = qn;
1890 compression_type = regular;
1891 if(ospec->operator_type == "zfile")
1892 compression_type = gzip;
// NOTE(review): strncpy does not null-terminate buf when operator_param has
// 1000 or more characters -- confirm buf is terminated before it is split.
1899 strncpy(buf, output_spec->operator_param.c_str(),1000);
// Split the operator parameter string on ':' and keep each word in params.
1902 int nwords = split_string(buf, ':', words,100);
1904 for(i=0;i<nwords;i++){
1905 params.push_back(words[i]);
1907 for(i=0;i<params.size();i++){
1908 if(params[i] == "gzip")
1913 // Set output splitting parameters
// split_flds is a ':'-separated list of field names. The index of each name
// found in `fields` is appended to hash_flds; names that are not found are
// collected and reported through err_report.
1914 bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
1919 if(split_flds != ""){
1920 string err_flds = "";
// NOTE(review): tmpstr is allocated by strdup -- confirm it is freed once the
// words have been consumed.
1921 char *tmpstr = strdup(split_flds.c_str());
1923 int nwords = split_string(tmpstr,':',words,100);
1925 for(i=0;i<nwords;++i){
1926 string target = words[i];
1927 for(j=0;j<fields.size();++j){
1928 if(fields[j]->get_name() == target){
1929 hash_flds.push_back(j);
// j reaching fields.size() means the name matched no output field.
1933 if(j==fields.size()){
1934 err_flds += " "+target;
1938 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
1945 // the following method is used for distributed query optimization
1946 double get_rate_estimate(){return 1.0;}
// Builds the copy through the regular constructor, from this node's source
// operator name, query name, filestream id, fields and output spec.
// NOTE(review): this->get_fields() allocates a new table_def, and the
// constructor only reads its field vector -- confirm who owns that object.
1949 qp_node* make_copy(std::string suffix){
1950 // output_file_qpn *ret = new output_file_qpn();
1951 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
// Nothing to create for an output node: the body is empty.
1955 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
1963 // ---------------------------------------------
1966 // Select, group-by, aggregate, sampling.
1968 // Select SE_1, ..., SE_k
1971 // Group By gb1, ..., gb_n
1972 // [Subgroup gb_i1, .., gb_ik]
1973 // Cleaning_when predicate
1974 // Cleaning_by predicate
1977 // For now, must have group-by variables and aggregates.
1978 // The scalar expressions which are output must be a function
1979 // of the group-by variables and the aggregates.
1980 // The group-by variables can be references to columns of T,
1981 // or they can be scalar expressions.
1982 class sgahcwcb_qpn: public qp_node{
// Node state. The constructor fills these from a query_summary_class.
1984 tablevar_t *table_name; // source table
1985 std::vector<cnf_elem *> where; // selection predicate
1986 std::vector<cnf_elem *> having; // post-aggregation predicate
1987 std::vector<select_element *> select_list; // se's of output
1988 gb_table gb_tbl; // Table of all group-by attributes.
1989 std::set<int> sg_tbl; // Names of the superGB attributes
1990 aggregate_table aggr_tbl; // Table of all referenced aggregates.
1991 std::set<std::string> states_refd; // states ref'd by stateful fcns.
// Predicates of the Cleaning_by and Cleaning_when clauses; the constructor
// takes them from qs->cb_cnf and qs->cw_cnf.
1992 std::vector<cnf_elem *> cleanby;
1993 std::vector<cnf_elem *> cleanwhen;
1995 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
// Returns a copy of the gb_sources vector.
1997 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
// Type tag of this node.
1999 std::string node_type(){return("sgahcwcb_qpn"); };
2000 bool makes_transform(){return true;};
2001 std::vector<std::string> external_libs(){
2002 std::vector<std::string> ret;
// Declaration only; defined elsewhere.
2006 void bind_to_schema(table_list *Schema);
// Not supported on this node type: reports an internal error on stderr.
2007 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2008 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
// Query-text and code-generation entry points. All but get_include_file are
// declarations only, defined elsewhere.
2012 std::string to_query_string();
2013 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2014 std::string generate_functor_name();
2016 std::string generate_operator(int i, std::string params);
// Include line for the operator header used by this node type.
2017 std::string get_include_file(){return("#include <clean_operator.h>\n");};
// Returns a copy of the select-list vector.
2019 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the scalar expression of every select-list element, in order.
2020 std::vector<scalarexp_t *> get_select_se_list(){
2021 std::vector<scalarexp_t *> ret;
2023 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
// For this node the WHERE clause and the filter clause are the same vector.
2026 std::vector<cnf_elem *> get_where_clause(){return where;};
2027 std::vector<cnf_elem *> get_filter_clause(){return where;};
2028 std::vector<cnf_elem *> get_having_clause(){return having;};
// These two return pointers to the node's own tables, not copies.
2029 gb_table *get_gb_tbl(){return &gb_tbl;};
2030 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
// Declarations only; defined elsewhere.
2031 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2032 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2034 // table which represents output tuple.
// Declaration only; defined elsewhere.
2035 table_def *get_fields();
2036 // TODO Key extraction should be feasible but I'll defer the issue.
2037 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2038 std::vector<string> ret;
// Declarations only; defined elsewhere.
2042 std::vector<tablevar_t *> get_input_tbls();
2043 std::vector<tablevar_t *> get_output_tbls();
// NOTE(review): presumably appends c to `where` -- confirm against the body.
2045 void append_to_where(cnf_elem *c){
// Builds the node from a query summary. It takes the single source table,
// the select list and the having/cleaning predicates, rebuilds the group-by
// and aggregate tables entry by entry, then copies the supergroup set, the
// referenced states and the parameter table.
2052 sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
2053 // Get the table name.
2054 // NOTE the colrefs have the tablevar ref (an int)
2055 // embedded in them. Would it make sense
2056 // to grab the whole table list?
2057 tablevar_list_t *fm = qs->fta_tree->get_from();
2058 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
// This node type is defined over exactly one table; otherwise an internal
// error message is formatted.
2059 if(tbl_vec.size() != 1){
2061 sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
2065 table_name = (tbl_vec[0]);
2067 // Get the select list.
2068 select_list = qs->fta_tree->get_sl_vec();
2070 // Get the selection and having predicates.
2072 having = qs->hav_cnf;
2073 cleanby = qs->cb_cnf;
2074 cleanwhen = qs->cw_cnf;
2076 // Build a new GB var table (don't share, might need to modify)
2078 for(g=0;g<qs->gb_tbl->size();g++){
2079 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
2080 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
2081 qs->gb_tbl->get_reftype(g)
2085 // Build a new aggregate table. (don't share, might need
2088 for(a=0;a<qs->aggr_tbl->size();a++){
2090 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
2091 qs->aggr_tbl->duplicate(a)
2095 sg_tbl = qs->sg_tbl;
2096 states_refd = qs->states_refd;
2099 // Get the parameters
2100 param_tbl = qs->param_tbl;
// Declaration only. Returns a vector of qp_node pointers by value.
// hfta_returned is an int passed by non-const reference, so the callee can
// report a value back through it; the remaining arguments are passed by
// value or as pointers.
2106 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
// Declaration only (virtual). Returns a vector of table_exp_t pointers by
// value; qnodes and opviews are passed by non-const reference, so the callee
// may modify them.
2107 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
// Any refs to interface params must have been split away before reaching
// this node; they are CURRENTLY not allowed by split_node_for_fta.
// Consequently the count is always 0 and ifpnames is left untouched.
int count_ifp_refs(std::set<std::string> &ifpnames){
	(void)ifpnames;	// intentionally unused
	const int n_refs = 0;
	return n_refs;
}
2111 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
2113 // The following method is used for distributed query optimization.
// Declaration only; takes no arguments and returns a double.
2114 double get_rate_estimate();
// Creates a new sgahcwcb_qpn whose node_name is this node's name followed by
// suffix.
//   - param_tbl is rebuilt entry by entry in a newly allocated param_table,
//     duplicating each parameter's data_type.
//   - definitions, having, select_list, gb_tbl, aggr_tbl, sg_tbl, states_refd,
//     cleanby and cleanwhen are assigned across directly.
// NOTE(review): the declaration of pi, closing braces and the return
// statement are not visible in this excerpt. The node is allocated with a raw
// `new`, so presumably the caller takes ownership of the result - confirm.
2116 qp_node* make_copy(std::string suffix){
2117 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
// Give the copy its own parameter table instead of sharing this node's one.
2119 ret->param_tbl = new param_table();
2120 std::vector<std::string> param_names = param_tbl->get_param_names();
// Re-register every parameter under the same name, with a duplicated
// data_type and the same handle_access value.
2122 for(pi=0;pi<param_names.size();pi++){
2123 data_type *dt = param_tbl->get_data_type(param_names[pi]);
2124 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2125 param_tbl->handle_access(param_names[pi]));
2127 ret->definitions = definitions;
2129 ret->node_name = node_name + suffix;
2131 // make shallow copy of all fields
2133 ret->having = having;
2134 ret->select_list = select_list;
2135 ret->gb_tbl = gb_tbl;
2136 ret->aggr_tbl = aggr_tbl;
2137 ret->sg_tbl = sg_tbl;
2138 ret->states_refd = states_refd;
2139 ret->cleanby = cleanby;
2140 ret->cleanwhen = cleanwhen;
// Declaration only. Takes a vector of qp_node pointers (by value) and the
// schema; returns nothing.
2145 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
// Declaration only. Takes a query summary and the schema; returns a vector
// of qp_node pointers by value.
2149 std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);
// Declaration only. s is passed by non-const reference, so the callee may
// modify it in place.
2153 void untaboo(string &s);
// Declaration only. Takes tname by value and select_list by non-const
// reference; returns a table_def pointer.
// NOTE(review): ownership of the returned table_def is not visible here.
2155 table_def *create_attributes(string tname, vector<select_element *> &select_list);