1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
15 #ifndef __QUERY_PLAN_H__
16 #define __QUERY_PLAN_H__
23 #include"analyze_fta.h"
25 #include"parse_partn.h"
26 #include"generate_utils.h"
28 // Identify the format of the input, output streams.
29 #define UNKNOWNFORMAT 0
33 ///////////////////////////////////////////////////
34 // representation of an output operator specification
39 string operator_param;
40 string output_directory;
42 string partitioning_flds;
47 ////////////////////////////////////////////////////
48 // Input representation of a query
52 std::set<int> reads_from;
53 std::set<int> sources_to;
54 std::vector<std::string> refd_tbls;
55 std::vector<var_pair_t *> params;
58 std::string mangler; // for UDOPs
60 table_exp_t *parse_tree;
63 bool is_externally_visible;
64 bool inferred_visible_node;
66 set<int> subtree_roots;
73 is_externally_visible = false;
74 inferred_visible_node = false;
77 query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
85 is_externally_visible = pt->get_visible();
86 inferred_visible_node = false;
89 tablevar_list_t *fm = parse_tree->get_from();
91 refd_tbls = fm->get_table_names();
94 params = pt->query_params;
96 query_node(int ix, std::string udop_name,table_list *Schema){
104 is_externally_visible = true;
105 inferred_visible_node = false;
108 int sid = Schema->find_tbl(udop_name);
109 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
111 for(i=0;i<subq.size();++i){
112 refd_tbls.push_back(subq[i]->name);
119 std::string source_name;
120 std::vector<int> query_node_indices;
121 std::set<int> reads_from;
122 std::set<int> sources_to;
124 bool inferred_visible_node;
127 bool do_generation; // false means, ignore it.
131 inferred_visible_node = false;
134 do_generation = true;
146 // The following selectivity estimates are used by our primitive rate estimators.
147 #define SPX_SELECTIVITY 1.0 // spx_qpn: select/project/transform
148 #define SGAH_SELECTIVITY 0.1 // sgah_qpn: select/group-by/aggregate
149 #define RSGAH_SELECTIVITY 0.1 // rsgah_qpn: group-by with running aggregates
150 #define SGAHCWCB_SELECTIVITY 0.1 // sgahcwcb_qpn: the sampling operator
151 #define MRG_SELECTIVITY 1.0 // mrg_qpn: temporal merge
152 #define JOIN_EQ_HASH_SELECTIVITY 1.0 // presumably the hash equi-join node; its class is not visible here
154 // If the output rate of the interface is not given, we are going to use
155 // this default value.
156 #define DEFAULT_INTERFACE_RATE 100 // NOTE(review): units are not stated here (tuples/sec?) -- confirm
159 // Define query plan nodes
160 // These nodes are intended for query modeling
161 // and transformation rather than for code generation.
164 // Query plan node base class.
165 // It has an ID, can return its type,
166 // and can be linked into lists with the predecessors
168 // To add : serialize, unserialize?
173 std::vector<int> predecessors; // values recorded by add_predecessor(); presumably ids of upstream nodes
174 std::vector<int> successors; // values recorded by add_successor(); presumably ids of downstream nodes
175 std::string node_name; // set via set_node_name(); make_copy() gives copies node_name + suffix
177 // For error reporting without exiting the program (read back via get_error_code / get_error_str).
181 // These should be moved to the containing stream_query object.
182 std::map<std::string, std::string> definitions; // name -> value settings, see get_val_of_def / set_definition
183 param_table *param_tbl; // query parameters: either allocated by the node or taken from query_summary_class::param_tbl
185 // The value of a field in terms of protocol fields (if any).
186 std::map<std::string, scalarexp_t *> protocol_map; // field name -> SE over protocol fields, built by create_protocol_se
191 param_tbl = new param_table();
196 param_tbl = new param_table();
199 int get_id(){return(id);}; // node id accessor
200 void set_id(int i){id = i; };
202 int get_error_code(){return error_code;}; // NOTE(review): presumably nonzero after a construction/analysis failure -- confirm
203 std::string get_error_str(){return err_str;}; // accumulated error text (constructors append with +=)
205 virtual std::string node_type() = 0; // class tag, e.g. "spx_qpn", "sgah_qpn"
207 // For code generation: does the operator transform (xform) its input?
208 virtual bool makes_transform() = 0;
210 // For linking: which external libraries does the operator depend on?
211 virtual std::vector<std::string> external_libs() = 0;
213 void set_node_name(std::string n){node_name = n;};
214 std::string get_node_name(){return node_name;};
216 void set_definitions(std::map<std::string, std::string> &def){
219 std::map<std::string, std::string> get_definitions(){return definitions;}; // returns a copy of the map
222 // Create the mapping from field name to SE in terms of protocol fields.
223 // Pass in the qp_nodes of the data sources, in order.
224 virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
225 // Get the protocol map: returns a pointer to this node's protocol_map member (not a copy).
226 std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
228 // Each qp node must be able to return a description
229 // of the tuples it creates.
230 // TODO: the get_output_tbls method should subsume the get_fields
231 // method, but in fact it really just returns the
233 virtual table_def *get_fields() = 0; // Should be vector?
234 // Get keys from the operator. Currently, only works on group-by queries.
235 // NOTE(review): partial_keys is a vector<string>, not a flag; presumably it is filled when the key list is suspected to be partial -- confirm.
236 virtual std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys) = 0;
237 // Get the from clause.
238 virtual std::vector<tablevar_t *> get_input_tbls() = 0;
239 // This is a confused function: it actually returns the output
241 virtual std::vector<tablevar_t *> get_output_tbls() = 0;
243 std::string get_val_of_def(std::string def){
244 if(definitions.count(def) > 0) return definitions[def];
247 void set_definition(std::string def, std::string val){
248 definitions[def]=val;
251 // Associate colrefs in SEs with tables
252 // at code generation time.
253 virtual void bind_to_schema(table_list *Schema) = 0;
255 // Get colrefs of the operator, currently only meaningful for lfta
256 // operators, and only interested in colrefs with extraction fcns.
257 virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
259 virtual std::string to_query_string() = 0; // render the node back as query text
260 virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0; // returns generated functor code
261 virtual std::string generate_functor_name() = 0;
263 virtual std::string generate_operator(int i, std::string params) = 0;
264 virtual std::string get_include_file() = 0; // #include line for the operator template, e.g. <selection_operator.h>
266 virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
267 virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
269 // Split this node into LFTA and HFTA nodes.
270 // Four possible outcomes:
271 // 1) the qp_node reads from a protocol, but does not need to
272 // split (can be evaluated as an LFTA).
273 // The lfta node is the only element in the return vector,
274 // and hfta_returned is false.
275 // 2) the qp_node reads from no protocol, and therefore cannot be split.
276 // The hfta node is the only element in the return vector,
277 // and hfta_returned is true.
278 // 3) reads from at least one protocol, but cannot be split : failure.
279 // return vector is empty, the error conditions are written
281 // 4) The qp_node splits into an hfta node and one or more LFTA nodes.
282 // the return vector has two or more elements, and hfta_returned
283 // is true. The last element is the HFTA.
284 virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0; // hfta_returned is an int used as a true/false flag
287 // Ensure that any refs to interface params have been split away.
288 virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
292 // Tag the data sources which are views,
293 // return the (optimized) source queries and
294 // record the view access in opview_set.
295 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
297 param_table *get_param_tbl(){return param_tbl;}; // returns the member pointer itself, not a copy
299 // The "where" clause is a pre-filter
300 virtual std::vector<cnf_elem *> get_where_clause() = 0;
301 // To be more explicit, use get_filter_preds, this is used to compute the prefilter -- NOTE(review): get_filter_preds is not declared in the visible part of this file; presumably get_filter_clause (below) is meant
302 virtual std::vector<cnf_elem *> get_filter_clause() = 0;
304 // Add an extra predicate. Currently only used for LFTAs.
305 virtual void append_to_where(cnf_elem *c) = 0;
307 void add_predecessor(int i){predecessors.push_back(i);}; // appends; duplicates are not checked
308 void remove_predecessor(int i){
309 std::vector<int>::iterator vi;
310 for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
312 predecessors.erase(vi);
317 void add_successor(int i){successors.push_back(i);}; // appends; duplicates are not checked
318 std::vector<int> get_predecessors(){return predecessors;}; // returns a copy
319 int n_predecessors(){return predecessors.size();}; // size_t narrowed to int
320 std::vector<int> get_successors(){return successors;}; // returns a copy
321 int n_successors(){return successors.size();}; // size_t narrowed to int
322 void clear_predecessors(){predecessors.clear();};
323 void clear_successors(){successors.clear();};
325 // the following method is used for distributed query optimization
326 double get_rate_estimate(); // NOTE(review): not virtual -- subclasses declare their own get_rate_estimate(), which hides this one; calls through a qp_node* bind here
329 // used for cloning query nodes; suffix is appended to the copy's node name
330 virtual qp_node* make_copy(std::string suffix) = 0;
335 // Select, project, transform (xform) query plan node.
336 // represent the following query fragment
337 // select scalar_expression_1, ..., scalar_expression_k
341 // the predicates and the scalar expressions can reference
342 // attributes of S and also functions.
343 class spx_qpn: public qp_node{
345 tablevar_t *table_name; // Source table (the constructor requires exactly one FROM-clause table)
346 std::vector<cnf_elem *> where; // selection predicate
347 std::vector<select_element *> select_list; // Select list
351 std::string node_type(){return("spx_qpn"); }; // class tag
352 bool makes_transform(){return true;}; // spx always transforms its input
353 std::vector<std::string> external_libs(){
354 std::vector<std::string> ret;
358 void append_to_where(cnf_elem *c){
363 void bind_to_schema(table_list *Schema); // qp_node overrides; bodies are defined outside this header
364 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
366 std::string to_query_string();
367 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
368 std::string generate_functor_name();
369 std::string generate_operator(int i, std::string params);
370 std::string get_include_file(){return("#include <selection_operator.h>\n");}; // spx uses the selection operator template
372 std::vector<select_element *> get_select_list(){return select_list;}; // copy of the list; select_element pointers are shared
373 std::vector<scalarexp_t *> get_select_se_list(){
374 std::vector<scalarexp_t *> ret;
376 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
379 std::vector<cnf_elem *> get_where_clause(){return where;}; // for spx the where and filter clauses are the same list
380 std::vector<cnf_elem *> get_filter_clause(){return where;};
381 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
382 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
384 table_def *get_fields(); // description of the output tuple
385 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
386 std::vector<string> ret;
390 std::vector<tablevar_t *> get_input_tbls(); // FROM-clause table(s)
391 std::vector<tablevar_t *> get_output_tbls();
393 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
394 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
395 // Ensure that any refs to interface params have been split away.
396 int count_ifp_refs(std::set<std::string> &ifpnames);
397 int resolve_if_params(ifq_t *ifdb, std::string &err); // err presumably receives a message on failure -- confirm
401 spx_qpn(query_summary_class *qs,table_list *Schema){
402 // Get the table name.
403 // NOTE the colrefs have the table ref (an int)
404 // embedded in them. Would it make sense
405 // to grab the whole table list?
406 tablevar_list_t *fm = qs->fta_tree->get_from();
408 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
409 if(tbl_vec.size() != 1){
411 sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
415 table_name = (tbl_vec[0]);
417 int t = tbl_vec[0]->get_schema_ref();
418 if(! Schema->is_stream(t)){
419 err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
423 // Get the select list.
424 select_list = qs->fta_tree->get_sl_vec();
426 // Get the selection predicate.
430 // Get the parameters
431 param_tbl = qs->param_tbl;
437 // the following method is used for distributed query optimization
438 double get_rate_estimate(); // hides (does not override) the non-virtual qp_node::get_rate_estimate
441 qp_node* make_copy(std::string suffix){
442 spx_qpn *ret = new spx_qpn();
444 ret->param_tbl = new param_table();
445 std::vector<std::string> param_names = param_tbl->get_param_names();
447 for(pi=0;pi<param_names.size();pi++){
448 data_type *dt = param_tbl->get_data_type(param_names[pi]);
449 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
450 param_tbl->handle_access(param_names[pi]));
452 ret->definitions = definitions;
453 ret->node_name = node_name + suffix;
455 // make shallow copy of all fields
457 ret->select_list = select_list;
461 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema); // unqualified `vector` presumably relies on a using-directive pulled in by the includes
467 // Select, group-by, aggregate.
469 // Select SE_1, ..., SE_k
472 // Group By gb1, ..., gb_n
475 // NOTE : the sampling operator is sgahcwcb_qpn.
477 // For now, must have group-by variables and aggregates.
478 // The scalar expressions which are output must be a function
479 // of the group-by variables and the aggregates.
480 // The group-by variables can be references to columns of T,
481 // or they can be scalar expressions.
482 class sgah_qpn: public qp_node{
484 tablevar_t *table_name; // source table
485 std::vector<cnf_elem *> where; // selection predicate
486 std::vector<cnf_elem *> having; // post-aggregation predicate
487 std::vector<select_element *> select_list; // se's of output
488 gb_table gb_tbl; // Table of all group-by attributes.
489 aggregate_table aggr_tbl; // Table of all referenced aggregates.
491 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
493 int lfta_disorder; // maximum disorder in the stream between lfta, hfta
494 int hfta_disorder; // maximum disorder in the hfta (selects the operator header in get_include_file)
496 // rollup, cube, and grouping_sets cannot be readily reconstructed by
497 // analyzing the patterns, so explicitly record them here.
498 // used only so that to_query_string produces something meaningful.
499 std::vector<std::string> gb_entry_type;
500 std::vector<int> gb_entry_count;
502 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;} // returns a copy; SE pointers are shared
504 std::string node_type(){return("sgah_qpn"); }; // class tag
505 bool makes_transform(){return true;};
506 std::vector<std::string> external_libs(){
507 std::vector<std::string> ret;
511 void bind_to_schema(table_list *Schema); // qp_node overrides; bodies are defined outside this header
512 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
514 std::string to_query_string();
515 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
516 std::string generate_functor_name();
518 std::string generate_operator(int i, std::string params);
519 std::string get_include_file(){
520 if(hfta_disorder <= 1){
521 return("#include <groupby_operator.h>\n");
523 return("#include <groupby_operator_oop.h>\n");
527 std::vector<select_element *> get_select_list(){return select_list;}; // copy of the list; select_element pointers are shared
528 std::vector<scalarexp_t *> get_select_se_list(){
529 std::vector<scalarexp_t *> ret;
531 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
534 std::vector<cnf_elem *> get_where_clause(){return where;}; // pre-aggregation selection predicate (having holds the post-aggregation one)
536 void append_to_where(cnf_elem *c){
540 std::vector<cnf_elem *> get_filter_clause(){return where;}; // same list as get_where_clause
541 std::vector<cnf_elem *> get_having_clause(){return having;};
542 gb_table *get_gb_tbl(){return &gb_tbl;}; // pointer to the member table (mutable by the caller)
543 aggregate_table *get_aggr_tbl(){return &aggr_tbl;}; // pointer to the member table (mutable by the caller)
544 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
545 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
547 // table which represents output tuple.
548 table_def *get_fields();
549 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
550 std::vector<tablevar_t *> get_input_tbls();
551 std::vector<tablevar_t *> get_output_tbls();
558 sgah_qpn(query_summary_class *qs,table_list *Schema){
562 // Get the table name.
563 // NOTE the colrefs have the tablevar ref (an int)
564 // embedded in them. Would it make sense
565 // to grab the whole table list?
566 tablevar_list_t *fm = qs->fta_tree->get_from();
567 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
568 if(tbl_vec.size() != 1){
570 sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
574 table_name = (tbl_vec[0]);
576 int t = tbl_vec[0]->get_schema_ref();
577 if(! Schema->is_stream(t)){
578 err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
583 // Get the select list.
584 select_list = qs->fta_tree->get_sl_vec();
586 // Get the selection and having predicates.
588 having = qs->hav_cnf;
590 // Build a new GB var table (don't share, might need to modify)
592 for(g=0;g<qs->gb_tbl->size();g++){
593 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
594 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
595 qs->gb_tbl->get_reftype(g)
598 gb_tbl.set_pattern_info(qs->gb_tbl);
599 // gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
600 // gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
601 // gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
603 // Build a new aggregate table. (don't share, might need
606 for(a=0;a<qs->aggr_tbl->size();a++){
608 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
609 qs->aggr_tbl->duplicate(a)
614 // Get the parameters
615 param_tbl = qs->param_tbl;
621 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
622 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
623 // Ensure that any refs to interface params have been split away.
624 int count_ifp_refs(std::set<std::string> &ifpnames);
625 int resolve_if_params(ifq_t *ifdb, std::string &err);
627 // the following method is used for distributed query optimization
628 double get_rate_estimate(); // hides (does not override) the non-virtual qp_node::get_rate_estimate
631 qp_node* make_copy(std::string suffix){
632 sgah_qpn *ret = new sgah_qpn();
634 ret->param_tbl = new param_table();
635 std::vector<std::string> param_names = param_tbl->get_param_names();
637 for(pi=0;pi<param_names.size();pi++){
638 data_type *dt = param_tbl->get_data_type(param_names[pi]);
639 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
640 param_tbl->handle_access(param_names[pi]));
642 ret->definitions = definitions;
644 ret->node_name = node_name + suffix;
646 // make shallow copy of all fields
648 ret->having = having;
649 ret->select_list = select_list;
650 ret->gb_tbl = gb_tbl;
651 ret->aggr_tbl = aggr_tbl;
656 // Split aggregation into two HFTA components - sub and superaggregation
657 // If unable to split the aggregates, split into selection and aggregation
658 // If resulting low-level query is empty (e.g. when aggregates cannot be split and
659 // where clause is empty) an empty vector will be returned
660 virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
662 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
669 // Select, group-by, aggregate. with running aggregates
671 // Select SE_1, ..., SE_k
674 // Group By gb1, ..., gb_n
675 // Closing When predicate
678 // NOTE : the sampling operator is sgahcwcb_qpn.
680 // For now, must have group-by variables and aggregates.
681 // The scalar expressions which are output must be a function
682 // of the group-by variables and the aggregates.
683 // The group-by variables can be references to columns of T,
684 // or they can be scalar expressions.
685 class rsgah_qpn: public qp_node{
687 tablevar_t *table_name; // source table
688 std::vector<cnf_elem *> where; // selection predicate
689 std::vector<cnf_elem *> having; // post-aggregation predicate
690 std::vector<cnf_elem *> closing_when; // group closing predicate
691 std::vector<select_element *> select_list; // se's of output
692 gb_table gb_tbl; // Table of all group-by attributes.
693 aggregate_table aggr_tbl; // Table of all referenced aggregates.
695 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
697 int lfta_disorder; // maximum disorder allowed in stream between lfta, hfta
698 int hfta_disorder; // maximum disorder allowed in hfta
700 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;} // returns a copy; SE pointers are shared
703 std::string node_type(){return("rsgah_qpn"); }; // class tag
704 bool makes_transform(){return true;};
705 std::vector<std::string> external_libs(){
706 std::vector<std::string> ret;
710 void bind_to_schema(table_list *Schema); // qp_node override; body is defined outside this header
711 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
712 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
716 std::string to_query_string();
717 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
718 std::string generate_functor_name();
720 std::string generate_operator(int i, std::string params);
721 std::string get_include_file(){return("#include <running_gb_operator.h>\n");}; // running-aggregate operator template
723 std::vector<select_element *> get_select_list(){return select_list;}; // copy of the list; select_element pointers are shared
724 std::vector<scalarexp_t *> get_select_se_list(){
725 std::vector<scalarexp_t *> ret;
727 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
730 std::vector<cnf_elem *> get_where_clause(){return where;}; // pre-aggregation selection predicate (having holds the post-aggregation one)
731 void append_to_where(cnf_elem *c){
735 std::vector<cnf_elem *> get_filter_clause(){return where;}; // same list as get_where_clause
736 std::vector<cnf_elem *> get_having_clause(){return having;};
737 std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;}; // group closing predicate
738 gb_table *get_gb_tbl(){return &gb_tbl;}; // pointer to the member table (mutable by the caller)
739 aggregate_table *get_aggr_tbl(){return &aggr_tbl;}; // pointer to the member table (mutable by the caller)
740 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
741 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
743 // table which represents output tuple.
744 table_def *get_fields();
745 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
747 std::vector<tablevar_t *> get_input_tbls();
748 std::vector<tablevar_t *> get_output_tbls();
755 rsgah_qpn(query_summary_class *qs,table_list *Schema){
759 // Get the table name.
760 // NOTE the colrefs have the tablevar ref (an int)
761 // embedded in them. Would it make sense
762 // to grab the whole table list?
763 tablevar_list_t *fm = qs->fta_tree->get_from();
764 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
765 if(tbl_vec.size() != 1){
767 sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
771 table_name = (tbl_vec[0]);
773 int t = tbl_vec[0]->get_schema_ref();
774 if(! Schema->is_stream(t)){
775 err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
779 // Get the select list.
780 select_list = qs->fta_tree->get_sl_vec();
782 // Get the selection and having predicates.
784 having = qs->hav_cnf;
785 closing_when = qs->closew_cnf;
787 // Build a new GB var table (don't share, might need to modify)
789 for(g=0;g<qs->gb_tbl->size();g++){
790 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
791 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
792 qs->gb_tbl->get_reftype(g)
796 // Build a new aggregate table. (don't share, might need
799 for(a=0;a<qs->aggr_tbl->size();a++){
801 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
802 qs->aggr_tbl->duplicate(a)
807 // Get the parameters
808 param_tbl = qs->param_tbl;
814 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
815 std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
816 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
817 // Ensure that any refs to interface params have been split away.
818 int count_ifp_refs(std::set<std::string> &ifpnames);
819 int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;} // stub: always returns 0 and leaves err untouched
821 // the following method is used for distributed query optimization
822 double get_rate_estimate(); // hides (does not override) the non-virtual qp_node::get_rate_estimate
824 qp_node* make_copy(std::string suffix){
825 rsgah_qpn *ret = new rsgah_qpn();
827 ret->param_tbl = new param_table();
828 std::vector<std::string> param_names = param_tbl->get_param_names();
830 for(pi=0;pi<param_names.size();pi++){
831 data_type *dt = param_tbl->get_data_type(param_names[pi]);
832 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
833 param_tbl->handle_access(param_names[pi]));
835 ret->definitions = definitions;
837 ret->node_name = node_name + suffix;
839 // make shallow copy of all fields
841 ret->having = having;
842 ret->closing_when = closing_when;
843 ret->select_list = select_list;
844 ret->gb_tbl = gb_tbl;
845 ret->aggr_tbl = aggr_tbl;
849 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema); // builds protocol_map from the source nodes
854 // Watchlist - from a table read from an external source.
856 class watch_tbl_qpn: public qp_node{
858 table_def *table_layout; // the output schema
859 std::vector<std::string> key_flds; // names of fields whose modifier list contains key/Key/KEY
861 // Parameters related to loading the table
862 std::string filename; // from the "filename" definition
863 int refresh_interval; // from the "refresh_interval" definition; the constructor reports an error if <= 0
866 void append_to_where(cnf_elem *c){
867 fprintf(stderr, "ERROR, append_to_where called on watch_tbl_qpn, not supported, query %s.\n", node_name.c_str());
871 std::string node_type(){return("watch_tbl_qpn"); }; // class tag
872 bool makes_transform(){return false;}; // the watchlist table does not transform an input stream
873 std::vector<std::string> external_libs(){
874 std::vector<std::string> ret;
878 void bind_to_schema(table_list *Schema){} // no-op for watchlist tables
879 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
884 std::string to_query_string(); // qp_node overrides; bodies are defined outside this header
885 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
886 std::string generate_functor_name();
887 std::string generate_operator(int i, std::string params);
888 std::string get_include_file(){
889 return("#include <watchlist_tbl.h>\n");
892 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
893 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
895 table_def *get_fields(); // output schema; presumably backed by table_layout -- confirm
896 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
900 std::vector<tablevar_t *> get_input_tbls();
901 std::vector<tablevar_t *> get_output_tbls();
903 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
904 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
905 // Ensure that any refs to interface params have been split away.
906 int count_ifp_refs(std::set<std::string> &ifpnames);
908 // No predicates, return an empty clause
909 std::vector<cnf_elem *> get_where_clause(){
910 std::vector<cnf_elem *> t;
913 std::vector<cnf_elem *> get_filter_clause(){
914 return get_where_clause();
920 watch_tbl_qpn(query_summary_class *qs,table_list *Schema){
921 node_name=qs->query_name;
922 param_tbl = qs->param_tbl;
923 definitions = qs->definitions;
926 // Populate the schema
927 table_layout = new table_def(
928 node_name.c_str(), NULL, NULL, qs->fta_tree->fel, WATCHLIST_SCHEMA
932 std::vector<field_entry *> flds = qs->fta_tree->fel->get_list();
933 for(int f=0;f<flds.size();++f){
934 if(flds[f]->get_modifier_list()->contains_key("key") ||
935 flds[f]->get_modifier_list()->contains_key("Key") ||
936 flds[f]->get_modifier_list()->contains_key("KEY") ){
937 key_flds.push_back(flds[f]->get_name());
940 if(key_flds.size()==0){
941 fprintf(stderr,"Error, no key fields defined for table watchlist %s\n",node_name.c_str());
945 table_layout->set_keys(key_flds); // communicate keys to consumers
947 // Get loading parameters
948 if(definitions.count("filename")>0){
949 filename = definitions["filename"];
951 fprintf(stderr, "Error, no filename for source data defined for table watchlist %s\n",node_name.c_str());
954 if(definitions.count("refresh_interval")>0){
955 refresh_interval = atoi(definitions["refresh_interval"].c_str());
956 if(refresh_interval <= 0){
957 fprintf(stderr, "Error, the refresh_interval (%s) of table watchlist %s must be a positive non-zero integer.\n",definitions["refresh_interval"].c_str(), node_name.c_str());
961 fprintf(stderr, "Error, no refresh_interval defined for table watchlist %s\n",node_name.c_str());
967 qp_node *make_copy(std::string suffix){
968 watch_tbl_qpn *ret = new watch_tbl_qpn();
969 ret->filename = filename;
970 ret->refresh_interval = refresh_interval;
971 ret->key_flds = key_flds;
973 ret->param_tbl = new param_table();
974 std::vector<std::string> param_names = param_tbl->get_param_names();
976 for(pi=0;pi<param_names.size();pi++){
977 data_type *dt = param_tbl->get_data_type(param_names[pi]);
978 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
979 param_tbl->handle_access(param_names[pi]));
981 ret->definitions = definitions;
983 ret->node_name = node_name + suffix;
984 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
989 // the following method is used for distributed query optimization
990 double get_rate_estimate(); // hides (does not override) the non-virtual qp_node::get_rate_estimate
992 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1002 // forward references: join node classes declared later (not in this view)
1003 class filter_join_qpn;
1004 class watch_join_qpn;
1007 // (temporal) Merge query plan node.
1008 // represent the following query fragment
1010 // from T1 _t1, T2 _t2
1012 // T1 and T2 must have compatible schemas,
1013 // that is the same types in the same slots.
1014 // c1 and c2 must be colrefs from T1 and T2,
1015 // both ref'ing the same slot. Their types
1016 // must be temporal and the same kind of temporal.
1017 // in the output, no other field is temporal.
1018 // the field names of the output are drawn from T1.
1019 class mrg_qpn: public qp_node{
1021 std::vector<tablevar_t *> fm; // Source tables (one per merged stream)
1022 std::vector<colref_t *> mvars; // the merge-by columns; the constructor checks mvars.size() == fm.size()
1025 table_def *table_layout; // the output schema
1026 int merge_fieldpos; // position of merge field,
1027 // convenience for manipulation.
1029 int disorder; // max disorder seen in the input / allowed in the output
1032 // partition definition for merges that combine streams partitioned over multiple interfaces
1033 partn_def_t* partn_def;
1036 void append_to_where(cnf_elem *c){
1037 fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n", node_name.c_str());
1043 std::string node_type(){return("mrg_qpn"); }; // class tag
1044 bool makes_transform(){return false;}; // merge does not transform its inputs
1045 std::vector<std::string> external_libs(){
1046 std::vector<std::string> ret;
1050 void bind_to_schema(table_list *Schema); // qp_node override; body is defined outside this header
1051 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1052 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
1056 std::string to_query_string(); // qp_node overrides; bodies are defined outside this header
1057 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1058 std::string generate_functor_name();
1059 std::string generate_operator(int i, std::string params);
1060 std::string get_include_file(){
1062 return("#include <merge_operator_oop.h>\n");
1063 return("#include <merge_operator.h>\n");
1066 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1067 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1069 table_def *get_fields(); // output schema; presumably backed by table_layout -- confirm
1070 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1071 std::vector<string> ret;
1075 std::vector<tablevar_t *> get_input_tbls();
1076 std::vector<tablevar_t *> get_output_tbls();
1078 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1079 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1080 // Ensure that any refs to interface params have been split away.
1081 int count_ifp_refs(std::set<std::string> &ifpnames);
1083 // No predicates, return an empty clause
1084 std::vector<cnf_elem *> get_where_clause(){
1085 std::vector<cnf_elem *> t;
1088 std::vector<cnf_elem *> get_filter_clause(){
1089 return get_where_clause();
1096 void set_disorder(int d){
1100 mrg_qpn(query_summary_class *qs,table_list *Schema){
1103 // Grab the elements of the query node.
1104 fm = qs->fta_tree->get_from()->get_table_list();
1109 if(fm.size() != mvars.size()){
1110 fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn. fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
1114 for(int f=0;f<fm.size();++f){
1115 int t=fm[f]->get_schema_ref();
1116 if(! Schema->is_stream(t)){
1117 err_str += "ERROR in query "+node_name+", the source "+fm[f]->get_schema_name()+" is not a stream.\n";
1122 // Get the parameters
1123 param_tbl = qs->param_tbl;
1125 // Need to set the node name now, so that the
1126 // schema (table_layout) can be properly named.
1127 // TODO: Setting the name of the table might best be done
1128 // via the set_node_name method, because presumably
1129 // thats when the node name is really known.
1130 // This should propogate to the table_def table_layout
1131 node_name=qs->query_name;
1135 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
1136 for(ff=0;ff<fm.size();++ff){
1137 printf("%s ",fm[ff]->to_string().c_str());
1143 // Create the output schema.
1144 // strip temporal properites form all fields except the merge field.
1145 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1146 field_entry_list *fel = new field_entry_list();
1148 for(f=0;f<flva.size();++f){
1150 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
1151 if(flva[f]->get_name() == mvars[0]->get_field()){
1153 // if(slack != NULL) dt.reset_temporal();
1155 dt.reset_temporal();
1158 param_list *plist = new param_list();
1159 std::vector<std::string> param_strings = dt.get_param_keys();
1161 for(p=0;p<param_strings.size();++p){
1162 std::string v = dt.get_param_val(param_strings[p]);
1164 plist->append(param_strings[p].c_str(),v.c_str());
1166 plist->append(param_strings[p].c_str());
1171 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
1172 fel->append_field(fe);
1178 table_layout = new table_def(
1179 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1186 /////////////////////////////////////////////
1187 /// Created for de-siloing. to be removed? or is it otherwise useful?
1188 // Merge existing set of sources (de-siloing)
1189 mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
1194 // Construct the fm list
1195 for(f=0;f<src_names.size();++f){
1196 int tbl_ref = Schema->get_table_ref(src_names[f]);
1198 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
1201 table_def *src_tbl = Schema->get_table(tbl_ref);
1202 tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
1203 string range_name = "_t" + int_to_string(f);
1204 fm_t->set_range_var(range_name);
1205 fm_t->set_schema_ref(tbl_ref);
1209 // Create the output schema.
1210 // strip temporal properites form all fields except the merge field.
1211 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1212 field_entry_list *fel = new field_entry_list();
1213 bool temporal_found = false;
1214 for(f=0;f<flva.size();++f){
1216 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
1217 if(dt.is_temporal() && !temporal_found){
1219 temporal_found = true;
1221 dt.reset_temporal();
1224 param_list *plist = new param_list();
1225 std::vector<std::string> param_strings = dt.get_param_keys();
1227 for(p=0;p<param_strings.size();++p){
1228 std::string v = dt.get_param_val(param_strings[p]);
1230 plist->append(param_strings[p].c_str(),v.c_str());
1232 plist->append(param_strings[p].c_str());
1236 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
1237 flva[f]->get_unpack_fcns()
1239 fel->append_field(fe);
1242 if(! temporal_found){
1243 fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
1248 table_layout = new table_def(
1249 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1253 param_tbl = new param_table();
1256 for(f=0;f<fm.size();++f){
1257 std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
1258 data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
1259 flva[merge_fieldpos]->get_modifier_list());
1261 colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
1262 flv_f[merge_fieldpos]->get_name().c_str());
1263 mvars.push_back(mcr);
1266 // literal_t *s_lit = new literal_t("5",LITERAL_INT);
1267 // slack = new scalarexp_t(s_lit);
1272 ////////////////////////////////////////
1274 void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
1277 // Merge filter_join LFTAs.
1279 mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1281 // Merge watch_join LFTAs.
1283 mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1285 // Merge selection LFTAs.
1287 mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
1291 param_tbl = spx->param_tbl;
1294 field_entry_list *fel = new field_entry_list();
1295 merge_fieldpos = -1;
1300 for(i=0;i<spx->select_list.size();++i){
1301 data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
1302 if(dt->is_temporal()){
1303 if(merge_fieldpos < 0){
1306 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
1307 dt->reset_temporal();
1311 field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
1312 fel->append_field(fe);
1315 if(merge_fieldpos<0){
1316 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1319 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1321 // NEED TO HANDLE USER_SPECIFIED SLACK
1322 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
1323 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
1324 // if(this->slack == NULL)
1325 // fprintf(stderr,"Zero slack.\n");
1327 // fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1329 for(i=0;i<sources.size();i++){
1330 std::string rvar = "_m"+int_to_string(i);
1331 mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
1332 mvars[i]->set_tablevar_ref(i);
1333 fm.push_back(new tablevar_t(sources[i].c_str()));
1334 fm[i]->set_range_var(rvar);
1337 param_tbl = new param_table();
1338 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
1340 for(pi=0;pi<param_names.size();pi++){
1341 data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
1342 param_tbl->add_param(param_names[pi],dt->duplicate(),
1343 spx->param_tbl->handle_access(param_names[pi]));
1345 definitions = spx->definitions;
1349 // Merge aggregation LFTAs
1351 mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
1355 param_tbl = sgah->param_tbl;
1358 field_entry_list *fel = new field_entry_list();
1359 merge_fieldpos = -1;
1360 for(i=0;i<sgah->select_list.size();++i){
1361 data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
1362 if(dt->is_temporal()){
1363 if(merge_fieldpos < 0){
1366 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
1367 dt->reset_temporal();
1371 field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
1372 fel->append_field(fe);
1375 if(merge_fieldpos<0){
1376 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1379 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1381 // NEED TO HANDLE USER_SPECIFIED SLACK
1382 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
1383 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
1385 if(this->slack == NULL)
1386 fprintf(stderr,"Zero slack.\n");
1388 fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1391 for(i=0;i<sources.size();i++){
1392 std::string rvar = "_m"+int_to_string(i);
1393 mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
1394 mvars[i]->set_tablevar_ref(i);
1395 fm.push_back(new tablevar_t(sources[i].c_str()));
1396 fm[i]->set_range_var(rvar);
1399 param_tbl = new param_table();
1400 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
1402 for(pi=0;pi<param_names.size();pi++){
1403 data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
1404 param_tbl->add_param(param_names[pi],dt->duplicate(),
1405 sgah->param_tbl->handle_access(param_names[pi]));
1407 definitions = sgah->definitions;
1411 qp_node *make_copy(std::string suffix){
1412 mrg_qpn *ret = new mrg_qpn();
1414 ret->disorder = disorder;
1416 ret->param_tbl = new param_table();
1417 std::vector<std::string> param_names = param_tbl->get_param_names();
1419 for(pi=0;pi<param_names.size();pi++){
1420 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1421 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1422 param_tbl->handle_access(param_names[pi]));
1424 ret->definitions = definitions;
1426 ret->node_name = node_name + suffix;
1427 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
1428 ret->merge_fieldpos = merge_fieldpos;
1433 std::vector<mrg_qpn *> split_sources();
1435 // the following method is used for distributed query optimization
1436 double get_rate_estimate();
1439 // get partition definition for merges that combine streams partitioned over multiple interfaces
1440 // return NULL for regular merges
1441 partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1449 vector<tablevar_t *> input_tables = get_input_tbls();
1450 for (int i = 0; i < input_tables.size(); ++i) {
1451 tablevar_t * table = input_tables[i];
1453 vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
1454 if (partn_names.size() != 1) // can't have more than one value of partition attribute
1456 string new_partn_name = partn_names[0];
1458 // need to make sure that all ifaces belong to the same partition
1460 partn_name = new_partn_name;
1461 else if (new_partn_name != partn_name)
1465 // now find partition definition corresponding to partn_name
1466 partn_def = partn_parse_result->get_partn_def(partn_name);
1470 void set_partn_definition(partn_def_t* def) {
1474 bool is_multihost_merge() {
1476 bool is_multihost = false;
1478 // each input table must be have machine attribute be non-empty
1479 // and there should be at least 2 different values of machine attributes
1480 vector<tablevar_t *> input_tables = get_input_tbls();
1481 string host = input_tables[0]->get_machine();
1482 for (int i = 1; i < input_tables.size(); ++i) {
1483 string new_host = input_tables[i]->get_machine();
1486 if (new_host != host)
1487 is_multihost = true;
1489 return is_multihost;
1492 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1496 // eq_temporal, hash join query plan node.
1497 // represent the following query fragment
1498 // select scalar_expression_1, ..., scalar_expression_k
1499 // from T0 t0, T1 t1
1502 // the predicates and the scalar expressions can reference
1503 // attributes of t0 and t1 and also functions.
1504 // The predicate must contain CNF elements to enable the
1505 // efficient evaluation of the query.
1506 // 1) at least one predicate of the form
1507 // (temporal se in t0) = (temporal se in t1)
1508 // 2) at least one predicate of the form
1509 // (non-temporal se in t0) = (non-temporal se in t1)
1511 class join_eq_hash_qpn: public qp_node{
1513 std::vector<tablevar_t *> from; // Source tables
1514 std::vector<select_element *> select_list; // Select list
1515 std::vector<cnf_elem *> prefilter[2]; // source prefilters
1516 std::vector<cnf_elem *> temporal_eq; // define temporal window
1517 std::vector<cnf_elem *> hash_eq; // define hash key
1518 std::vector<cnf_elem *> postfilter; // final filter on hash matches.
1520 std::vector<cnf_elem *> where; // all the filters
1521 // useful for summary analysis
1523 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1525 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1526 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1528 std::string node_type(){return("join_eq_hash_qpn"); };
1529 bool makes_transform(){return true;};
1530 std::vector<std::string> external_libs(){
1531 std::vector<std::string> ret;
1535 void bind_to_schema(table_list *Schema);
1536 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1537 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
1541 void append_to_where(cnf_elem *c){
1542 fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str());
1547 std::string to_query_string();
1548 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1549 std::string generate_functor_name();
1550 std::string generate_operator(int i, std::string params);
1551 std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1553 std::vector<select_element *> get_select_list(){return select_list;};
1554 std::vector<scalarexp_t *> get_select_se_list(){
1555 std::vector<scalarexp_t *> ret;
1557 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1560 // Used for LFTA only
1561 std::vector<cnf_elem *> get_where_clause(){
1562 std::vector<cnf_elem *> t;
1565 std::vector<cnf_elem *> get_filter_clause(){
1566 return get_where_clause();
1569 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1570 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1572 table_def *get_fields();
1574 // It might be feasible to find keys in an equijoin expression.
1575 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1576 std::vector<string> ret;
1580 std::vector<tablevar_t *> get_input_tbls();
1581 std::vector<tablevar_t *> get_output_tbls();
1583 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1584 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1585 // Ensure that any refs to interface params have been split away.
1586 int count_ifp_refs(std::set<std::string> &ifpnames);
1590 join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
1592 // Get the table name.
1593 // NOTE the colrefs have the table ref (an int)
1594 // embedded in them. Would it make sense
1595 // to grab the whole table list?
1596 from = qs->fta_tree->get_from()->get_table_list();
1597 if(from.size() != 2){
1599 sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1604 for(int f=0;f<from.size();++f){
1605 int t=from[f]->get_schema_ref();
1606 if(! Schema->is_stream(t)){
1607 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
1613 // Get the select list.
1614 select_list = qs->fta_tree->get_sl_vec();
1616 // Get the selection predicate.
1618 for(w=0;w<where.size();++w){
1619 analyze_cnf(where[w]);
1620 std::vector<int> pred_tbls;
1621 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1622 // Prefilter if refs only one tablevar
1623 if(pred_tbls.size()==1){
1624 prefilter[pred_tbls[0]].push_back(where[w]);
1627 // refs nothing -- might be sampling, do it as postfilter.
1628 if(pred_tbls.size()==0){
1629 postfilter.push_back(where[w]);
1632 // See if it can be a hash or temporal predicate.
1633 // NOTE: synchronize with the temporality checking
1634 // done at join_eq_hash_qpn::get_fields
1635 if(where[w]->is_atom && where[w]->eq_pred){
1636 std::vector<int> sel_tbls, ser_tbls;
1637 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1638 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1639 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1640 // make channel 0 SE on LHS.
1641 if(sel_tbls[0] != 0)
1642 where[w]->pr->swap_scalar_operands();
1644 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1645 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1646 if( (dtl->is_increasing() && dtr->is_increasing()) ||
1647 (dtl->is_decreasing() && dtr->is_decreasing()) )
1648 temporal_eq.push_back(where[w]);
1650 hash_eq.push_back(where[w]);
1655 // All tests failed, fallback is postfilter.
1656 postfilter.push_back(where[w]);
1659 if(temporal_eq.size()==0){
1660 err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
1664 // Get the parameters
1665 param_tbl = qs->param_tbl;
1669 // the following method is used for distributed query optimization
1670 double get_rate_estimate();
1673 qp_node* make_copy(std::string suffix){
1674 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
1676 ret->param_tbl = new param_table();
1677 std::vector<std::string> param_names = param_tbl->get_param_names();
1679 for(pi=0;pi<param_names.size();pi++){
1680 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1681 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1682 param_tbl->handle_access(param_names[pi]));
1684 ret->definitions = definitions;
1686 ret->node_name = node_name + suffix;
1688 // make shallow copy of all fields
1691 ret->select_list = select_list;
1692 ret->prefilter[0] = prefilter[0];
1693 ret->prefilter[1] = prefilter[1];
1694 ret->postfilter = postfilter;
1695 ret->temporal_eq = temporal_eq;
1696 ret->hash_eq = hash_eq;
1700 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1705 // ---------------------------------------------
1706 // eq_temporal, hash join query plan node.
1707 // represent the following query fragment
1708 // select scalar_expression_1, ..., scalar_expression_k
1709 // FILTER_JOIN(col, range) from T0 t0, T1 t1
1712 // t0 is the output range variable, t1 is the filtering range
1713 // variable. Both must alias a PROTOCOL.
1714 // The scalar expressions in the select clause may
1715 // reference t0 only.
1716 // The predicates are classified as follows
1717 // prefilter predicates:
1718 // a cheap predicate in t0 such that there is an equivalent
1719 // predicate in t1. Cost decisions about pushing to
1720 // lfta prefilter made later.
1721 // t0 predicates (other than prefilter predicates)
1722 // -- cheap vs. expensive sorted out at genereate time,
1723 // the constructor isn't called with the function list.
1724 // t1 predicates (other than prefiler predicates).
1725 // equi-join predicates of the form:
1726 // (se in t0) = (se in t1)
1728 // There must be at least one equi-join predicate.
1729 // No join predicates other than equi-join predicates
1731 // Warn on temporal equi-join predicates.
1732 // t1 predicates should not be expensive ... warn?
1734 class filter_join_qpn: public qp_node{
1736 std::vector<tablevar_t *> from; // Source tables
1737 colref_t *temporal_var; // join window in FROM
1738 unsigned int temporal_range; // metadata.
1739 std::vector<select_element *> select_list; // Select list
1740 std::vector<cnf_elem *> shared_pred; // prefilter preds
1741 std::vector<cnf_elem *> pred_t0; // main (R) preds
1742 std::vector<cnf_elem *> pred_t1; // filtering (S) preds
1743 std::vector<cnf_elem *> hash_eq; // define hash key
1744 std::vector<cnf_elem *> postfilter; // ref's no table.
1746 std::vector<cnf_elem *> where; // all the filters
1747 // useful for summary analysis
1749 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1750 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1751 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1754 bool use_bloom; // true => bloom filter, false => limited hash
1756 std::string node_type(){return("filter_join"); };
1757 bool makes_transform(){return true;};
1758 std::vector<std::string> external_libs(){
1759 std::vector<std::string> ret;
1763 void bind_to_schema(table_list *Schema);
1764 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
1766 std::string to_query_string();
1767 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
1768 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
1771 std::string generate_functor_name(){
1772 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
1775 std::string generate_operator(int i, std::string params){
1776 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
1779 std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1781 std::vector<select_element *> get_select_list(){return select_list;};
1782 std::vector<scalarexp_t *> get_select_se_list(){
1783 std::vector<scalarexp_t *> ret;
1785 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1788 // Used for LFTA only
1789 void append_to_where(cnf_elem *c){
1793 std::vector<cnf_elem *> get_where_clause(){return where;}
1794 std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
1796 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1797 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1799 table_def *get_fields();
1800 // It should be feasible to find keys in a filter join
1801 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1802 std::vector<string> ret;
1806 std::vector<tablevar_t *> get_input_tbls();
1807 std::vector<tablevar_t *> get_output_tbls();
1809 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1810 int resolve_if_params(ifq_t *ifdb, std::string &err);
1812 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1813 // Ensure that any refs to interface params have been split away.
1814 int count_ifp_refs(std::set<std::string> &ifpnames);
1819 filter_join_qpn(query_summary_class *qs,table_list *Schema){
1821 // Get the table name.
1822 // NOTE the colrefs have the table ref (an int)
1823 // embedded in them. Would it make sense
1824 // to grab the whole table list?
1825 from = qs->fta_tree->get_from()->get_table_list();
1826 temporal_var = qs->fta_tree->get_from()->get_colref();
1827 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
1828 if(from.size() != 2){
1830 sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1834 if(from[0]->get_interface() != from[1]->get_interface()){
1835 err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n";
1839 for(int f=0;f<from.size();++f){
1840 int t=from[f]->get_schema_ref();
1841 if(! Schema->is_stream(t)){
1842 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
1848 // Get the select list.
1849 select_list = qs->fta_tree->get_sl_vec();
1850 // Verify that only t0 is referenced.
1851 bool bad_ref = false;
1852 for(i=0;i<select_list.size();i++){
1853 vector<int> sel_tbls;
1854 get_tablevar_ref_se(select_list[i]->se,sel_tbls);
1855 if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
1859 err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
1864 // Get the selection predicate.
1866 std::vector<cnf_elem *> t0_only, t1_only;
1867 for(w=0;w<where.size();++w){
1868 analyze_cnf(where[w]);
1869 std::vector<int> pred_tbls;
1870 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1871 // Collect the list of preds by src var,
1872 // extract the shared preds later.
1873 if(pred_tbls.size()==1){
1874 if(pred_tbls[0] == 0){
1875 t0_only.push_back(where[w]);
1877 t1_only.push_back(where[w]);
1881 // refs nothing -- might be sampling, do it as postfilter.
1882 if(pred_tbls.size()==0){
1883 postfilter.push_back(where[w]);
1886 // See if it can be a hash or temporal predicate.
1887 // NOTE: synchronize with the temporality checking
1888 // done at join_eq_hash_qpn::get_fields
1889 if(where[w]->is_atom && where[w]->eq_pred){
1890 std::vector<int> sel_tbls, ser_tbls;
1891 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1892 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1893 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1894 // make channel 0 SE on LHS.
1895 if(sel_tbls[0] != 0)
1896 where[w]->pr->swap_scalar_operands();
1898 hash_eq.push_back(where[w]);
1900 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1901 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1902 if( (dtl->is_increasing() && dtr->is_increasing()) ||
1903 (dtl->is_decreasing() && dtr->is_decreasing()) )
1904 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
1909 // All tests failed, fallback is postfilter.
1910 err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
1913 // Classify the t0_only and t1_only preds.
1914 set<int> matched_pred;
1916 for(w=0;w<t0_only.size();w++){
1917 for(v=0;v<t1_only.size();++v)
1918 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
1920 if(v<t1_only.size()){
1921 shared_pred.push_back(t0_only[w]);
1922 matched_pred.insert(v);
1924 pred_t0.push_back(t0_only[w]);
1927 for(v=0;v<t1_only.size();++v){
1928 if(matched_pred.count(v) == 0)
1929 pred_t1.push_back(t1_only[v]);
1933 // Get the parameters
1934 param_tbl = qs->param_tbl;
1935 definitions = qs->definitions;
1937 // Determine the algorithm
1938 if(this->get_val_of_def("algorithm") == "hash"){
1945 // the following method is used for distributed query optimization
1946 double get_rate_estimate();
1949 qp_node* make_copy(std::string suffix){
1950 filter_join_qpn *ret = new filter_join_qpn();
1952 ret->param_tbl = new param_table();
1953 std::vector<std::string> param_names = param_tbl->get_param_names();
1955 for(pi=0;pi<param_names.size();pi++){
1956 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1957 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1958 param_tbl->handle_access(param_names[pi]));
1960 ret->definitions = definitions;
1962 ret->node_name = node_name + suffix;
1964 // make shallow copy of all fields
1967 ret->temporal_range = temporal_range;
1968 ret->temporal_var = temporal_var;
1969 ret->select_list = select_list;
1970 ret->shared_pred = shared_pred;
1971 ret->pred_t0 = pred_t0;
1972 ret->pred_t1 = pred_t1;
1973 ret->postfilter = postfilter;
1974 ret->hash_eq = hash_eq;
1978 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1984 // TODO : put tests on other operators to ensure they dont' read from a watchlist
1985 // TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ?
1986 class watch_join_qpn: public qp_node{
1988 std::vector<tablevar_t *> from; // Source tables
1989 std::vector<select_element *> select_list; // Select list
1990 std::vector<cnf_elem *> pred_t0; // main (R) preds
1991 std::vector<cnf_elem *> pred_t1; // watchlist-only (S) preds (?)
1992 std::map<std::string, cnf_elem *> hash_eq; // predicates on S hash keys
1993 std::vector<cnf_elem *> join_filter; // ref's R, S, but not a hash
1994 std::vector<cnf_elem *> postfilter; // ref's no table.
1996 std::vector<std::string> key_flds;
1998 std::vector<cnf_elem *> where; // all the filters
1999 // useful for summary analysis
2001 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
2002 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
2003 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
2007 std::string node_type(){return("watch_join"); };
2008 bool makes_transform(){return true;};
2009 std::vector<std::string> external_libs(){
2010 std::vector<std::string> ret;
2014 void bind_to_schema(table_list *Schema);
2015 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
2017 std::string to_query_string();
2018 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
2019 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n");
2022 std::string generate_functor_name(){
2023 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n");
2026 std::string generate_operator(int i, std::string params){
2027 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n");
2030 std::string get_include_file(){return("#include <watchlist_operator.h>\n");};
2032 std::vector<select_element *> get_select_list(){return select_list;};
2033 std::vector<scalarexp_t *> get_select_se_list(){
2034 std::vector<scalarexp_t *> ret;
2036 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
2039 // Used for LFTA only
2040 void append_to_where(cnf_elem *c){
2044 std::vector<cnf_elem *> get_where_clause(){return where;}
2046 std::vector<cnf_elem *> get_filter_clause(){return pred_t0;}
2048 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2049 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2051 table_def *get_fields();
2052 // It should be feasible to find keys in a watchlist join
2053 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2054 std::vector<string> ret;
2058 std::vector<tablevar_t *> get_input_tbls();
2059 std::vector<tablevar_t *> get_output_tbls();
2061 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2062 int resolve_if_params(ifq_t *ifdb, std::string &err);
2064 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2065 // Ensure that any refs to interface params have been split away.
2066 int count_ifp_refs(std::set<std::string> &ifpnames);
2071 watch_join_qpn(query_summary_class *qs,table_list *Schema){
2073 // Get the table name.
2074 // NOTE the colrefs have the table ref (an int)
2075 // embedded in them. Would it make sense
2076 // to grab the whole table list?
2077 from = qs->fta_tree->get_from()->get_table_list();
2078 if(from.size() != 2){
2080 sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
2085 int t = from[0]->get_schema_ref();
2086 if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){
2087 err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n";
2090 t = from[1]->get_schema_ref();
2091 if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){
2092 err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n";
2095 key_flds = Schema->get_table(t)->get_keys();
2098 // Get the select list.
2099 select_list = qs->fta_tree->get_sl_vec();
2101 // Get the selection predicate.
2103 std::vector<cnf_elem *> t0_only, t1_only;
2104 for(w=0;w<where.size();++w){
2105 analyze_cnf(where[w]);
2106 std::vector<int> pred_tbls;
2107 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
2108 // Collect the list of preds by src var,
2109 // extract the shared preds later.
2110 if(pred_tbls.size()==1){
2111 if(pred_tbls[0] == 0){
2112 pred_t0.push_back(where[w]);
2114 pred_t1.push_back(where[w]);
2118 // refs nothing -- might be sampling, do it as postfilter.
2119 if(pred_tbls.size()==0){
2120 postfilter.push_back(where[w]);
2124 // Must reference both
2125 // See if it can be a hash predicate.
2126 if(where[w]->is_atom && where[w]->eq_pred){
2127 std::vector<int> sel_tbls, ser_tbls;
2128 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
2129 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
2130 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
2131 // make channel 0 SE on LHS.
2132 if(sel_tbls[0] != 0)
2133 where[w]->swap_scalar_operands();
2135 // Must be simple (a colref) on the RHS
2136 if(where[w]->r_simple){
2137 string rcol = where[w]->pr->get_right_se()->get_colref()->get_field();
2138 if(std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end()){
2139 hash_eq[rcol] = where[w];
2141 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
2142 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
2143 if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) )
2144 err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n";
2150 // All tests failed, fallback is join_filter.
2151 join_filter.push_back(where[w]);
2154 if(key_flds.size() > hash_eq.size()){
2155 err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate. Missing fields are";
2156 for(int k=0;k<key_flds.size();++k){
2157 if(hash_eq.count(key_flds[k]) < 1){
2158 err_str += " "+key_flds[k];
2166 // Get the parameters
2167 param_tbl = qs->param_tbl;
2168 definitions = qs->definitions;
2172 // the following method is used for distributed query optimization
2173 double get_rate_estimate();
2176 qp_node* make_copy(std::string suffix){
2177 watch_join_qpn *ret = new watch_join_qpn();
2179 ret->param_tbl = new param_table();
2180 std::vector<std::string> param_names = param_tbl->get_param_names();
2182 for(pi=0;pi<param_names.size();pi++){
2183 data_type *dt = param_tbl->get_data_type(param_names[pi]);
2184 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2185 param_tbl->handle_access(param_names[pi]));
2187 ret->definitions = definitions;
2189 ret->node_name = node_name + suffix;
2191 // make shallow copy of all fields
2194 ret->select_list = select_list;
2195 ret->key_flds = key_flds;
2196 ret->pred_t0 = pred_t0;
2197 ret->pred_t1 = pred_t1;
2198 ret->join_filter = join_filter;
2199 ret->postfilter = postfilter;
2200 ret->hash_eq = hash_eq;
2201 ret->hash_src_r = hash_src_r;
2202 ret->hash_src_l = hash_src_l;
2207 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2214 enum output_file_type_enum {regular, gzip, bzip};
2216 class output_file_qpn: public qp_node{
2218 std::string source_op_name; // Source table
2219 std::vector<field_entry *> fields;
2220 ospec_str *output_spec;
2221 vector<tablevar_t *> fm;
2222 std::string hfta_query_name;
2223 std::string filestream_id;
2225 std::vector<std::string> params;
2227 output_file_type_enum compression_type;
2229 int n_streams; // Number of output streams
2230 int n_hfta_clones; // number of hfta clones
2231 int parallel_idx; // which close this produces output for.
2232 std::vector<int> hash_flds; // fields used to hash the output.
2234 std::string node_type(){return("output_file_qpn"); };
2235 bool makes_transform(){return false;};
2236 std::vector<std::string> external_libs(){
2237 std::vector<std::string> ret;
2238 switch(compression_type){
2240 ret.push_back("-lz");
2243 ret.push_back("-lbz2");
2251 void append_to_where(cnf_elem *c){
2252 fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n", node_name.c_str());
2258 void bind_to_schema(table_list *Schema){}
2259 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2264 std::string to_query_string(){return "// output_file_operator \n";}
2265 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2266 std::string generate_functor_name();
2267 std::string generate_operator(int i, std::string params);
2268 std::string get_include_file(){
2269 switch(compression_type){
2271 return("#include <zfile_output_operator.h>\n");
2273 return("#include <file_output_operator.h>\n");
2275 return("#include <file_output_operator.h>\n");
2278 std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
2279 std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
2280 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
2281 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
2283 table_def *get_fields(){
2284 field_entry_list *fel = new field_entry_list();
2286 for(i=0;i<fields.size();++i)
2287 fel->append_field(fields[i]);
2288 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
2291 // TODO! either bypass the output operator in stream_query,
2292 // or propagate key information when the output operator is constructed.
2293 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2294 std::vector<string> ret;
2298 std::vector<tablevar_t *> get_input_tbls();
2299 std::vector<tablevar_t *> get_output_tbls();
2301 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
2302 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
2304 std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
2305 std::vector<table_exp_t *> ret; return ret;
2307 // Ensure that any refs to interface params have been split away.
2308 int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2309 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
2312 output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
2313 source_op_name = src_op;
2314 node_name = source_op_name + "_output";
2315 filestream_id = fs_id;
2316 fields = src_tbl_def->get_fields();
2317 output_spec = ospec;
2318 fm.push_back(new tablevar_t(source_op_name.c_str()));
2319 hfta_query_name = qn;
2322 // TODO stream checking, but it requires passing Schema to output_file_qpn
2324 for(int f=0;f<fm.size();++f){
2325 int t=fm[f]->get_schema_ref();
2326 if(! Schema->is_stream(t)){
2327 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
2335 compression_type = regular;
2336 if(ospec->operator_type == "zfile")
2337 compression_type = gzip;
2344 strncpy(buf, output_spec->operator_param.c_str(),1000);
2347 int nwords = split_string(buf, ':', words,100);
2349 for(i=0;i<nwords;i++){
2350 params.push_back(words[i]);
2352 for(i=0;i<params.size();i++){
2353 if(params[i] == "gzip")
2358 // Set output splitting parameters
2359 bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
2364 if(split_flds != ""){
2365 string err_flds = "";
2366 char *tmpstr = strdup(split_flds.c_str());
2368 int nwords = split_string(tmpstr,':',words,100);
2370 for(i=0;i<nwords;++i){
2371 string target = words[i];
2372 for(j=0;j<fields.size();++j){
2373 if(fields[j]->get_name() == target){
2374 hash_flds.push_back(j);
2378 if(j==fields.size()){
2379 err_flds += " "+target;
2383 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
2390 // the following method is used for distributed query optimization
2391 double get_rate_estimate(){return 1.0;}
2394 qp_node* make_copy(std::string suffix){
2395 // output_file_qpn *ret = new output_file_qpn();
2396 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
2400 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
2408 // ---------------------------------------------
2411 // Select, group-by, aggregate, sampling.
2413 // Select SE_1, ..., SE_k
2416 // Group By gb1, ..., gb_n
2417 // [Subgroup gb_i1, .., gb_ik]
2418 // Cleaning_when predicate
2419 // Cleaning_by predicate
2422 // For now, must have group-by variables and aggregates.
2423 // The scalar expressions which are output must be a function
2424 // of the groub-by variables and the aggregates.
2425 // The group-by variables can be references to columsn of T,
2426 // or they can be scalar expressions.
2427 class sgahcwcb_qpn: public qp_node{
2429 tablevar_t *table_name; // source table
2430 std::vector<cnf_elem *> where; // selection predicate
2431 std::vector<cnf_elem *> having; // post-aggregation predicate
2432 std::vector<select_element *> select_list; // se's of output
2433 gb_table gb_tbl; // Table of all group-by attributes.
2434 std::set<int> sg_tbl; // Names of the superGB attributes
2435 aggregate_table aggr_tbl; // Table of all referenced aggregates.
2436 std::set<std::string> states_refd; // states ref'd by stateful fcns.
2437 std::vector<cnf_elem *> cleanby;
2438 std::vector<cnf_elem *> cleanwhen;
2440 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
2442 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
2444 std::string node_type(){return("sgahcwcb_qpn"); };
2445 bool makes_transform(){return true;};
2446 std::vector<std::string> external_libs(){
2447 std::vector<std::string> ret;
2451 void bind_to_schema(table_list *Schema);
2452 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2453 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
2457 std::string to_query_string();
2458 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2459 std::string generate_functor_name();
2461 std::string generate_operator(int i, std::string params);
2462 std::string get_include_file(){return("#include <clean_operator.h>\n");};
2464 std::vector<select_element *> get_select_list(){return select_list;};
2465 std::vector<scalarexp_t *> get_select_se_list(){
2466 std::vector<scalarexp_t *> ret;
2468 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
2471 std::vector<cnf_elem *> get_where_clause(){return where;};
2472 std::vector<cnf_elem *> get_filter_clause(){return where;};
2473 std::vector<cnf_elem *> get_having_clause(){return having;};
2474 gb_table *get_gb_tbl(){return &gb_tbl;};
2475 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
2476 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2477 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2479 // table which represents output tuple.
2480 table_def *get_fields();
2481 // TODO Key extraction should be feasible but I'll defer the issue.
2482 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2483 std::vector<string> ret;
2487 std::vector<tablevar_t *> get_input_tbls();
2488 std::vector<tablevar_t *> get_output_tbls();
2490 void append_to_where(cnf_elem *c){
2497 sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
2498 // Get the table name.
2499 // NOTE the colrefs have the tablevar ref (an int)
2500 // embedded in them. Would it make sense
2501 // to grab the whole table list?
2502 tablevar_list_t *fm = qs->fta_tree->get_from();
2503 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
2504 if(tbl_vec.size() != 1){
2506 sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
2510 table_name = (tbl_vec[0]);
2512 int t = tbl_vec[0]->get_schema_ref();
2513 if(! Schema->is_stream(t)){
2514 err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
2519 // Get the select list.
2520 select_list = qs->fta_tree->get_sl_vec();
2522 // Get the selection and having predicates.
2524 having = qs->hav_cnf;
2525 cleanby = qs->cb_cnf;
2526 cleanwhen = qs->cw_cnf;
2528 // Build a new GB var table (don't share, might need to modify)
2530 for(g=0;g<qs->gb_tbl->size();g++){
2531 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
2532 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
2533 qs->gb_tbl->get_reftype(g)
2537 // Build a new aggregate table. (don't share, might need
2540 for(a=0;a<qs->aggr_tbl->size();a++){
2542 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
2543 qs->aggr_tbl->duplicate(a)
2547 sg_tbl = qs->sg_tbl;
2548 states_refd = qs->states_refd;
2551 // Get the parameters
2552 param_tbl = qs->param_tbl;
2558 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2559 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2560 // Ensure that any refs to interface params have been split away.
2561 // CURRENTLY not allowed by split_node_for_fta
2562 int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2563 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
2565 // the following method is used for distributed query optimization
2566 double get_rate_estimate();
2568 qp_node* make_copy(std::string suffix){
2569 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
2571 ret->param_tbl = new param_table();
2572 std::vector<std::string> param_names = param_tbl->get_param_names();
2574 for(pi=0;pi<param_names.size();pi++){
2575 data_type *dt = param_tbl->get_data_type(param_names[pi]);
2576 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2577 param_tbl->handle_access(param_names[pi]));
2579 ret->definitions = definitions;
2581 ret->node_name = node_name + suffix;
2583 // make shallow copy of all fields
2585 ret->having = having;
2586 ret->select_list = select_list;
2587 ret->gb_tbl = gb_tbl;
2588 ret->aggr_tbl = aggr_tbl;
2589 ret->sg_tbl = sg_tbl;
2590 ret->states_refd = states_refd;
2591 ret->cleanby = cleanby;
2592 ret->cleanwhen = cleanwhen;
2597 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2601 std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);
2605 void untaboo(string &s);
2607 table_def *create_attributes(string tname, vector<select_element *> &select_list);