1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
15 #ifndef __QUERY_PLAN_H__
16 #define __QUERY_PLAN_H__
23 #include"analyze_fta.h"
25 #include"parse_partn.h"
26 #include"generate_utils.h"
28 // Identify the format of the input, output streams.
29 #define UNKNOWNFORMAT 0
33 ///////////////////////////////////////////////////
34 // representation of an output operator specification
39 string operator_param;
40 string output_directory;
42 string partitioning_flds;
47 ////////////////////////////////////////////////////
48 // Input representation of a query
52 std::set<int> reads_from;
53 std::set<int> sources_to;
54 std::vector<std::string> refd_tbls;
55 std::vector<var_pair_t *> params;
58 std::string mangler; // for UDOPs
60 table_exp_t *parse_tree;
63 bool is_externally_visible;
64 bool inferred_visible_node;
66 set<int> subtree_roots;
73 is_externally_visible = false;
74 inferred_visible_node = false;
77 query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
85 is_externally_visible = pt->get_visible();
86 inferred_visible_node = false;
89 tablevar_list_t *fm = parse_tree->get_from();
91 refd_tbls = fm->get_table_names();
94 params = pt->query_params;
96 query_node(int ix, std::string udop_name,table_list *Schema){
104 is_externally_visible = true;
105 inferred_visible_node = false;
108 int sid = Schema->find_tbl(udop_name);
109 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
111 for(i=0;i<subq.size();++i){
112 refd_tbls.push_back(subq[i]->name);
119 std::string source_name;
120 std::vector<int> query_node_indices;
121 std::set<int> reads_from;
122 std::set<int> sources_to;
124 bool inferred_visible_node;
127 bool do_generation; // false means, ignore it.
131 inferred_visible_node = false;
134 do_generation = true;
146 // the following selectivity estimates are used by our primitive rate estimators
147 #define SPX_SELECTIVITY 1.0
148 #define SGAH_SELECTIVITY 0.1
149 #define RSGAH_SELECTIVITY 0.1
150 #define SGAHCWCB_SELECTIVITY 0.1
151 #define MRG_SELECTIVITY 1.0
152 #define JOIN_EQ_HASH_SELECTIVITY 1.0
154 // If the output rate of the interface is not given, we are going to use
155 // this default value
156 #define DEFAULT_INTERFACE_RATE 100
159 // Define query plan nodes
160 // These nodes are intended for query modeling
161 // and transformation rather than for code generation.
164 // Query plan node base class.
165 // It has an ID, can return its type,
166 // and can be linked into lists with the predecessors
168 // To add : serialize, unserialize?
173 std::vector<int> predecessors;
174 std::vector<int> successors;
175 std::string node_name;
177 // For error reporting without exiting the program.
181 // These should be moved to the containing stream_query object.
182 std::map<std::string, std::string> definitions;
183 param_table *param_tbl;
185 // The value of a field in terms of protocol fields (if any).
186 std::map<std::string, scalarexp_t *> protocol_map;
191 param_tbl = new param_table();
196 param_tbl = new param_table();
199 int get_id(){return(id);}; // Returns the value of this node's id member.
200 void set_id(int i){id = i; }; // Overwrites this node's id member with i.
202 int get_error_code(){return error_code;}; // Returns the error_code member (error reporting without exiting).
203 std::string get_error_str(){return err_str;}; // Returns a copy of the err_str member.
205 virtual std::string node_type() = 0;
207 // For code generation, does the operator xform its input.
208 virtual bool makes_transform() = 0;
210 // For linking, what external libraries does the operator depend on?
211 virtual std::vector<std::string> external_libs() = 0;
213 void set_node_name(std::string n){node_name = n;}; // Replaces node_name with n.
214 std::string get_node_name(){return node_name;}; // Returns a copy of node_name.
216 void set_definitions(std::map<std::string, std::string> &def){
219 std::map<std::string, std::string> get_definitions(){return definitions;}; // Returns the definitions map by value (a copy).
222 // call to create the mapping from field name to se in protocol fields.
223 // Pass in qp_node of data sources, in order.
224 virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
225 // Get the protocol map. Takes no arguments; the return value is a pointer
225 // to this node's protocol_map member (no copy is made).
226 std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
228 // Each qp node must be able to return a description
229 // of the tuples it creates.
230 // TODO: the get_output_tbls method should subsume the get_fields
231 // method, but in fact it really just returns the
233 virtual table_def *get_fields() = 0; // Should be vector?
234 // get keys from the operator. Currently, only works on group-by queries.
235 // partial_keys set to true if there is a suspicion that the list is partial.
236 virtual std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys) = 0;
237 // Get the from clause
238 virtual std::vector<tablevar_t *> get_input_tbls() = 0;
239 // this is a confusing function, it actually returns the output
241 virtual std::vector<tablevar_t *> get_output_tbls() = 0;
243 std::string get_val_of_def(std::string def){
244 if(definitions.count(def) > 0) return definitions[def];
247 void set_definition(std::string def, std::string val){
248 definitions[def]=val;
251 // Associate colrefs in SEs with tables
252 // at code generation time.
253 virtual void bind_to_schema(table_list *Schema) = 0;
255 // Get colrefs of the operator, currently only meaningful for lfta
256 // operators, and only interested in colrefs with extraction fcns
257 virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
259 virtual std::string to_query_string() = 0;
260 virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;
261 virtual std::string generate_functor_name() = 0;
263 virtual std::string generate_operator(int i, std::string params) = 0;
264 virtual std::string get_include_file() = 0;
266 virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
267 virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
269 // Split this node into LFTA and HFTA nodes.
270 // Four possible outcomes:
271 // 1) the qp_node reads from a protocol, but does not need to
272 // split (can be evaluated as an LFTA).
273 // The lfta node is the only element in the return vector,
274 // and hfta_returned is false.
275 // 2) the qp_node reads from no protocol, and therefore cannot be split.
276 // The hfta node is the only element in the return vector,
277 // and hfta_returned is true.
278 // 3) reads from at least one protocol, but cannot be split : failure.
279 // return vector is empty, the error conditions are written
281 // 4) The qp_node splits into an hfta node and one or more LFTA nodes.
282 // the return vector has two or more elements, and hfta_returned
283 // is true. The last element is the HFTA.
284 virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;
287 // Ensure that any refs to interface params have been split away.
288 virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
292 // Tag the data sources which are views,
293 // return the (optimized) source queries and
294 // record the view access in opview_set
295 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
297 param_table *get_param_tbl(){return param_tbl;}; // Returns the param_tbl pointer held by this node.
299 // The "where" clause is a pre-filter
300 virtual std::vector<cnf_elem *> get_where_clause() = 0;
301 // To be more explicit, use get_filter_preds, this is used to compute the prefilter
302 virtual std::vector<cnf_elem *> get_filter_clause() = 0;
304 // Add an extra predicate. Currently only used for LFTAs.
305 virtual void append_to_where(cnf_elem *c) = 0;
307 void add_predecessor(int i){predecessors.push_back(i);}; // Appends i to predecessors; no duplicate check is made here.
308 void remove_predecessor(int i){
309 std::vector<int>::iterator vi;
310 for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
312 predecessors.erase(vi);
317 void add_successor(int i){successors.push_back(i);}; // Appends i to successors; no duplicate check is made here.
318 std::vector<int> get_predecessors(){return predecessors;}; // Returns the predecessors vector by value (a copy).
319 int n_predecessors(){return predecessors.size();}; // Number of entries in predecessors, converted to int.
320 std::vector<int> get_successors(){return successors;}; // Returns the successors vector by value (a copy).
321 int n_successors(){return successors.size();}; // Number of entries in successors, converted to int.
322 void clear_predecessors(){predecessors.clear();}; // Empties the predecessors vector.
323 void clear_successors(){successors.clear();}; // Empties the successors vector.
325 // the following method is used for distributed query optimization
326 double get_rate_estimate();
329 // used for cloning query nodes
330 virtual qp_node* make_copy(std::string suffix) = 0;
335 // Select, project, transform (xform) query plan node.
336 // represent the following query fragment
337 // select scalar_expression_1, ..., scalar_expression_k
341 // the predicates and the scalar expressions can reference
342 // attributes of S and also functions.
343 class spx_qpn: public qp_node{
345 tablevar_t *table_name; // Source table
346 std::vector<cnf_elem *> where; // selection predicate
347 std::vector<select_element *> select_list; // Select list
351 std::string node_type(){return("spx_qpn"); }; // Type tag for this node class: always the string "spx_qpn".
352 bool makes_transform(){return true;}; // Always true for this node class.
353 std::vector<std::string> external_libs(){
354 std::vector<std::string> ret;
358 void append_to_where(cnf_elem *c){
363 void bind_to_schema(table_list *Schema);
364 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
366 std::string to_query_string();
367 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
368 std::string generate_functor_name();
369 std::string generate_operator(int i, std::string params);
370 std::string get_include_file(){return("#include <selection_operator.h>\n");}; // Returns the newline-terminated #include line for selection_operator.h.
372 std::vector<select_element *> get_select_list(){return select_list;}; // Returns a copy of the vector; the select_element pointers are shared.
373 std::vector<scalarexp_t *> get_select_se_list(){
374 std::vector<scalarexp_t *> ret;
376 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
379 std::vector<cnf_elem *> get_where_clause(){return where;}; // Returns a copy of the where vector; the cnf_elem pointers are shared.
380 std::vector<cnf_elem *> get_filter_clause(){return where;}; // Same result as get_where_clause(): a copy of the where vector.
381 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
382 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
384 table_def *get_fields();
385 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
386 std::vector<string> ret;
390 std::vector<tablevar_t *> get_input_tbls();
391 std::vector<tablevar_t *> get_output_tbls();
393 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
394 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
395 // Ensure that any refs to interface params have been split away.
396 int count_ifp_refs(std::set<std::string> &ifpnames);
397 int resolve_if_params(ifq_t *ifdb, std::string &err);
401 spx_qpn(query_summary_class *qs,table_list *Schema){
402 // Get the table name.
403 // NOTE the colrefs have the table ref (an int)
404 // embedded in them. Would it make sense
405 // to grab the whole table list?
406 tablevar_list_t *fm = qs->fta_tree->get_from();
408 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
409 if(tbl_vec.size() != 1){
411 sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
415 table_name = (tbl_vec[0]);
417 int t = tbl_vec[0]->get_schema_ref();
418 if(! Schema->is_stream(t)){
419 err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
423 // Get the select list.
424 select_list = qs->fta_tree->get_sl_vec();
426 // Get the selection predicate.
430 // Get the parameters
431 param_tbl = qs->param_tbl;
437 // the following method is used for distributed query optimization
438 double get_rate_estimate();
441 qp_node* make_copy(std::string suffix){
442 spx_qpn *ret = new spx_qpn();
444 ret->param_tbl = new param_table();
445 std::vector<std::string> param_names = param_tbl->get_param_names();
447 for(pi=0;pi<param_names.size();pi++){
448 data_type *dt = param_tbl->get_data_type(param_names[pi]);
449 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
450 param_tbl->handle_access(param_names[pi]));
452 ret->definitions = definitions;
453 ret->node_name = node_name + suffix;
455 // make shallow copy of all fields
457 ret->select_list = select_list;
461 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
467 // Select, group-by, aggregate.
469 // Select SE_1, ..., SE_k
472 // Group By gb1, ..., gb_n
475 // NOTE : the sampling operator is sgahcwcb_qpn.
477 // For now, must have group-by variables and aggregates.
478 // The scalar expressions which are output must be a function
479 // of the group-by variables and the aggregates.
480 // The group-by variables can be references to columns of T,
481 // or they can be scalar expressions.
482 class sgah_qpn: public qp_node{
484 tablevar_t *table_name; // source table
485 std::vector<cnf_elem *> where; // selection predicate
486 std::vector<cnf_elem *> having; // post-aggregation predicate
487 std::vector<select_element *> select_list; // se's of output
488 gb_table gb_tbl; // Table of all group-by attributes.
489 aggregate_table aggr_tbl; // Table of all referenced aggregates.
491 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
493 int lfta_disorder; // maximum disorder in the stream between lfta, hfta
494 int hfta_disorder; // maximum disorder in the hfta
495 int hfta_slow_flush; // outputs per input, 0 means no slow flush
497 // rollup, cube, and grouping_sets cannot be readily reconstructed by
498 // analyzing the patterns, so explicitly record them here.
499 // used only so that to_query_string produces something meaningful.
500 std::vector<std::string> gb_entry_type;
501 std::vector<int> gb_entry_count;
503 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;} // Returns a copy of gb_sources; the scalarexp_t pointers are shared.
505 std::string node_type(){return("sgah_qpn"); }; // Type tag for this node class: always the string "sgah_qpn".
506 bool makes_transform(){return true;}; // Always true for this node class.
507 std::vector<std::string> external_libs(){
508 std::vector<std::string> ret;
512 void bind_to_schema(table_list *Schema);
513 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
515 std::string to_query_string();
516 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
517 std::string generate_functor_name();
519 std::string generate_operator(int i, std::string params);
520 std::string get_include_file(){
521 if(hfta_disorder <= 1){
522 if(hfta_slow_flush>0){
523 return("#include <groupby_slowflush_operator.h>\n");
525 return("#include <groupby_operator.h>\n");
528 return("#include <groupby_operator_oop.h>\n");
532 std::vector<select_element *> get_select_list(){return select_list;}; // Returns a copy of the vector; the select_element pointers are shared.
533 std::vector<scalarexp_t *> get_select_se_list(){
534 std::vector<scalarexp_t *> ret;
536 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
539 std::vector<cnf_elem *> get_where_clause(){return where;}; // Returns a copy of the where vector; the cnf_elem pointers are shared.
541 void append_to_where(cnf_elem *c){
545 std::vector<cnf_elem *> get_filter_clause(){return where;}; // Same result as get_where_clause(): a copy of the where vector.
546 std::vector<cnf_elem *> get_having_clause(){return having;}; // Returns a copy of the having vector (post-aggregation predicate).
547 gb_table *get_gb_tbl(){return &gb_tbl;}; // Returns a pointer to the gb_tbl member itself, so callers can modify it in place.
548 aggregate_table *get_aggr_tbl(){return &aggr_tbl;}; // Returns a pointer to the aggr_tbl member itself, so callers can modify it in place.
549 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
550 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
552 // table which represents output tuple.
553 table_def *get_fields();
554 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
555 std::vector<tablevar_t *> get_input_tbls();
556 std::vector<tablevar_t *> get_output_tbls();
564 sgah_qpn(query_summary_class *qs,table_list *Schema){
569 // Get the table name.
570 // NOTE the colrefs have the tablevar ref (an int)
571 // embedded in them. Would it make sense
572 // to grab the whole table list?
573 tablevar_list_t *fm = qs->fta_tree->get_from();
574 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
575 if(tbl_vec.size() != 1){
577 sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
581 table_name = (tbl_vec[0]);
583 int t = tbl_vec[0]->get_schema_ref();
584 if(! Schema->is_stream(t)){
585 err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
590 // Get the select list.
591 select_list = qs->fta_tree->get_sl_vec();
593 // Get the selection and having predicates.
595 having = qs->hav_cnf;
597 // Build a new GB var table (don't share, might need to modify)
599 for(g=0;g<qs->gb_tbl->size();g++){
600 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
601 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
602 qs->gb_tbl->get_reftype(g)
605 gb_tbl.set_pattern_info(qs->gb_tbl);
606 // gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
607 // gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
608 // gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
610 // Build a new aggregate table. (don't share, might need
613 for(a=0;a<qs->aggr_tbl->size();a++){
615 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
616 qs->aggr_tbl->duplicate(a)
621 // Get the parameters
622 param_tbl = qs->param_tbl;
628 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
629 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
630 // Ensure that any refs to interface params have been split away.
631 int count_ifp_refs(std::set<std::string> &ifpnames);
632 int resolve_if_params(ifq_t *ifdb, std::string &err);
634 // the following method is used for distributed query optimization
635 double get_rate_estimate();
638 qp_node* make_copy(std::string suffix){
639 sgah_qpn *ret = new sgah_qpn();
641 ret->param_tbl = new param_table();
642 std::vector<std::string> param_names = param_tbl->get_param_names();
644 for(pi=0;pi<param_names.size();pi++){
645 data_type *dt = param_tbl->get_data_type(param_names[pi]);
646 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
647 param_tbl->handle_access(param_names[pi]));
649 ret->definitions = definitions;
650 ret->hfta_slow_flush = hfta_slow_flush;
652 ret->node_name = node_name + suffix;
654 // make shallow copy of all fields
656 ret->having = having;
657 ret->select_list = select_list;
658 ret->gb_tbl = gb_tbl;
659 ret->aggr_tbl = aggr_tbl;
664 // Split aggregation into two HFTA components - sub and superaggregation
665 // If unable to split the aggregates, split into selection and aggregation
666 // If the resulting low-level query is empty (e.g. when aggregates cannot be split and
667 // the where clause is empty), an empty vector will be returned
668 virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
670 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
677 // Select, group-by, aggregate. with running aggregates
679 // Select SE_1, ..., SE_k
682 // Group By gb1, ..., gb_n
683 // Closing When predicate
686 // NOTE : the sampling operator is sgahcwcb_qpn.
688 // For now, must have group-by variables and aggregates.
689 // The scalar expressions which are output must be a function
690 // of the group-by variables and the aggregates.
691 // The group-by variables can be references to columns of T,
692 // or they can be scalar expressions.
693 class rsgah_qpn: public qp_node{
695 tablevar_t *table_name; // source table
696 std::vector<cnf_elem *> where; // selection predicate
697 std::vector<cnf_elem *> having; // post-aggregation predicate
698 std::vector<cnf_elem *> closing_when; // group closing predicate
699 std::vector<select_element *> select_list; // se's of output
700 gb_table gb_tbl; // Table of all group-by attributes.
701 aggregate_table aggr_tbl; // Table of all referenced aggregates.
703 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
705 int lfta_disorder; // maximum disorder allowed in stream between lfta, hfta
706 int hfta_disorder; // maximum disorder allowed in hfta
708 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;} // Returns a copy of gb_sources; the scalarexp_t pointers are shared.
711 std::string node_type(){return("rsgah_qpn"); }; // Type tag for this node class: always the string "rsgah_qpn".
712 bool makes_transform(){return true;}; // Always true for this node class.
713 std::vector<std::string> external_libs(){
714 std::vector<std::string> ret;
718 void bind_to_schema(table_list *Schema);
719 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
720 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
724 std::string to_query_string();
725 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
726 std::string generate_functor_name();
728 std::string generate_operator(int i, std::string params);
729 std::string get_include_file(){return("#include <running_gb_operator.h>\n");}; // Returns the newline-terminated #include line for running_gb_operator.h.
731 std::vector<select_element *> get_select_list(){return select_list;}; // Returns a copy of the vector; the select_element pointers are shared.
732 std::vector<scalarexp_t *> get_select_se_list(){
733 std::vector<scalarexp_t *> ret;
735 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
738 std::vector<cnf_elem *> get_where_clause(){return where;}; // Returns a copy of the where vector; the cnf_elem pointers are shared.
739 void append_to_where(cnf_elem *c){
743 std::vector<cnf_elem *> get_filter_clause(){return where;}; // Same result as get_where_clause(): a copy of the where vector.
744 std::vector<cnf_elem *> get_having_clause(){return having;}; // Returns a copy of the having vector (post-aggregation predicate).
745 std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;}; // Returns a copy of the closing_when vector (group closing predicate).
746 gb_table *get_gb_tbl(){return &gb_tbl;}; // Returns a pointer to the gb_tbl member itself, so callers can modify it in place.
747 aggregate_table *get_aggr_tbl(){return &aggr_tbl;}; // Returns a pointer to the aggr_tbl member itself, so callers can modify it in place.
748 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
749 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
751 // table which represents output tuple.
752 table_def *get_fields();
753 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
755 std::vector<tablevar_t *> get_input_tbls();
756 std::vector<tablevar_t *> get_output_tbls();
763 rsgah_qpn(query_summary_class *qs,table_list *Schema){
767 // Get the table name.
768 // NOTE the colrefs have the tablevar ref (an int)
769 // embedded in them. Would it make sense
770 // to grab the whole table list?
771 tablevar_list_t *fm = qs->fta_tree->get_from();
772 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
773 if(tbl_vec.size() != 1){
775 sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
779 table_name = (tbl_vec[0]);
781 int t = tbl_vec[0]->get_schema_ref();
782 if(! Schema->is_stream(t)){
783 err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
787 // Get the select list.
788 select_list = qs->fta_tree->get_sl_vec();
790 // Get the selection and having predicates.
792 having = qs->hav_cnf;
793 closing_when = qs->closew_cnf;
795 // Build a new GB var table (don't share, might need to modify)
797 for(g=0;g<qs->gb_tbl->size();g++){
798 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
799 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
800 qs->gb_tbl->get_reftype(g)
804 // Build a new aggregate table. (don't share, might need
807 for(a=0;a<qs->aggr_tbl->size();a++){
809 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
810 qs->aggr_tbl->duplicate(a)
815 // Get the parameters
816 param_tbl = qs->param_tbl;
822 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
823 std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
824 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
825 // Ensure that any refs to interface params have been split away.
826 int count_ifp_refs(std::set<std::string> &ifpnames);
827 int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;} // Stub: neither ifdb nor err is touched; always returns 0.
829 // the following method is used for distributed query optimization
830 double get_rate_estimate();
832 qp_node* make_copy(std::string suffix){
833 rsgah_qpn *ret = new rsgah_qpn();
835 ret->param_tbl = new param_table();
836 std::vector<std::string> param_names = param_tbl->get_param_names();
838 for(pi=0;pi<param_names.size();pi++){
839 data_type *dt = param_tbl->get_data_type(param_names[pi]);
840 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
841 param_tbl->handle_access(param_names[pi]));
843 ret->definitions = definitions;
845 ret->node_name = node_name + suffix;
847 // make shallow copy of all fields
849 ret->having = having;
850 ret->closing_when = closing_when;
851 ret->select_list = select_list;
852 ret->gb_tbl = gb_tbl;
853 ret->aggr_tbl = aggr_tbl;
857 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
862 // Watchlist - from a table read from an external source.
864 class watch_tbl_qpn: public qp_node{
866 table_def *table_layout; // the output schema
867 std::vector<std::string> key_flds;
869 // Parameters related to loading the table
870 std::string filename;
871 int refresh_interval;
874 void append_to_where(cnf_elem *c){
875 fprintf(stderr, "ERROR, append_to_where called on watch_tbl_qpn, not supported, query %s.\n", node_name.c_str());
879 std::string node_type(){return("watch_tbl_qpn"); }; // Type tag for this node class: always the string "watch_tbl_qpn".
880 bool makes_transform(){return false;}; // Always false for this node class.
881 std::vector<std::string> external_libs(){
882 std::vector<std::string> ret;
886 void bind_to_schema(table_list *Schema){} // Empty body: Schema is not used and nothing is bound.
887 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
892 std::string to_query_string();
893 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
894 std::string generate_functor_name();
895 std::string generate_operator(int i, std::string params);
896 std::string get_include_file(){
897 return("#include <watchlist_tbl.h>\n");
900 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
901 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
903 table_def *get_fields();
904 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
908 std::vector<tablevar_t *> get_input_tbls();
909 std::vector<tablevar_t *> get_output_tbls();
911 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
912 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
913 // Ensure that any refs to interface params have been split away.
914 int count_ifp_refs(std::set<std::string> &ifpnames);
916 // No predicates, return an empty clause
917 std::vector<cnf_elem *> get_where_clause(){
918 std::vector<cnf_elem *> t;
921 std::vector<cnf_elem *> get_filter_clause(){
922 return get_where_clause();
928 watch_tbl_qpn(query_summary_class *qs,table_list *Schema){
929 node_name=qs->query_name;
930 param_tbl = qs->param_tbl;
931 definitions = qs->definitions;
934 // Populate the schema
935 table_layout = new table_def(
936 node_name.c_str(), NULL, NULL, qs->fta_tree->fel, WATCHLIST_SCHEMA
940 std::vector<field_entry *> flds = qs->fta_tree->fel->get_list();
941 for(int f=0;f<flds.size();++f){
942 if(flds[f]->get_modifier_list()->contains_key("key") ||
943 flds[f]->get_modifier_list()->contains_key("Key") ||
944 flds[f]->get_modifier_list()->contains_key("KEY") ){
945 key_flds.push_back(flds[f]->get_name());
948 if(key_flds.size()==0){
949 fprintf(stderr,"Error, no key fields defined for table watchlist %s\n",node_name.c_str());
953 table_layout->set_keys(key_flds); // communicate keys to consumers
955 // Get loading parameters
956 if(definitions.count("filename")>0){
957 filename = definitions["filename"];
959 fprintf(stderr, "Error, no filename for source data defined for table watchlist %s\n",node_name.c_str());
962 if(definitions.count("refresh_interval")>0){
963 refresh_interval = atoi(definitions["refresh_interval"].c_str());
964 if(refresh_interval <= 0){
965 fprintf(stderr, "Error, the refresh_interval (%s) of table watchlist %s must be a positive non-zero integer.\n",definitions["refresh_interval"].c_str(), node_name.c_str());
969 fprintf(stderr, "Error, no refresh_interval defined for table watchlist %s\n",node_name.c_str());
975 qp_node *make_copy(std::string suffix){
976 watch_tbl_qpn *ret = new watch_tbl_qpn();
977 ret->filename = filename;
978 ret->refresh_interval = refresh_interval;
979 ret->key_flds = key_flds;
981 ret->param_tbl = new param_table();
982 std::vector<std::string> param_names = param_tbl->get_param_names();
984 for(pi=0;pi<param_names.size();pi++){
985 data_type *dt = param_tbl->get_data_type(param_names[pi]);
986 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
987 param_tbl->handle_access(param_names[pi]));
989 ret->definitions = definitions;
991 ret->node_name = node_name + suffix;
992 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
997 // the following method is used for distributed query optimization
998 double get_rate_estimate();
1000 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1010 // forward reference
1011 class filter_join_qpn;
1012 class watch_join_qpn;
1015 // (temporal) Merge query plan node.
1016 // represent the following query fragment
1018 // from T1 _t1, T2 _t2
1020 // T1 and T2 must have compatible schemas,
1021 // that is the same types in the same slots.
1022 // c1 and c2 must be colrefs from T1 and T2,
1023 // both ref'ing the same slot. Their types
1024 // must be temporal and the same kind of temporal.
1025 // in the output, no other field is temporal.
1026 // the field names of the output are drawn from T1.
1027 class mrg_qpn: public qp_node{
1029 std::vector<tablevar_t *> fm; // Source table
1030 std::vector<colref_t *> mvars; // the merge-by columns.
1033 table_def *table_layout; // the output schema
1034 int merge_fieldpos; // position of merge field,
1035 // convenience for manipulation.
1037 int disorder; // max disorder seen in the input / allowed in the output
1040 // partition definition for merges that combine streams partitioned over multiple interfaces
1041 partn_def_t* partn_def;
1044 void append_to_where(cnf_elem *c){
1045 fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n", node_name.c_str());
1051 std::string node_type(){return("mrg_qpn"); }; // Type tag for this node class: always the string "mrg_qpn".
1052 bool makes_transform(){return false;}; // Always false for this node class.
1053 std::vector<std::string> external_libs(){
1054 std::vector<std::string> ret;
1058 void bind_to_schema(table_list *Schema);
1059 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1060 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
1064 std::string to_query_string();
1065 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1066 std::string generate_functor_name();
1067 std::string generate_operator(int i, std::string params);
1068 std::string get_include_file(){
1070 return("#include <merge_operator_oop.h>\n");
1071 return("#include <merge_operator.h>\n");
1074 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1075 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1077 table_def *get_fields();
// Key fields of the output table, as a list of names.
// NOTE(review): neither `ret` nor `partial_keys` is filled here, so a merge
// presumably reports no keys -- confirm.
1078 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1079 std::vector<string> ret;
1083 std::vector<tablevar_t *> get_input_tbls();
1084 std::vector<tablevar_t *> get_output_tbls();
1086 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1087 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1088 // Ensure that any refs to interface params have been split away.
1089 int count_ifp_refs(std::set<std::string> &ifpnames);
1091 // No predicates, return an empty clause
1092 std::vector<cnf_elem *> get_where_clause(){
// Local result vector; a merge has no predicates to put into it.
1093 std::vector<cnf_elem *> t;
// The filter clause of a merge is whatever get_where_clause() returns.
1096 std::vector<cnf_elem *> get_filter_clause(){
1097 return get_where_clause();
// Setter for the disorder bound of this merge.
// NOTE(review): presumably stores d into the `disorder` member -- confirm.
1104 void set_disorder(int d){
// Build a merge node from an analyzed query.
//   qs     : query summary; supplies the FROM list (qs->fta_tree), the
//            parameter table and the query name.
//   Schema : table list used to check that every source is a stream and to
//            fetch the fields of the first source.
// Problems with the sources are appended to err_str. The output schema
// (table_layout) is derived from the fields of the first source.
1108 mrg_qpn(query_summary_class *qs,table_list *Schema){
1111 // Grab the elements of the query node.
1112 fm = qs->fta_tree->get_from()->get_table_list();
// There must be exactly one merge column per source.
1117 if(fm.size() != mvars.size()){
1118 fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn. fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
// Every source must be a stream.
// NOTE(review): node_name is assigned from qs->query_name only further down,
// so it may not yet hold this query's name when the message is built -- confirm.
1122 for(int f=0;f<fm.size();++f){
1123 int t=fm[f]->get_schema_ref();
1124 if(! Schema->is_stream(t)){
1125 err_str += "ERROR in query "+node_name+", the source "+fm[f]->get_schema_name()+" is not a stream.\n";
1130 // Get the parameters
1131 param_tbl = qs->param_tbl;
1133 // Need to set the node name now, so that the
1134 // schema (table_layout) can be properly named.
1135 // TODO: Setting the name of the table might best be done
1136 // via the set_node_name method, because presumably
1137 // that's when the node name is really known.
1138 // This should propagate to the table_def table_layout
1139 node_name=qs->query_name;
// Diagnostic listing of the sources.
// NOTE(review): fm.size() is printed with %d here while the check above uses
// %lu for the same value -- the format specifier should match size_t.
1143 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
1144 for(ff=0;ff<fm.size();++ff){
1145 printf("%s ",fm[ff]->to_string().c_str());
1151 // Create the output schema.
1152 // strip temporal properties from all fields except the merge field.
1153 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1154 field_entry_list *fel = new field_entry_list();
1156 for(f=0;f<flva.size();++f){
// Rebuild the field's data type from its type name and modifier list.
1158 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
// The merge field is identified by name, using the first merge column.
1159 if(flva[f]->get_name() == mvars[0]->get_field()){
1161 // if(slack != NULL) dt.reset_temporal();
1163 dt.reset_temporal();
// Copy the type's parameters into a fresh param_list. The two append calls
// below handle a key with a value and a bare key.
// NOTE(review): presumably selected on whether v is empty -- confirm.
1166 param_list *plist = new param_list();
1167 std::vector<std::string> param_strings = dt.get_param_keys();
1169 for(p=0;p<param_strings.size();++p){
1170 std::string v = dt.get_param_val(param_strings[p]);
1172 plist->append(param_strings[p].c_str(),v.c_str());
1174 plist->append(param_strings[p].c_str());
// Arguments of the new field entry: type string, field name, an empty
// string, the parameter list, and the source field's unpack functions.
1179 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
1180 fel->append_field(fe);
// The output table is a stream schema named after the node.
1186 table_layout = new table_def(
1187 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1194 /////////////////////////////////////////////
1195 /// Created for de-siloing. to be removed? or is it otherwise useful?
1196 // Merge existing set of sources (de-siloing)
// Build a merge node over a set of existing sources (de-siloing).
//   n_name    : name of the merge node, used in the error messages below.
//   src_names : names of the source tables; each is looked up in Schema.
//   Schema    : table list used to resolve the sources and their fields.
// The output schema is derived from the fields of the first source; the
// first temporal field found there serves as the merge field.
1197 mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
1202 // Construct the fm list
1203 for(f=0;f<src_names.size();++f){
1204 int tbl_ref = Schema->get_table_ref(src_names[f]);
1206 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
1209 table_def *src_tbl = Schema->get_table(tbl_ref);
// Each source gets a table variable with range variable "_t<index>" that is
// bound to the table's schema reference.
1210 tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
1211 string range_name = "_t" + int_to_string(f);
1212 fm_t->set_range_var(range_name);
1213 fm_t->set_schema_ref(tbl_ref);
1217 // Create the output schema.
1218 // strip temporal properties from all fields except the merge field.
1219 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1220 field_entry_list *fel = new field_entry_list();
1221 bool temporal_found = false;
1222 for(f=0;f<flva.size();++f){
1224 data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
// Only the first temporal field is recognized as the merge field.
// NOTE(review): presumably merge_fieldpos is recorded at this point -- confirm.
1225 if(dt.is_temporal() && !temporal_found){
1227 temporal_found = true;
1229 dt.reset_temporal();
// Copy the type's parameters into a fresh param_list (key with value, or
// bare key).
1232 param_list *plist = new param_list();
1233 std::vector<std::string> param_strings = dt.get_param_keys();
1235 for(p=0;p<param_strings.size();++p){
1236 std::string v = dt.get_param_val(param_strings[p]);
1238 plist->append(param_strings[p].c_str(),v.c_str());
1240 plist->append(param_strings[p].c_str());
// Arguments of the new field entry: type string, field name, an empty
// string, the parameter list, and the source field's unpack functions.
1244 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
1245 flva[f]->get_unpack_fcns()
1247 fel->append_field(fe);
1250 if(! temporal_found){
1251 fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
1256 table_layout = new table_def(
1257 node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
// This merge starts with a fresh parameter table.
1261 param_tbl = new param_table();
// Create one merge column per source, referring to the field at
// merge_fieldpos of that source.
1264 for(f=0;f<fm.size();++f){
1265 std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
// NOTE(review): the type name comes from this source (flv_f) but the
// modifier list from the first source (flva) -- confirm this is intended.
1266 data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
1267 flva[merge_fieldpos]->get_modifier_list());
1269 colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
1270 flv_f[merge_fieldpos]->get_name().c_str());
1271 mvars.push_back(mcr);
1274 // literal_t *s_lit = new literal_t("5",LITERAL_INT);
1275 // slack = new scalarexp_t(s_lit);
1280 ////////////////////////////////////////
1282 void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
1285 // Merge filter_join LFTAs.
1287 mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1289 // Merge watch_join LFTAs.
1291 mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1293 // Merge selection LFTAs.
// Build a merge node that combines the outputs of a selection node.
//   spx     : selection node whose select list defines the output schema.
//   n_name  : name of the output table; also used in diagnostics.
//   sources : names of the streams being merged, one table variable each.
//   ifaces, ifdb : passed through to resolve_slack.
1295 mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
// NOTE(review): this assignment is superseded by the fresh param_table
// created near the end of the constructor.
1299 param_tbl = spx->param_tbl;
1302 field_entry_list *fel = new field_entry_list();
// -1 means "no merge field found yet".
1303 merge_fieldpos = -1;
// One output field per select-list element, typed by a duplicate of the
// element's data type.
1308 for(i=0;i<spx->select_list.size();++i){
1309 data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
1310 if(dt->is_temporal()){
1311 if(merge_fieldpos < 0){
// A further temporal field: warn, keep the earlier choice, and strip the
// temporal property from this one.
1314 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
1315 dt->reset_temporal();
1319 field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
1320 fel->append_field(fe);
// NOTE(review): merge_fieldpos is used as an index below; presumably this
// error path does not continue that far -- confirm.
1323 if(merge_fieldpos<0){
1324 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1327 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1329 // NEED TO HANDLE USER_SPECIFIED SLACK
1330 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
1331 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
1332 // if(this->slack == NULL)
1333 // fprintf(stderr,"Zero slack.\n");
1335 // fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
// For source i: a merge column on the merge field and a table variable,
// both using range variable "_m<i>".
1337 for(i=0;i<sources.size();i++){
1338 std::string rvar = "_m"+int_to_string(i);
1339 mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
1340 mvars[i]->set_tablevar_ref(i);
1341 fm.push_back(new tablevar_t(sources[i].c_str()));
1342 fm[i]->set_range_var(rvar);
// Give the merge its own parameter table: each parameter of spx is re-added
// with a duplicated data type and its handle_access value.
1345 param_tbl = new param_table();
1346 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
1348 for(pi=0;pi<param_names.size();pi++){
1349 data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
1350 param_tbl->add_param(param_names[pi],dt->duplicate(),
1351 spx->param_tbl->handle_access(param_names[pi]));
1353 definitions = spx->definitions;
1357 // Merge aggregation LFTAs
// Build a merge node that combines the outputs of an aggregation node.
//   sgah    : aggregation node whose select list defines the output schema.
//   n_name  : name of the output table; also used in diagnostics.
//   sources : names of the streams being merged, one table variable each.
//   ifaces, ifdb : passed through to resolve_slack.
1359 mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
// NOTE(review): this assignment is superseded by the fresh param_table
// created near the end of the constructor.
1363 param_tbl = sgah->param_tbl;
1366 field_entry_list *fel = new field_entry_list();
// -1 means "no merge field found yet".
1367 merge_fieldpos = -1;
// One output field per select-list element, typed by a duplicate of the
// element's data type.
1368 for(i=0;i<sgah->select_list.size();++i){
1369 data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
1370 if(dt->is_temporal()){
1371 if(merge_fieldpos < 0){
// A further temporal field: warn, keep the earlier choice, and strip the
// temporal property from this one.
1374 fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
1375 dt->reset_temporal();
1379 field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
1380 fel->append_field(fe);
// NOTE(review): merge_fieldpos is used as an index below; presumably this
// error path does not continue that far -- confirm.
1383 if(merge_fieldpos<0){
1384 fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1387 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1389 // NEED TO HANDLE USER_SPECIFIED SLACK
1390 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
1391 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
// Diagnostic output describing the slack that was resolved.
1393 if(this->slack == NULL)
1394 fprintf(stderr,"Zero slack.\n");
1396 fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
// For source i: a merge column on the merge field and a table variable,
// both using range variable "_m<i>".
1399 for(i=0;i<sources.size();i++){
1400 std::string rvar = "_m"+int_to_string(i);
1401 mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
1402 mvars[i]->set_tablevar_ref(i);
1403 fm.push_back(new tablevar_t(sources[i].c_str()));
1404 fm[i]->set_range_var(rvar);
// Give the merge its own parameter table: each parameter of sgah is re-added
// with a duplicated data type and its handle_access value.
1407 param_tbl = new param_table();
1408 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
1410 for(pi=0;pi<param_names.size();pi++){
1411 data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
1412 param_tbl->add_param(param_names[pi],dt->duplicate(),
1413 sgah->param_tbl->handle_access(param_names[pi]));
1415 definitions = sgah->definitions;
// Create a copy of this merge node whose name is node_name + suffix.
// The parameter table is rebuilt entry by entry (data types duplicated);
// table_layout is a shallow copy registered under the new name.
1419 qp_node *make_copy(std::string suffix){
1420 mrg_qpn *ret = new mrg_qpn();
1422 ret->disorder = disorder;
1424 ret->param_tbl = new param_table();
1425 std::vector<std::string> param_names = param_tbl->get_param_names();
1427 for(pi=0;pi<param_names.size();pi++){
1428 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1429 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1430 param_tbl->handle_access(param_names[pi]));
1432 ret->definitions = definitions;
// The copy's name must be set before table_layout is copied, because the
// shallow copy is created under ret->node_name.
1434 ret->node_name = node_name + suffix;
1435 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
1436 ret->merge_fieldpos = merge_fieldpos;
1441 std::vector<mrg_qpn *> split_sources();
1443 // the following method is used for distributed query optimization
1444 double get_rate_estimate();
1447 // get partition definition for merges that combine streams partitioned over multiple interfaces
1448 // return NULL for regular merges
// Looks up the partition definition shared by all inputs of this merge.
// For every input table the "iface_partition" values of its
// (machine, interface) pair are read from ifaces_db; each input must have
// exactly one such value and all inputs must name the same partition. The
// definition for that name is then fetched from partn_parse_result and
// stored in partn_def.
// lfta_names, interface_names and machine_names are taken by value.
1449 partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1457 vector<tablevar_t *> input_tables = get_input_tbls();
1458 for (int i = 0; i < input_tables.size(); ++i) {
1459 tablevar_t * table = input_tables[i];
1461 vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
1462 if (partn_names.size() != 1) // can't have more than one value of partition attribute
1464 string new_partn_name = partn_names[0];
1466 // need to make sure that all ifaces belong to the same partition
1468 partn_name = new_partn_name;
1469 else if (new_partn_name != partn_name)
1473 // now find partition definition corresponding to partn_name
1474 partn_def = partn_parse_result->get_partn_def(partn_name);
// Setter for the partition definition of this merge.
// NOTE(review): presumably stores def into the partn_def member -- confirm.
1478 void set_partn_definition(partn_def_t* def) {
// Reports whether the inputs of this merge come from more than one host.
// The machine attribute of the first input is compared against that of
// every other input; any difference marks the merge as multi-host.
1482 bool is_multihost_merge() {
1484 bool is_multihost = false;
1486 // each input table must have a non-empty machine attribute
1487 // and there should be at least 2 different values of machine attributes
1488 vector<tablevar_t *> input_tables = get_input_tbls();
// NOTE(review): indexes element 0 without checking that there is at least
// one input table -- confirm callers guarantee that.
1489 string host = input_tables[0]->get_machine();
1490 for (int i = 1; i < input_tables.size(); ++i) {
1491 string new_host = input_tables[i]->get_machine();
1494 if (new_host != host)
1495 is_multihost = true;
1497 return is_multihost;
1500 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1504 // eq_temporal, hash join query plan node.
1505 // represent the following query fragment
1506 // select scalar_expression_1, ..., scalar_expression_k
1507 // from T0 t0, T1 t1
1510 // the predicates and the scalar expressions can reference
1511 // attributes of t0 and t1 and also functions.
1512 // The predicate must contain CNF elements to enable the
1513 // efficient evaluation of the query.
1514 // 1) at least one predicate of the form
1515 // (temporal se in t0) = (temporal se in t1)
1516 // 2) at least one predicate of the form
1517 // (non-temporal se in t0) = (non-temporal se in t1)
1519 class join_eq_hash_qpn: public qp_node{
1521 std::vector<tablevar_t *> from; // Source tables
1522 std::vector<select_element *> select_list; // Select list
1523 std::vector<cnf_elem *> prefilter[2]; // source prefilters
1524 std::vector<cnf_elem *> temporal_eq; // define temporal window
1525 std::vector<cnf_elem *> hash_eq; // define hash key
1526 std::vector<cnf_elem *> postfilter; // final filter on hash matches.
1528 std::vector<cnf_elem *> where; // all the filters
1529 // useful for summary analysis
1531 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
// Returns a copy of hash_src_r.
1533 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
// Returns a copy of hash_src_l.
1534 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
// Type tag of this plan node; always the string "join_eq_hash_qpn".
1536 std::string node_type(){return("join_eq_hash_qpn"); };
// Always true for this join node.
1537 bool makes_transform(){return true;};
// External libraries required by this node, as a list of names.
// NOTE(review): nothing is added to `ret` here, so the result is presumably
// an empty list -- confirm.
1538 std::vector<std::string> external_libs(){
1539 std::vector<std::string> ret;
1543 void bind_to_schema(table_list *Schema);
// Not meant to be called on this join node: a call is reported on stderr as
// an internal error.
1544 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1545 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
// Attaching a predicate to this join node is not supported: the call is
// reported on stderr together with the name of the offending query.
1549 void append_to_where(cnf_elem *c){
1550 fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str());
1555 std::string to_query_string();
1556 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1557 std::string generate_functor_name();
1558 std::string generate_operator(int i, std::string params);
// #include directive (with trailing newline) for the join operator header.
1559 std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
// Returns a copy of the select list (the element pointers are shared).
1561 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the scalar expression (se) of every select-list element, in
// select-list order.
1562 std::vector<scalarexp_t *> get_select_se_list(){
1563 std::vector<scalarexp_t *> ret;
1565 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1568 // Used for LFTA only
1569 std::vector<cnf_elem *> get_where_clause(){
// NOTE(review): a local vector is declared instead of using the `where`
// member; presumably it is returned empty -- confirm.
1570 std::vector<cnf_elem *> t;
// The filter clause of this join is whatever get_where_clause() returns.
1573 std::vector<cnf_elem *> get_filter_clause(){
1574 return get_where_clause();
1577 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1578 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1580 table_def *get_fields();
1582 // It might be feasible to find keys in an equijoin expression.
// Key fields of the output table, as a list of names.
// NOTE(review): neither `ret` nor `partial_keys` is filled here, so
// presumably no keys are reported for now -- confirm.
1583 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1584 std::vector<string> ret;
1588 std::vector<tablevar_t *> get_input_tbls();
1589 std::vector<tablevar_t *> get_output_tbls();
1591 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1592 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1593 // Ensure that any refs to interface params have been split away.
1594 int count_ifp_refs(std::set<std::string> &ifpnames);
// Build a hash-join node from an analyzed query.
//   qs     : query summary; supplies the FROM list, the select list and the
//            parameter table.
//   Schema : table list used to check that both sources are streams.
// Each element of `where` is sorted into one of: prefilter[0|1] (refers to
// one source only), postfilter (refers to no source, or fits nothing else),
// temporal_eq or hash_eq (equality between one expression per source).
// Problems are reported through err_str.
1598 join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
1600 // Get the table name.
1601 // NOTE the colrefs have the table ref (an int)
1602 // embedded in them. Would it make sense
1603 // to grab the whole table list?
1604 from = qs->fta_tree->get_from()->get_table_list();
// A join is defined over exactly two sources.
1605 if(from.size() != 2){
1607 sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
// Every source must be a stream.
1612 for(int f=0;f<from.size();++f){
1613 int t=from[f]->get_schema_ref();
1614 if(! Schema->is_stream(t)){
1615 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
1621 // Get the select list.
1622 select_list = qs->fta_tree->get_sl_vec();
1624 // Get the selection predicate.
1626 for(w=0;w<where.size();++w){
1627 analyze_cnf(where[w]);
// pred_tbls receives the table variables referenced by this predicate.
1628 std::vector<int> pred_tbls;
1629 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1630 // Prefilter if refs only one tablevar
1631 if(pred_tbls.size()==1){
1632 prefilter[pred_tbls[0]].push_back(where[w]);
1635 // refs nothing -- might be sampling, do it as postfilter.
1636 if(pred_tbls.size()==0){
1637 postfilter.push_back(where[w]);
1640 // See if it can be a hash or temporal predicate.
1641 // NOTE: synchronize with the temporality checking
1642 // done at join_eq_hash_qpn::get_fields
1643 if(where[w]->is_atom && where[w]->eq_pred){
1644 std::vector<int> sel_tbls, ser_tbls;
1645 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1646 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
// Each side must refer to exactly one source, and the two sides to
// different sources.
1647 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1648 // make channel 0 SE on LHS.
1649 if(sel_tbls[0] != 0)
1650 where[w]->pr->swap_scalar_operands();
// Both sides increasing, or both decreasing: the equality defines the
// temporal window. Otherwise it contributes to the hash key.
1652 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1653 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1654 if( (dtl->is_increasing() && dtr->is_increasing()) ||
1655 (dtl->is_decreasing() && dtr->is_decreasing()) )
1656 temporal_eq.push_back(where[w]);
1658 hash_eq.push_back(where[w]);
1663 // All tests failed, fallback is postfilter.
1664 postfilter.push_back(where[w]);
// Without a temporal equality there is no join window.
// NOTE(review): err_str is assigned here, not appended to, so messages
// accumulated above would be replaced -- confirm this is intended.
1667 if(temporal_eq.size()==0){
1668 err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
1672 // Get the parameters
1673 param_tbl = qs->param_tbl;
1677 // the following method is used for distributed query optimization
1678 double get_rate_estimate();
// Create a copy of this join node whose name is node_name + suffix.
// The parameter table is rebuilt entry by entry (data types duplicated);
// the select list and predicate lists are copied as vectors of the same
// pointers.
1681 qp_node* make_copy(std::string suffix){
1682 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
1684 ret->param_tbl = new param_table();
1685 std::vector<std::string> param_names = param_tbl->get_param_names();
1687 for(pi=0;pi<param_names.size();pi++){
1688 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1689 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1690 param_tbl->handle_access(param_names[pi]));
1692 ret->definitions = definitions;
1694 ret->node_name = node_name + suffix;
1696 // make shallow copy of all fields
1699 ret->select_list = select_list;
1700 ret->prefilter[0] = prefilter[0];
1701 ret->prefilter[1] = prefilter[1];
1702 ret->postfilter = postfilter;
1703 ret->temporal_eq = temporal_eq;
1704 ret->hash_eq = hash_eq;
1708 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1713 // ---------------------------------------------
1714 // filter join query plan node.
1715 // represent the following query fragment
1716 // select scalar_expression_1, ..., scalar_expression_k
1717 // FILTER_JOIN(col, range) from T0 t0, T1 t1
1720 // t0 is the output range variable, t1 is the filtering range
1721 // variable. Both must alias a PROTOCOL.
1722 // The scalar expressions in the select clause may
1723 // reference t0 only.
1724 // The predicates are classified as follows
1725 // prefilter predicates:
1726 // a cheap predicate in t0 such that there is an equivalent
1727 // predicate in t1. Cost decisions about pushing to
1728 // lfta prefilter made later.
1729 // t0 predicates (other than prefilter predicates)
1730 // -- cheap vs. expensive sorted out at generate time,
1731 // the constructor isn't called with the function list.
1732 // t1 predicates (other than prefilter predicates).
1733 // equi-join predicates of the form:
1734 // (se in t0) = (se in t1)
1736 // There must be at least one equi-join predicate.
1737 // No join predicates other than equi-join predicates
1739 // Warn on temporal equi-join predicates.
1740 // t1 predicates should not be expensive ... warn?
1742 class filter_join_qpn: public qp_node{
1744 std::vector<tablevar_t *> from; // Source tables
1745 colref_t *temporal_var; // join window in FROM
1746 unsigned int temporal_range; // metadata.
1747 std::vector<select_element *> select_list; // Select list
1748 std::vector<cnf_elem *> shared_pred; // prefilter preds
1749 std::vector<cnf_elem *> pred_t0; // main (R) preds
1750 std::vector<cnf_elem *> pred_t1; // filtering (S) preds
1751 std::vector<cnf_elem *> hash_eq; // define hash key
1752 std::vector<cnf_elem *> postfilter; // ref's no table.
1754 std::vector<cnf_elem *> where; // all the filters
1755 // useful for summary analysis
1757 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
// Returns a copy of hash_src_r.
1758 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
// Returns a copy of hash_src_l.
1759 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1762 bool use_bloom; // true => bloom filter, false => limited hash
// Type tag of this plan node; always the string "filter_join".
1764 std::string node_type(){return("filter_join"); };
// Always true for a filter join node.
1765 bool makes_transform(){return true;};
// External libraries required by this node, as a list of names.
// NOTE(review): nothing is added to `ret` here, so the result is presumably
// an empty list -- confirm.
1766 std::vector<std::string> external_libs(){
1767 std::vector<std::string> ret;
1771 void bind_to_schema(table_list *Schema);
1772 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
1774 std::string to_query_string();
// Not meant to be called on a filter join: a call is reported on stderr as
// an internal error.
1775 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
1776 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
// Not meant to be called on a filter join: a call is reported on stderr as
// an internal error.
1779 std::string generate_functor_name(){
1780 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
// Not meant to be called on a filter join: a call is reported on stderr as
// an internal error.
1783 std::string generate_operator(int i, std::string params){
1784 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
// #include directive (with trailing newline) for the operator header.
// NOTE(review): this names join_eq_hash_operator.h, the hash-join header --
// confirm that is the intended header for a filter join.
1787 std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
// Returns a copy of the select list (the element pointers are shared).
1789 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the scalar expression (se) of every select-list element, in
// select-list order.
1790 std::vector<scalarexp_t *> get_select_se_list(){
1791 std::vector<scalarexp_t *> ret;
1793 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1796 // Used for LFTA only
// Adds a predicate to this node.
// NOTE(review): presumably appends c to the `where` list -- confirm.
1797 void append_to_where(cnf_elem *c){
// Returns a copy of `where`, the list of all filters of this node.
1801 std::vector<cnf_elem *> get_where_clause(){return where;}
// The filter clause of a filter join is its list of shared (prefilter)
// predicates.
1802 std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
1804 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1805 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1807 table_def *get_fields();
1808 // It should be feasible to find keys in a filter join
// Key fields of the output table, as a list of names.
// NOTE(review): neither `ret` nor `partial_keys` is filled here, so
// presumably no keys are reported for now -- confirm.
1809 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1810 std::vector<string> ret;
1814 std::vector<tablevar_t *> get_input_tbls();
1815 std::vector<tablevar_t *> get_output_tbls();
1817 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1818 int resolve_if_params(ifq_t *ifdb, std::string &err);
1820 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
1821 // Ensure that any refs to interface params have been split away.
1822 int count_ifp_refs(std::set<std::string> &ifpnames);
// Build a filter join node from an analyzed query.
//   qs     : query summary; supplies the FROM list (with its temporal
//            column and range), the select list, the parameter table and
//            the definitions.
//   Schema : table list used to check the sources and to compare predicates.
// Each element of `where` is classified as referring to t0 only, to t1
// only, to no source (postfilter), or as an equi-join predicate (hash_eq).
// t0-only predicates that have an equivalent t1-only predicate become
// shared_pred; the rest go to pred_t0 and pred_t1. Problems are appended to
// err_str.
1827 filter_join_qpn(query_summary_class *qs,table_list *Schema){
1829 // Get the table name.
1830 // NOTE the colrefs have the table ref (an int)
1831 // embedded in them. Would it make sense
1832 // to grab the whole table list?
1833 from = qs->fta_tree->get_from()->get_table_list();
1834 temporal_var = qs->fta_tree->get_from()->get_colref();
1835 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
// A join is defined over exactly two sources.
1836 if(from.size() != 2){
1838 sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
// Both range variables must come from the same interface (set).
1842 if(from[0]->get_interface() != from[1]->get_interface()){
1843 err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n";
// Every source must be a stream.
1847 for(int f=0;f<from.size();++f){
1848 int t=from[f]->get_schema_ref();
1849 if(! Schema->is_stream(t)){
1850 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
1856 // Get the select list.
1857 select_list = qs->fta_tree->get_sl_vec();
1858 // Verify that only t0 is referenced.
1859 bool bad_ref = false;
1860 for(i=0;i<select_list.size();i++){
1861 vector<int> sel_tbls;
1862 get_tablevar_ref_se(select_list[i]->se,sel_tbls);
// Flags an expression that refers to both sources, or to the second one only.
1863 if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
// NOTE(review): the parenthesis opened before the first variable name is
// never closed in this message.
1867 err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
1872 // Get the selection predicate.
1874 std::vector<cnf_elem *> t0_only, t1_only;
1875 for(w=0;w<where.size();++w){
1876 analyze_cnf(where[w]);
// pred_tbls receives the table variables referenced by this predicate.
1877 std::vector<int> pred_tbls;
1878 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1879 // Collect the list of preds by src var,
1880 // extract the shared preds later.
1881 if(pred_tbls.size()==1){
1882 if(pred_tbls[0] == 0){
1883 t0_only.push_back(where[w]);
1885 t1_only.push_back(where[w]);
1889 // refs nothing -- might be sampling, do it as postfilter.
1890 if(pred_tbls.size()==0){
1891 postfilter.push_back(where[w]);
1894 // See if it can be a hash or temporal predicate.
1895 // NOTE: synchronize with the temporality checking
1896 // done at join_eq_hash_qpn::get_fields
1897 if(where[w]->is_atom && where[w]->eq_pred){
1898 std::vector<int> sel_tbls, ser_tbls;
1899 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1900 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
// Each side must refer to exactly one source, and the two sides to
// different sources.
1901 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1902 // make channel 0 SE on LHS.
1903 if(sel_tbls[0] != 0)
1904 where[w]->pr->swap_scalar_operands();
1906 hash_eq.push_back(where[w]);
// A join predicate on temporal fields is still accepted, but a warning is
// recorded.
1908 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1909 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1910 if( (dtl->is_increasing() && dtr->is_increasing()) ||
1911 (dtl->is_decreasing() && dtr->is_decreasing()) )
1912 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
// NOTE(review): the comment below mentions a postfilter fallback, but the
// statement that follows records an error instead.
1917 // All tests failed, fallback is postfilter.
1918 err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
1921 // Classify the t0_only and t1_only preds.
// matched_pred holds the indices of t1_only predicates that were paired with
// a t0_only predicate.
1922 set<int> matched_pred;
1924 for(w=0;w<t0_only.size();w++){
// Search for a t1 predicate equivalent to t0_only[w]; v < t1_only.size()
// afterwards means one was found.
1925 for(v=0;v<t1_only.size();++v)
1926 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
1928 if(v<t1_only.size()){
1929 shared_pred.push_back(t0_only[w]);
1930 matched_pred.insert(v);
1932 pred_t0.push_back(t0_only[w]);
// Unmatched t1 predicates stay as filtering-side predicates.
1935 for(v=0;v<t1_only.size();++v){
1936 if(matched_pred.count(v) == 0)
1937 pred_t1.push_back(t1_only[v]);
1941 // Get the parameters
1942 param_tbl = qs->param_tbl;
1943 definitions = qs->definitions;
1945 // Determine the algorithm
// NOTE(review): presumably sets use_bloom from the "algorithm" definition -- confirm.
1946 if(this->get_val_of_def("algorithm") == "hash"){
1953 // the following method is used for distributed query optimization
1954 double get_rate_estimate();
// Create a copy of this filter join node whose name is node_name + suffix.
// The parameter table is rebuilt entry by entry (data types duplicated);
// the temporal settings, select list and predicate lists are copied by
// value, so pointers are shared with the original.
1957 qp_node* make_copy(std::string suffix){
1958 filter_join_qpn *ret = new filter_join_qpn();
1960 ret->param_tbl = new param_table();
1961 std::vector<std::string> param_names = param_tbl->get_param_names();
1963 for(pi=0;pi<param_names.size();pi++){
1964 data_type *dt = param_tbl->get_data_type(param_names[pi]);
1965 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1966 param_tbl->handle_access(param_names[pi]));
1968 ret->definitions = definitions;
1970 ret->node_name = node_name + suffix;
1972 // make shallow copy of all fields
1975 ret->temporal_range = temporal_range;
1976 ret->temporal_var = temporal_var;
1977 ret->select_list = select_list;
1978 ret->shared_pred = shared_pred;
1979 ret->pred_t0 = pred_t0;
1980 ret->pred_t1 = pred_t1;
1981 ret->postfilter = postfilter;
1982 ret->hash_eq = hash_eq;
1986 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1992 // TODO : put tests on other operators to ensure they don't read from a watchlist
1993 // TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ?
1994 class watch_join_qpn: public qp_node{
1996 std::vector<tablevar_t *> from; // Source tables
1997 std::vector<select_element *> select_list; // Select list
1998 std::vector<cnf_elem *> pred_t0; // main (R) preds
1999 std::vector<cnf_elem *> pred_t1; // watchlist-only (S) preds (?)
2000 std::map<std::string, cnf_elem *> hash_eq; // predicates on S hash keys
2001 std::vector<cnf_elem *> join_filter; // ref's R, S, but not a hash
2002 std::vector<cnf_elem *> postfilter; // ref's no table.
2004 std::vector<std::string> key_flds;
2006 std::vector<cnf_elem *> where; // all the filters
2007 // useful for summary analysis
2009 std::vector<scalarexp_t *> hash_src_r, hash_src_l;
// Returns a copy of hash_src_r.
2010 std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
// Returns a copy of hash_src_l.
2011 std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
// Type tag of this plan node; always the string "watch_join".
2015 std::string node_type(){return("watch_join"); };
// Always true for a watchlist join node.
2016 bool makes_transform(){return true;};
// External libraries required by this node, as a list of names.
// NOTE(review): nothing is added to `ret` here, so the result is presumably
// an empty list -- confirm.
2017 std::vector<std::string> external_libs(){
2018 std::vector<std::string> ret;
2022 void bind_to_schema(table_list *Schema);
2023 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
2025 std::string to_query_string();
// Not meant to be called on a watchlist join: a call is reported on stderr
// as an internal error.
2026 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
2027 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n");
// Not meant to be called on a watchlist join: a call is reported on stderr
// as an internal error.
2030 std::string generate_functor_name(){
2031 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n");
// Not meant to be called on a watchlist join: a call is reported on stderr
// as an internal error.
2034 std::string generate_operator(int i, std::string params){
2035 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n");
// #include directive (with trailing newline) for the watchlist operator header.
2038 std::string get_include_file(){return("#include <watchlist_operator.h>\n");};
// Returns a copy of the select list (the element pointers are shared).
2040 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the scalar expression (se) of every select-list element, in
// select-list order.
2041 std::vector<scalarexp_t *> get_select_se_list(){
2042 std::vector<scalarexp_t *> ret;
2044 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
2047 // Used for LFTA only
// Adds a predicate to this node.
// NOTE(review): presumably appends c to the `where` list -- confirm.
2048 void append_to_where(cnf_elem *c){
// Returns a copy of `where`, the list of all filters of this node.
2052 std::vector<cnf_elem *> get_where_clause(){return where;}
// The filter clause of a watchlist join is pred_t0, the predicates on the
// main (R) source.
2054 std::vector<cnf_elem *> get_filter_clause(){return pred_t0;}
2056 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2057 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2059 table_def *get_fields();
2060 // It should be feasible to find keys in a watchlist join
// Key fields of the output table, as a list of names.
// NOTE(review): neither `ret` nor `partial_keys` is filled here, so
// presumably no keys are reported for now -- confirm.
2061 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2062 std::vector<string> ret;
2066 std::vector<tablevar_t *> get_input_tbls();
2067 std::vector<tablevar_t *> get_output_tbls();
// Declaration only. Returns a list of plan nodes; hfta_returned is an in/out int
// passed by reference. NOTE(review): presumably flags whether an HFTA node is
// among the results -- confirm against the definition.
2069 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
// Declaration only. Returns an int and may write a message into err.
2070 int resolve_if_params(ifq_t *ifdb, std::string &err);
// Declaration only (virtual). Returns a vector of table_exp_t pointers; qnodes and
// opviews are passed by non-const reference, so the definition may modify them.
2072 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2073 // Ensure that any refs to interface params have been split away.
// Declaration only. Returns an int; ifpnames is passed by non-const reference.
// NOTE(review): presumably the count of interface-parameter references -- confirm.
2074 int count_ifp_refs(std::set<std::string> &ifpnames);
// Builds a watch-join plan node from an analyzed query summary.
//   qs     : query summary; supplies the FROM list, the select list, the
//            parameter table and the definitions read below.
//   Schema : table list used to look up each source's schema type and the
//            watchlist's key fields.
// Problems found here are reported by appending text to err_str.
// NOTE(review): from[0] and from[1] are indexed after the table-count check;
// confirm the failure path cannot fall through with fewer than two tables.
2079 watch_join_qpn(query_summary_class *qs,table_list *Schema){
2081 // Get the table name.
2082 // NOTE the colrefs have the table ref (an int)
2083 // embedded in them. Would it make sense
2084 // to grab the whole table list?
2085 from = qs->fta_tree->get_from()->get_table_list();
// A watch join needs exactly two sources.
2086 if(from.size() != 2){
// The message names this node type (it previously said filter_join_qpn).
// NOTE(review): %lu is paired with a size_t argument; %zu is the portable form.
2088 sprintf(tmpstr,"ERROR building watch_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
// Source 0 (the LHS) must be a PROTOCOL schema.
2093 int t = from[0]->get_schema_ref();
2094 if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){
2095 err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n";
// Source 1 (the RHS) must be a WATCHLIST schema; its keys drive the hash join.
2098 t = from[1]->get_schema_ref();
2099 if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){
2100 err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n";
2103 key_flds = Schema->get_table(t)->get_keys();
2106 // Get the select list.
2107 select_list = qs->fta_tree->get_sl_vec();
2109 // Get the selection predicate.
// NOTE(review): t0_only and t1_only are not used by any statement shown below.
2111 std::vector<cnf_elem *> t0_only, t1_only;
// Classify every conjunct of the where clause by the table variables it references.
2112 for(w=0;w<where.size();++w){
2113 analyze_cnf(where[w]);
2114 std::vector<int> pred_tbls;
2115 get_tablevar_ref_pr(where[w]->pr,pred_tbls);
2116 // Collect the list of preds by src var,
2117 // extract the shared preds later.
2118 if(pred_tbls.size()==1){
2119 if(pred_tbls[0] == 0){
2120 pred_t0.push_back(where[w]);
2122 pred_t1.push_back(where[w]);
2126 // refs nothing -- might be sampling, do it as postfilter.
2127 if(pred_tbls.size()==0){
2128 postfilter.push_back(where[w]);
2132 // Must reference both
2133 // See if it can be a hash predicate.
2134 if(where[w]->is_atom && where[w]->eq_pred){
2135 std::vector<int> sel_tbls, ser_tbls;
2136 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
2137 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
// Each side must reference exactly one table variable, and not the same one.
2138 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
2139 // make channel 0 SE on LHS.
2140 if(sel_tbls[0] != 0)
2141 where[w]->swap_scalar_operands();
2143 // Must be simple (a colref) on the RHS
2144 if(where[w]->r_simple){
2145 string rcol = where[w]->pr->get_right_se()->get_colref()->get_field();
// Only equalities on a watchlist key field are recorded, keyed by field name.
2146 if(std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end()){
2147 hash_eq[rcol] = where[w];
// Both sides increasing, or both decreasing, draws a warning in err_str.
2149 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
2150 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
2151 if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) )
2152 err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n";
2158 // All tests failed, fallback is join_filter.
2159 join_filter.push_back(where[w]);
// Every watchlist key needs an equality predicate; list the keys that lack one.
2162 if(key_flds.size() > hash_eq.size()){
2163 err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate. Missing fields are";
2164 for(int k=0;k<key_flds.size();++k){
2165 if(hash_eq.count(key_flds[k]) < 1){
2166 err_str += " "+key_flds[k];
2174 // Get the parameters
// The parameter table pointer is taken from qs as-is, not duplicated.
2175 param_tbl = qs->param_tbl;
2176 definitions = qs->definitions;
2180 // the following method is used for distributed query optimization
// Declaration only; returns a double.
2181 double get_rate_estimate();
// Creates a new watch_join_qpn mirroring this one, named node_name + suffix.
// The parameter table is rebuilt with duplicated data types; the remaining members
// are copied by assignment, so pointer elements are shared with this node.
2184 qp_node* make_copy(std::string suffix){
2185 watch_join_qpn *ret = new watch_join_qpn();
// Fresh parameter table: each parameter keeps its name and handle-access value,
// and gets its own duplicate of the data type.
2187 ret->param_tbl = new param_table();
2188 std::vector<std::string> param_names = param_tbl->get_param_names();
2190 for(pi=0;pi<param_names.size();pi++){
2191 data_type *dt = param_tbl->get_data_type(param_names[pi]);
2192 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2193 param_tbl->handle_access(param_names[pi]));
2195 ret->definitions = definitions;
2197 ret->node_name = node_name + suffix;
2199 // make shallow copy of all fields
2202 ret->select_list = select_list;
2203 ret->key_flds = key_flds;
2204 ret->pred_t0 = pred_t0;
2205 ret->pred_t1 = pred_t1;
2206 ret->join_filter = join_filter;
2207 ret->postfilter = postfilter;
2208 ret->hash_eq = hash_eq;
2209 ret->hash_src_r = hash_src_r;
2210 ret->hash_src_l = hash_src_l;
// Declaration only. Takes the list of source plan nodes (by value) and the schema table list.
2215 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
// Kind of output file; used as the type of output_file_qpn::compression_type.
2222 enum output_file_type_enum {regular, gzip, bzip};
// Query-plan node for a file output operator (to_query_string yields
// "// output_file_operator"). It carries the source operator's name and fields,
// the output specification, and the compression and output-splitting settings.
2224 class output_file_qpn: public qp_node{
2226 std::string source_op_name; // Source table
// Field entries taken from the source table definition in the constructor.
2227 std::vector<field_entry *> fields;
// Output specification passed to the constructor; the pointer is stored, not copied.
2228 ospec_str *output_spec;
// Receives one tablevar_t built from source_op_name in the constructor.
2229 vector<tablevar_t *> fm;
2230 std::string hfta_query_name;
2231 std::string filestream_id;
// Words obtained by splitting output_spec->operator_param on ':'.
2233 std::vector<std::string> params;
2235 output_file_type_enum compression_type;
2237 int n_streams; // Number of output streams
2238 int n_hfta_clones; // number of hfta clones
2239 int parallel_idx; // which clone this produces output for.
// Entries are indices into `fields` (filled by set_splitting_params).
2240 std::vector<int> hash_flds; // fields used to hash the output.
2242 std::string node_type(){return("output_file_qpn"); };
2243 bool makes_transform(){return false;};
// Libraries needed for the selected compression: "-lz" or "-lbz2".
// NOTE(review): confirm which compression_type value selects each one
// (presumably gzip -> -lz and bzip -> -lbz2).
2244 std::vector<std::string> external_libs(){
2245 std::vector<std::string> ret;
2246 switch(compression_type){
2248 ret.push_back("-lz");
2251 ret.push_back("-lbz2");
// Unsupported on this node type: reports an error naming the query on stderr.
2259 void append_to_where(cnf_elem *c){
2260 fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n", node_name.c_str());
// Nothing to bind for an output node.
2266 void bind_to_schema(table_list *Schema){}
2267 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2272 std::string to_query_string(){return "// output_file_operator \n";}
2273 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2274 std::string generate_functor_name();
2275 std::string generate_operator(int i, std::string params);
// Picks the operator header by compression_type: the zfile output operator header
// or the plain file output operator header.
// NOTE(review): confirm which enum values map to which return statement.
2276 std::string get_include_file(){
2277 switch(compression_type){
2279 return("#include <zfile_output_operator.h>\n");
2281 return("#include <file_output_operator.h>\n");
2283 return("#include <file_output_operator.h>\n");
// An output node has no predicates or handle parameters: these return empty containers.
2286 std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
2287 std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
// Returns a newly allocated, default-constructed cplx_lit_table.
2288 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
2289 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
// Builds a new table_def named after this node, of type STREAM_SCHEMA, whose field
// list holds the stored field entries. Both the list and the table_def are
// allocated with new on every call.
2291 table_def *get_fields(){
2292 field_entry_list *fel = new field_entry_list();
2294 for(i=0;i<fields.size();++i)
2295 fel->append_field(fields[i]);
2296 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
2299 // TODO! either bypass the output operator in stream_query,
2300 // or propagate key information when the output operator is constructed.
2301 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2302 std::vector<string> ret;
2306 std::vector<tablevar_t *> get_input_tbls();
2307 std::vector<tablevar_t *> get_output_tbls();
// No splitting is done: the result holds only this node and hfta_returned is set to true.
2309 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
2310 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
// Contributes no operator views.
2312 std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
2313 std::vector<table_exp_t *> ret; return ret;
2315 // Ensure that any refs to interface params have been split away.
2316 int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2317 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
// Builds the output node for source operator src_op.
//   src_op      : name of the source operator; the node is named src_op + "_output".
//   qn          : stored as hfta_query_name.
//   fs_id       : stored as filestream_id.
//   src_tbl_def : table definition whose field list is copied into `fields`.
//   ospec       : output specification; stored, and its operator_type and
//                 operator_param are read below.
//   ei          : NOTE(review): presumably stored as eat_input (make_copy passes
//                 eat_input in this position) -- confirm.
2320 output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
2321 source_op_name = src_op;
2322 node_name = source_op_name + "_output";
2323 filestream_id = fs_id;
2324 fields = src_tbl_def->get_fields();
2325 output_spec = ospec;
2326 fm.push_back(new tablevar_t(source_op_name.c_str()));
2327 hfta_query_name = qn;
2330 // TODO stream checking, but it requires passing Schema to output_file_qpn
// NOTE(review): the check below refers to Schema and from, neither of which is a
// parameter of this constructor; in line with the TODO above it is presumably
// disabled -- confirm.
2332 for(int f=0;f<fm.size();++f){
2333 int t=fm[f]->get_schema_ref();
2334 if(! Schema->is_stream(t)){
2335 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
// Plain output by default; the "zfile" operator type selects gzip.
2343 compression_type = regular;
2344 if(ospec->operator_type == "zfile")
2345 compression_type = gzip;
// Split operator_param on ':' (at most 100 words) and keep every word in params.
// NOTE(review): strncpy leaves buf unterminated when the source is 1000 bytes or
// longer; confirm buf is terminated before split_string reads it.
2352 strncpy(buf, output_spec->operator_param.c_str(),1000);
2355 int nwords = split_string(buf, ':', words,100);
2357 for(i=0;i<nwords;i++){
2358 params.push_back(words[i]);
// A "gzip" word among the parameters is recognised here.
2360 for(i=0;i<params.size();i++){
2361 if(params[i] == "gzip")
2366 // Set output splitting parameters
// split_flds is a ':'-separated list of field names (at most 100 are read). Each
// name found in `fields` contributes its index to hash_flds; names not found are
// collected and reported through err_report.
// NOTE(review): np, ix and ns are not used by the statements shown; presumably they
// set n_hfta_clones, parallel_idx and n_streams -- confirm.
2367 bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
2372 if(split_flds != ""){
2373 string err_flds = "";
// NOTE(review): strdup allocates; confirm tmpstr is freed on every path.
2374 char *tmpstr = strdup(split_flds.c_str());
2376 int nwords = split_string(tmpstr,':',words,100);
2378 for(i=0;i<nwords;++i){
2379 string target = words[i];
2380 for(j=0;j<fields.size();++j){
2381 if(fields[j]->get_name() == target){
2382 hash_flds.push_back(j);
// j reaching fields.size() means no field carried the requested name.
2386 if(j==fields.size()){
2387 err_flds += " "+target;
// NOTE(review): the message below spells "fields" as "fileds".
2391 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
2398 // the following method is used for distributed query optimization
2399 double get_rate_estimate(){return 1.0;}
// Builds a new node through the regular constructor from this node's source name,
// query name, filestream id, field table, output spec and eat_input.
// NOTE(review): suffix is not used by the statements shown. Also, get_fields()
// allocates a new table_def on each call and the constructor only reads its field
// list -- confirm the temporary is released.
2402 qp_node* make_copy(std::string suffix){
2403 // output_file_qpn *ret = new output_file_qpn();
2404 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
// Nothing to do for an output node.
2408 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
2416 // ---------------------------------------------
2419 // Select, group-by, aggregate, sampling.
2421 // Select SE_1, ..., SE_k
2424 // Group By gb1, ..., gb_n
2425 // [Subgroup gb_i1, .., gb_ik]
2426 // Cleaning_when predicate
2427 // Cleaning_by predicate
2430 // For now, must have group-by variables and aggregates.
2431 // The scalar expressions which are output must be a function
2432 // of the group-by variables and the aggregates.
2433 // The group-by variables can be references to columns of T,
2434 // or they can be scalar expressions.
// Query-plan node for a select / group-by / aggregate query that also carries
// having, cleaning_when and cleaning_by predicates; it reads a single source table
// and uses the clean operator header (see get_include_file).
2435 class sgahcwcb_qpn: public qp_node{
2437 tablevar_t *table_name; // source table
2438 std::vector<cnf_elem *> where; // selection predicate
2439 std::vector<cnf_elem *> having; // post-aggregation predicate
2440 std::vector<select_element *> select_list; // se's of output
2441 gb_table gb_tbl; // Table of all group-by attributes.
2442 std::set<int> sg_tbl; // Names of the superGB attributes
2443 aggregate_table aggr_tbl; // Table of all referenced aggregates.
2444 std::set<std::string> states_refd; // states ref'd by stateful fcns.
// Cleaning_by and Cleaning_when predicates, taken from qs->cb_cnf and qs->cw_cnf.
2445 std::vector<cnf_elem *> cleanby;
2446 std::vector<cnf_elem *> cleanwhen;
2448 std::vector<scalarexp_t *> gb_sources; // pre-compute for partitioning.
// Returns a copy of gb_sources.
2450 std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
2452 std::string node_type(){return("sgahcwcb_qpn"); };
2453 bool makes_transform(){return true;};
// The library list is created empty.
// NOTE(review): confirm nothing is appended before it is returned.
2454 std::vector<std::string> external_libs(){
2455 std::vector<std::string> ret;
2459 void bind_to_schema(table_list *Schema);
// Not expected to be called on this node type: reports an INTERNAL ERROR on stderr.
2460 col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2461 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
2465 std::string to_query_string();
2466 std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2467 std::string generate_functor_name();
2469 std::string generate_operator(int i, std::string params);
// Include directive (with trailing newline) for the clean operator header.
2470 std::string get_include_file(){return("#include <clean_operator.h>\n");};
2472 std::vector<select_element *> get_select_list(){return select_list;};
// Collects the `se` scalar expression of every select_list entry, in order.
2473 std::vector<scalarexp_t *> get_select_se_list(){
2474 std::vector<scalarexp_t *> ret;
2476 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
// The where and filter clauses are the same vector for this node type.
2479 std::vector<cnf_elem *> get_where_clause(){return where;};
2480 std::vector<cnf_elem *> get_filter_clause(){return where;};
2481 std::vector<cnf_elem *> get_having_clause(){return having;};
// These two return pointers to the members themselves, not copies.
2482 gb_table *get_gb_tbl(){return &gb_tbl;};
2483 aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
2484 cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2485 std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2487 // table which represents output tuple.
2488 table_def *get_fields();
2489 // TODO Key extraction should be feasible but I'll defer the issue.
2490 std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2491 std::vector<string> ret;
2495 std::vector<tablevar_t *> get_input_tbls();
2496 std::vector<tablevar_t *> get_output_tbls();
// NOTE(review): judging by the name, adds c to the where predicate list -- confirm
// against the body.
2498 void append_to_where(cnf_elem *c){
// Builds the node from an analyzed query summary.
//   qs     : query summary; supplies the FROM list, select list, predicates,
//            group-by and aggregate tables, superGB set, referenced states and
//            the parameter table.
//   Schema : table list used to check that the single source is a stream.
// Problems found here are reported by appending text to err_str.
2505 sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
2506 // Get the table name.
2507 // NOTE the colrefs have the tablevar ref (an int)
2508 // embedded in them. Would it make sense
2509 // to grab the whole table list?
2510 tablevar_list_t *fm = qs->fta_tree->get_from();
2511 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
// Exactly one source table is expected.
// NOTE(review): %lu is paired with a size_t argument; %zu is the portable form.
2512 if(tbl_vec.size() != 1){
2514 sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
2518 table_name = (tbl_vec[0]);
// The source must be a stream according to the schema.
2520 int t = tbl_vec[0]->get_schema_ref();
2521 if(! Schema->is_stream(t)){
2522 err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
2527 // Get the select list.
2528 select_list = qs->fta_tree->get_sl_vec();
2530 // Get the selection and having predicates.
2532 having = qs->hav_cnf;
2533 cleanby = qs->cb_cnf;
2534 cleanwhen = qs->cw_cnf;
2536 // Build a new GB var table (don't share, might need to modify)
// Each variable is re-added with its name, table-variable ref, definition and ref type.
2538 for(g=0;g<qs->gb_tbl->size();g++){
2539 gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
2540 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
2541 qs->gb_tbl->get_reftype(g)
2545 // Build a new aggregate table. (don't share, might need
// Each aggregate entry is obtained through qs->aggr_tbl->duplicate(a).
2548 for(a=0;a<qs->aggr_tbl->size();a++){
2550 // qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
2551 qs->aggr_tbl->duplicate(a)
2555 sg_tbl = qs->sg_tbl;
2556 states_refd = qs->states_refd;
2559 // Get the parameters
// The parameter table pointer is taken from qs as-is, not duplicated.
2560 param_tbl = qs->param_tbl;
2566 std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2567 virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2568 // Ensure that any refs to interface params have been split away.
2569 // CURRENTLY not allowed by split_node_for_fta
2570 int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2571 int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
2573 // the following method is used for distributed query optimization
2574 double get_rate_estimate();
// Creates a new sgahcwcb_qpn mirroring this one, named node_name + suffix.
// The parameter table is rebuilt with duplicated data types; the remaining members
// are copied by assignment, so pointer elements are shared with this node.
2576 qp_node* make_copy(std::string suffix){
2577 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
2579 ret->param_tbl = new param_table();
2580 std::vector<std::string> param_names = param_tbl->get_param_names();
2582 for(pi=0;pi<param_names.size();pi++){
2583 data_type *dt = param_tbl->get_data_type(param_names[pi]);
2584 ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2585 param_tbl->handle_access(param_names[pi]));
2587 ret->definitions = definitions;
2589 ret->node_name = node_name + suffix;
2591 // make shallow copy of all fields
2593 ret->having = having;
2594 ret->select_list = select_list;
2595 ret->gb_tbl = gb_tbl;
2596 ret->aggr_tbl = aggr_tbl;
2597 ret->sg_tbl = sg_tbl;
2598 ret->states_refd = states_refd;
2599 ret->cleanby = cleanby;
2600 ret->cleanwhen = cleanwhen;
2605 void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
// Free-function declarations; no bodies are given here.
// Returns a list of plan nodes for the query summary qs, using Schema.
2609 std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);
// Takes the string by non-const reference.
// NOTE(review): presumably rewrites s in place to remove disallowed characters -- confirm.
2613 void untaboo(string &s);
// Returns a table_def pointer.
// NOTE(review): presumably built for table tname from the select list -- confirm.
2615 table_def *create_attributes(string tname, vector<select_element *> &select_list);