8ba08a4c52338c8e0cd3ce73c8d0b58c3cd716eb
[com/gs-lite.git] / src / ftacmp / query_plan.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15 #ifndef __QUERY_PLAN_H__
16 #define __QUERY_PLAN_H__
17
18 #include<vector>
19 #include<string>
20 #include<map>
21 using namespace std;
22
23 #include"analyze_fta.h"
24 #include"iface_q.h"
25 #include"parse_partn.h"
26 #include"generate_utils.h"
27
28 //              Identify the format of the input, output streams.
29 #define UNKNOWNFORMAT 0
30 #define NETFORMAT 1
31 #define HOSTFORMAT 2
32
33 ///////////////////////////////////////////////////
34 //      representation of an output operator specification
35
// Plain record holding one output operator specification.
// It is an aggregate: the member order below is part of its interface
// (brace initialization), so do not reorder the fields.
struct ospec_str{
        std::string query;              // query the specification applies to
        std::string operator_type;      // kind of output operator
        std::string operator_param;     // parameter string handed to the operator
        std::string output_directory;   // directory the output is written to
        int bucketwidth;                // not initialized here; the creator must set it
        std::string partitioning_flds;  // partitioning fields, kept as a single string
        int n_partitions;               // not initialized here; the creator must set it
};
45
46
47 ////////////////////////////////////////////////////
48 //      Input representation of a query
49
50 struct query_node{
51         int idx;
52         std::set<int> reads_from;
53         std::set<int> sources_to;
54         std::vector<std::string> refd_tbls;
55         std::vector<var_pair_t *> params;
56         std::string name;
57         std::string file;
58         std::string mangler;            // for UDOPs
59         bool touched;
60         table_exp_t *parse_tree;
61         int n_consumers;
62         bool is_udop;
63         bool is_externally_visible;
64         bool inferred_visible_node;
65
66         set<int> subtree_roots;
67
68         query_node(){
69                 idx = -1;
70                 touched = false;
71                 parse_tree = NULL;
72                 n_consumers = 0;
73                 is_externally_visible = false;
74                 inferred_visible_node = false;
75                 mangler="";
76         };
77         query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
78                 idx = i;
79                 touched = false;
80                 name = qnm;
81                 file = flnm;
82                 parse_tree = pt;
83                 n_consumers = 0;
84                 is_udop = false;
85                 is_externally_visible = pt->get_visible();
86                 inferred_visible_node = false;
87                 mangler="";
88
89                 tablevar_list_t *fm = parse_tree->get_from();
90                 refd_tbls =  fm->get_table_names();
91
92                 params  = pt->query_params;
93         };
94         query_node(int ix, std::string udop_name,table_list *Schema){
95                 idx = ix;
96                 touched = false;
97                 name = udop_name;
98                 file = udop_name;
99                 parse_tree = NULL;
100                 n_consumers = 0;
101                 is_udop = true;
102                 is_externally_visible = true;
103                 inferred_visible_node = false;
104                 mangler="";
105
106                 int sid = Schema->find_tbl(udop_name);
107                 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
108                 int i;
109                 for(i=0;i<subq.size();++i){
110                         refd_tbls.push_back(subq[i]->name);
111                 }
112         };
113 };
114
// Bookkeeping record for one HFTA: its name, the query nodes assigned to it,
// its links to other nodes and its parallelization settings.
struct hfta_node{
        std::string name;
        std::string source_name;
        std::vector<int> query_node_indices;    // indices of the member query nodes
        std::set<int> reads_from;
        std::set<int> sources_to;
        bool is_udop;
        bool inferred_visible_node;
        int n_parallel;
        int parallel_idx;
        bool do_generation;     // false means, ignore it.

//              Defaults: not a UDOP, not inferred visible, a single copy
//              (n_parallel 1, parallel_idx 0), and generation switched on.
        hfta_node() :
                is_udop(false),
                inferred_visible_node(false),
                n_parallel(1),
                parallel_idx(0),
                do_generation(true)
        {
        }
};
135
136
137
138
139
140
141 #define SPX_QUERY 1
142 #define SGAH_QUERY 2
143
144 // the following selectivity estimates are used by our primitive rate estimators
145 #define SPX_SELECTIVITY 1.0
146 #define SGAH_SELECTIVITY 0.1
147 #define RSGAH_SELECTIVITY 0.1
148 #define SGAHCWCB_SELECTIVITY 0.1
149 #define MRG_SELECTIVITY 1.0
150 #define JOIN_EQ_HASH_SELECTIVITY 1.0
151
152 // If the output rate of the interface is not given, we are going to use
153 // this default value.
154 #define DEFAULT_INTERFACE_RATE 100
155
156
157 //                      Define query plan nodes
158 //                      These nodes are intended for query modeling
159 //                      and transformation rather than for code generation.
160
161
162 //                      Query plan node base class.
163 //                      It has an ID, can return its type,
164 //                      and can be linked into lists with the predecessors
165 //                      and successors.
166 //                      To add : serialize, unserialize?
167
168 class qp_node{
169 public:
170   int id;
171   std::vector<int> predecessors;
172   std::vector<int> successors;
173   std::string node_name;
174
175 //              For error reporting without exiting the program.
176   int error_code;
177   std::string err_str;
178
179 //                      These should be moved to the containing stream_query object.
180   std::map<std::string, std::string> definitions;
181   param_table *param_tbl;
182
183 //              The value of a field in terms of protocol fields (if any).
184   std::map<std::string, scalarexp_t *> protocol_map;
185
186   qp_node(){
187         error_code = 0;
188         id = -1;
189         param_tbl = new param_table();
190   };
191   qp_node(int i){
192         error_code = 0;
193         id = i;
194         param_tbl = new param_table();
195   };
196
197   int get_id(){return(id);};
198   void set_id(int i){id = i;    };
199
200   int get_error_code(){return error_code;};
201   std::string get_error_str(){return err_str;};
202
203   virtual std::string node_type() = 0;
204
205 //              For code generation, does the operator xform its input.
206   virtual bool makes_transform() = 0;
207
208 //              For linking, what external libraries does the operator depend on?
209   virtual std::vector<std::string> external_libs() = 0;
210
211   void set_node_name(std::string n){node_name = n;};
212   std::string get_node_name(){return node_name;};
213
214   void set_definitions(std::map<std::string, std::string> &def){
215           definitions = def;
216   };
217   std::map<std::string, std::string> get_definitions(){return definitions;};
218
219
220 //              call to create the mapping from field name to se in protocol fields.
221 //              Pass in qp_node of data sources, in order.
222   virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
223 //              get the protocol map.  the parameter is the return value.
224   std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
225
226 //              Each qp node must be able to return a description
227 //              of the tuples it creates.
228 //              TODO: the get_output_tbls method should subsume the get_fields
229 //                      method, but in fact it really just returns the
230 //                      operator name.
231   virtual table_def *get_fields() = 0;  // Should be vector?
232 //              get keys from the operator.  Currently, only works on group-by queries.
233 //              partial_keys set to true if there is a suspicion that the list is partial.
234         virtual std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys) = 0;
235 //              Get the from clause
236   virtual std::vector<tablevar_t *> get_input_tbls() = 0;
237 //              this is a confused function, it acutally return the output
238 //              table name.
239   virtual std::vector<tablevar_t *> get_output_tbls() = 0;
240
241   std::string get_val_of_def(std::string def){
242         if(definitions.count(def) > 0) return definitions[def];
243         return("");
244   };
245   void set_definition(std::string def, std::string val){
246         definitions[def]=val;
247   }
248
249 //              Associate colrefs in SEs with tables
250 //              at code generation time.
251   virtual void bind_to_schema(table_list *Schema) = 0;
252
253 //              Get colrefs of the operator, currently only meaningful for lfta
254 //              operators, and only interested in colrefs with extraction fcns
255   virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
256
257   virtual std::string to_query_string() = 0;
258   virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;
259   virtual std::string generate_functor_name() = 0;
260
261   virtual std::string generate_operator(int i, std::string params) = 0;
262   virtual std::string get_include_file() = 0;
263
264   virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
265   virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
266
267 //              Split this node into LFTA and HFTA nodes.
268 //              Four possible outcomes:
269 //              1) the qp_node reads from a protocol, but does not need to
270 //                      split (can be evaluated as an LFTA).
271 //                      The lfta node is the only element in the return vector,
272 //                      and hfta_returned is false.
273 //              2) the qp_node reads from no protocol, and therefore cannot be split.
274 //                      THe hfta node is the only element in the return vector,
275 //                      and hfta_returned is true.
276 //              3) reads from at least one protocol, but cannot be split : failure.
277 //                      return vector is empty, the error conditions are written
278 //                      in the qp_node.
279 //              4) The qp_node splits into an hfta node and one or more LFTA nodes.
280 //                      the return vector has two or more elements, and hfta_returned
281 //                      is true.  The last element is the HFTA.
282         virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;
283
284
285 //              Ensure that any refs to interface params have been split away.
286         virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
287
288
289
290 //              Tag the data sources which are views,
291 //              return the (optimized) source queries and
292 //              record the view access in opview_set
293         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
294
295   param_table *get_param_tbl(){return param_tbl;};
296
297 //                      The "where" clause is a pre-filter
298   virtual  std::vector<cnf_elem *> get_where_clause() = 0;
299 //                      To be more explicit, use get_filter_preds
300   virtual  std::vector<cnf_elem *> get_filter_clause() = 0;
301
302 //              Add an extra predicate.  Currently only used for LFTAs.
303   virtual void append_to_where(cnf_elem *c) = 0;
304
305   void add_predecessor(int i){predecessors.push_back(i);};
306   void remove_predecessor(int i){
307         std::vector<int>::iterator vi;
308         for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
309                 if((*vi) == i){
310                         predecessors.erase(vi);
311                         return;
312                 }
313         }
314   };
315   void add_successor(int i){successors.push_back(i);};
316   std::vector<int> get_predecessors(){return predecessors;};
317   int n_predecessors(){return predecessors.size();};
318   std::vector<int> get_successors(){return successors;};
319   int n_successors(){return successors.size();};
320   void clear_predecessors(){predecessors.clear();};
321   void clear_successors(){successors.clear();};
322
323   // the following method is used for distributed query optimization
324   double get_rate_estimate();
325
326
327   // used for cloning query nodes
328   virtual qp_node* make_copy(std::string suffix) = 0;
329 };
330
331
332
333 //              Select, project, transform (xform) query plan node.
334 //              represent the following query fragment
335 //                      select scalar_expression_1, ..., scalar_expression_k
336 //                      from S
337 //                      where predicate
338 //
339 //              the predicates and the scalar expressions can reference
340 //              attributes of S and also functions.
341 class spx_qpn: public qp_node{
342 public:
343         tablevar_t *table_name;                                 //      Source table
344         std::vector<cnf_elem *> where;                  // selection predicate
345         std::vector<select_element *> select_list;      //      Select list
346
347
348
349         std::string node_type(){return("spx_qpn");      };
350     bool makes_transform(){return true;};
351         std::vector<std::string> external_libs(){
352                 std::vector<std::string> ret;
353                 return ret;
354         }
355
356   void append_to_where(cnf_elem *c){
357                 where.push_back(c);
358         }
359
360
361         void bind_to_schema(table_list *Schema);
362         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
363
364         std::string to_query_string();
365         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
366         std::string generate_functor_name();
367         std::string generate_operator(int i, std::string params);
368         std::string get_include_file(){return("#include <selection_operator.h>\n");};
369
370     std::vector<select_element *> get_select_list(){return select_list;};
371     std::vector<scalarexp_t *> get_select_se_list(){
372                 std::vector<scalarexp_t *> ret;
373                 int i;
374                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
375                 return ret;
376         };
377     std::vector<cnf_elem *> get_where_clause(){return where;};
378     std::vector<cnf_elem *> get_filter_clause(){return where;};
379         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
380     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
381
382         table_def *get_fields();
383         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
384                 std::vector<string> ret;
385                 return ret;
386         }
387
388         std::vector<tablevar_t *> get_input_tbls();
389         std::vector<tablevar_t *> get_output_tbls();
390
391         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
392         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
393 //              Ensure that any refs to interface params have been split away.
394         int count_ifp_refs(std::set<std::string> &ifpnames);
395         int resolve_if_params(ifq_t *ifdb, std::string &err);
396
397         spx_qpn(){
398         };
399         spx_qpn(query_summary_class *qs,table_list *Schema){
400 //                              Get the table name.
401 //                              NOTE the colrefs have the table ref (an int)
402 //                              embedded in them.  Would it make sense
403 //                              to grab the whole table list?
404                 tablevar_list_t *fm = qs->fta_tree->get_from();
405                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
406                 if(tbl_vec.size() != 1){
407                         char tmpstr[200];
408                         sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
409                         err_str = tmpstr;
410                         error_code = 1;
411                 }
412                 table_name = (tbl_vec[0]);
413
414 //                              Get the select list.
415                 select_list = qs->fta_tree->get_sl_vec();
416
417 //                              Get the selection predicate.
418                 where = qs->wh_cnf;
419
420
421 //                              Get the parameters
422                 param_tbl = qs->param_tbl;
423
424
425
426         };
427
428         // the following method is used for distributed query optimization
429         double get_rate_estimate();
430
431
432         qp_node* make_copy(std::string suffix){
433                 spx_qpn *ret = new spx_qpn();
434
435                 ret->param_tbl = new param_table();
436                 std::vector<std::string> param_names = param_tbl->get_param_names();
437                 int pi;
438                 for(pi=0;pi<param_names.size();pi++){
439                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
440                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
441                                                         param_tbl->handle_access(param_names[pi]));
442                 }
443                 ret->definitions = definitions;
444                 ret->node_name = node_name + suffix;
445
446                 // make shallow copy of all fields
447                 ret->where = where;
448                 ret->select_list = select_list;
449
450                 return ret;
451         };
452         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
453
454 };
455
456
457
458 //              Select, group-by, aggregate.
459 //              Representing
460 //                      Select SE_1, ..., SE_k
461 //                      From T
462 //                      Where predicate
463 //                      Group By gb1, ..., gb_n
464 //                      Having predicate
465 //
466 //              NOTE : the sampling operator is sgahcwcb_qpn.
467 //
468 //              For now, must have group-by variables and aggregates.
469 //              The scalar expressions which are output must be a function
470 //              of the group-by variables and the aggregates.
471 //              The group-by variables can be references to columns of T,
472 //              or they can be scalar expressions.
473 class sgah_qpn: public qp_node{
474 public:
475         tablevar_t *table_name;                         // source table
476         std::vector<cnf_elem *> where;          // selection predicate
477         std::vector<cnf_elem *> having;         // post-aggregation predicate
478         std::vector<select_element *> select_list;      // se's of output
479         gb_table gb_tbl;                        // Table of all group-by attributes.
480         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
481
482         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
483
484         int lfta_disorder;              // maximum disorder in the steam between lfta, hfta
485         int hfta_disorder;              // maximum disorder in the  hfta
486
487 //              rollup, cube, and grouping_sets cannot be readily reconstructed by
488 //              analyzing the patterns, so explicitly record them here.
489 //              used only so that to_query_string produces something meaningful.
490         std::vector<std::string> gb_entry_type;
491         std::vector<int> gb_entry_count;
492
493         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
494
495         std::string node_type(){return("sgah_qpn");     };
496     bool makes_transform(){return true;};
497         std::vector<std::string> external_libs(){
498                 std::vector<std::string> ret;
499                 return ret;
500         }
501
502         void bind_to_schema(table_list *Schema);
503         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
504
505         std::string to_query_string();
506         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
507         std::string generate_functor_name();
508
509         std::string generate_operator(int i, std::string params);
510         std::string get_include_file(){
511                         if(hfta_disorder <= 1){
512                                 return("#include <groupby_operator.h>\n");
513                         }else{
514                                 return("#include <groupby_operator_oop.h>\n");
515                         }
516         };
517
518     std::vector<select_element *> get_select_list(){return select_list;};
519     std::vector<scalarexp_t *> get_select_se_list(){
520                 std::vector<scalarexp_t *> ret;
521                 int i;
522                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
523                 return ret;
524         };
525     std::vector<cnf_elem *> get_where_clause(){return where;};
526
527   void append_to_where(cnf_elem *c){
528                 where.push_back(c);
529         }
530
531     std::vector<cnf_elem *> get_filter_clause(){return where;};
532     std::vector<cnf_elem *> get_having_clause(){return having;};
533     gb_table *get_gb_tbl(){return &gb_tbl;};
534     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
535         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
536     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
537
538 //                              table which represents output tuple.
539         table_def *get_fields();
540         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
541         std::vector<tablevar_t *> get_input_tbls();
542         std::vector<tablevar_t *> get_output_tbls();
543
544
545         sgah_qpn(){
546                 lfta_disorder = 1;
547                 hfta_disorder = 1;
548         };
549         sgah_qpn(query_summary_class *qs,table_list *Schema){
550                 lfta_disorder = 1;
551                 hfta_disorder = 1;
552
553 //                              Get the table name.
554 //                              NOTE the colrefs have the tablevar ref (an int)
555 //                              embedded in them.  Would it make sense
556 //                              to grab the whole table list?
557                 tablevar_list_t *fm = qs->fta_tree->get_from();
558                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
559                 if(tbl_vec.size() != 1){
560                         char tmpstr[200];
561                         sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
562                         err_str=tmpstr;
563                         error_code = 1;
564                 }
565                 table_name = (tbl_vec[0]);
566
567 //                              Get the select list.
568                 select_list = qs->fta_tree->get_sl_vec();
569
570 //                              Get the selection and having predicates.
571                 where = qs->wh_cnf;
572                 having = qs->hav_cnf;
573
574 //                              Build a new GB var table (don't share, might need to modify)
575                 int g;
576                 for(g=0;g<qs->gb_tbl->size();g++){
577                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
578                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
579                                 qs->gb_tbl->get_reftype(g)
580                         );
581                 }
582                 gb_tbl.set_pattern_info(qs->gb_tbl);
583 //              gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
584 //              gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
585 //              gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
586
587 //                              Build a new aggregate table. (don't share, might need
588 //                              to modify).
589                 int a;
590                 for(a=0;a<qs->aggr_tbl->size();a++){
591                         aggr_tbl.add_aggr(
592 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
593                                 qs->aggr_tbl->duplicate(a)
594                         );
595                 }
596
597
598 //                              Get the parameters
599                 param_tbl = qs->param_tbl;
600
601         };
602
603
604
605         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
606         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
607 //              Ensure that any refs to interface params have been split away.
608         int count_ifp_refs(std::set<std::string> &ifpnames);
609         int resolve_if_params(ifq_t *ifdb, std::string &err);
610
611         // the following method is used for distributed query optimization
612         double get_rate_estimate();
613
614
615         qp_node* make_copy(std::string suffix){
616                 sgah_qpn *ret = new sgah_qpn();
617
618                 ret->param_tbl = new param_table();
619                 std::vector<std::string> param_names = param_tbl->get_param_names();
620                 int pi;
621                 for(pi=0;pi<param_names.size();pi++){
622                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
623                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
624                                                         param_tbl->handle_access(param_names[pi]));
625                 }
626                 ret->definitions = definitions;
627
628                 ret->node_name = node_name + suffix;
629
630                 // make shallow copy of all fields
631                 ret->where = where;
632                 ret->having = having;
633                 ret->select_list = select_list;
634                 ret->gb_tbl = gb_tbl;
635                 ret->aggr_tbl = aggr_tbl;
636
637                 return ret;
638         };
639
640 //              Split aggregation into two HFTA components - sub and superaggregation
641 //              If unable to split the aggreagates, split into selection and aggregation
642 //              If resulting low-level query is empty (e.g. when aggregates cannot be split and
643 //              where clause is empty) empty vector willb e returned
644         virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
645
646         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
647
648 };
649
650
651
652
653 //              Select, group-by, aggregate. with running aggregates
654 //              Representing
655 //                      Select SE_1, ..., SE_k
656 //                      From T
657 //                      Where predicate
658 //                      Group By gb1, ..., gb_n
659 //                      Closing When predicate
660 //                      Having predicate
661 //
662 //              NOTE : the sampling operator is sgahcwcb_qpn.
663 //
664 //              For now, must have group-by variables and aggregates.
665 //              The scalar expressions which are output must be a function
666 //              of the group-by variables and the aggregates.
667 //              The group-by variables can be references to columns of T,
668 //              or they can be scalar expressions.
669 class rsgah_qpn: public qp_node{
670 public:
671         tablevar_t *table_name;                         // source table
672         std::vector<cnf_elem *> where;          // selection predicate
673         std::vector<cnf_elem *> having;         // post-aggregation predicate
674         std::vector<cnf_elem *> closing_when;           // group closing predicate
675         std::vector<select_element *> select_list;      // se's of output
676         gb_table gb_tbl;                        // Table of all group-by attributes.
677         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
678
679         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
680
681         int lfta_disorder;              // maximum disorder allowed in stream between lfta, hfta
682         int hfta_disorder;              // maximum disorder allowed in hfta
683
684         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
685
686
687         std::string node_type(){return("rsgah_qpn");    };
688     bool makes_transform(){return true;};
689         std::vector<std::string> external_libs(){
690                 std::vector<std::string> ret;
691                 return ret;
692         }
693
694         void bind_to_schema(table_list *Schema);
695         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
696                 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
697                 exit(1);
698         }
699
700         std::string to_query_string();
701         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
702         std::string generate_functor_name();
703
704         std::string generate_operator(int i, std::string params);
705         std::string get_include_file(){return("#include <running_gb_operator.h>\n");};
706
707     std::vector<select_element *> get_select_list(){return select_list;};
708     std::vector<scalarexp_t *> get_select_se_list(){
709                 std::vector<scalarexp_t *> ret;
710                 int i;
711                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
712                 return ret;
713         };
714     std::vector<cnf_elem *> get_where_clause(){return where;};
715   void append_to_where(cnf_elem *c){
716                 where.push_back(c);
717         }
718
719     std::vector<cnf_elem *> get_filter_clause(){return where;};
720     std::vector<cnf_elem *> get_having_clause(){return having;};
721     std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};
722     gb_table *get_gb_tbl(){return &gb_tbl;};
723     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
//              Declarations only; the bodies are not part of this section.
//              NOTE(review): the first two presumably gather complex literals
//              and handle parameters that reference Ext_fcns -- confirm.
724         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
725     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
726
727 //                              Table which represents the output tuple.
728         table_def *get_fields();
//              NOTE(review): partial_keys is taken by non-const reference and
//              looks like an output argument -- confirm.
729         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
730
//              Tables this node reads from / produces.
731         std::vector<tablevar_t *> get_input_tbls();
732         std::vector<tablevar_t *> get_output_tbls();
733
734
735         rsgah_qpn(){
736                 lfta_disorder = 1;
737                 hfta_disorder = 1;
738         };
739         rsgah_qpn(query_summary_class *qs,table_list *Schema){
740                 lfta_disorder = 1;
741                 hfta_disorder = 1;
742
743 //                              Get the table name.
744 //                              NOTE the colrefs have the tablevar ref (an int)
745 //                              embedded in them.  Would it make sense
746 //                              to grab the whole table list?
747                 tablevar_list_t *fm = qs->fta_tree->get_from();
748                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
749                 if(tbl_vec.size() != 1){
750                         char tmpstr[200];
751                         sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
752                         err_str=tmpstr;
753                         error_code = 1;
754                 }
755                 table_name = (tbl_vec[0]);
756
757 //                              Get the select list.
758                 select_list = qs->fta_tree->get_sl_vec();
759
760 //                              Get the selection and having predicates.
761                 where = qs->wh_cnf;
762                 having = qs->hav_cnf;
763                 closing_when = qs->closew_cnf;
764
765 //                              Build a new GB var table (don't share, might need to modify)
766                 int g;
767                 for(g=0;g<qs->gb_tbl->size();g++){
768                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
769                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
770                                 qs->gb_tbl->get_reftype(g)
771                         );
772                 }
773
774 //                              Build a new aggregate table. (don't share, might need
775 //                              to modify).
776                 int a;
777                 for(a=0;a<qs->aggr_tbl->size();a++){
778                         aggr_tbl.add_aggr(
779 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
780                                 qs->aggr_tbl->duplicate(a)
781                         );
782                 }
783
784
785 //                              Get the parameters
786                 param_tbl = qs->param_tbl;
787
788         };
789
790
791
//              Plan-splitting and operator-view hooks.  Only the declarations
//              appear here; the bodies are not part of this section.
//              NOTE(review): hfta_returned is a non-const reference and looks
//              like an output argument -- confirm.
792         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
793         std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
794         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
795 //              Ensure that any refs to interface params have been split away.
796         int count_ifp_refs(std::set<std::string> &ifpnames);
797         int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;}
798
799         // The following method is used for distributed query optimization.
//              Declaration only; the body is not part of this section.
800         double get_rate_estimate();
801
802         qp_node* make_copy(std::string suffix){
803                 rsgah_qpn *ret = new rsgah_qpn();
804
805                 ret->param_tbl = new param_table();
806                 std::vector<std::string> param_names = param_tbl->get_param_names();
807                 int pi;
808                 for(pi=0;pi<param_names.size();pi++){
809                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
810                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
811                                                         param_tbl->handle_access(param_names[pi]));
812                 }
813                 ret->definitions = definitions;
814
815                 ret->node_name = node_name + suffix;
816
817                 // make shallow copy of all fields
818                 ret->where = where;
819                 ret->having = having;
820                 ret->closing_when = closing_when;
821                 ret->select_list = select_list;
822                 ret->gb_tbl = gb_tbl;
823                 ret->aggr_tbl = aggr_tbl;
824
825                 return ret;
826         };
//              Declaration only; the body is not part of this section.
//              NOTE(review): q_sources is passed by value (the vector is
//              copied on every call) -- confirm that is intended.
827         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
828 };
829
830
831 //              forward reference
832 class filter_join_qpn;
833
834
835 //              (temporal) Merge query plan node.
836 //              represent the following query fragment
837 //                      Merge c1:c2
838 //                      from T1 _t1, T2 _t2
839 //
840 //              T1 and T2 must have compatible schemas,
841 //              that is the same types in the same slots.
842 //              c1 and c2 must be colrefs from T1 and T2,
843 //              both ref'ing the same slot.  Their types
844 //              must be temporal and the same kind of temporal.
845 //              in the output, no other field is temporal.
846 //              the field names of the output are drawn from T1.
847 class mrg_qpn: public qp_node{
848 public:
849         std::vector<tablevar_t *> fm;                                   // Source tables, one entry per merged input.
850         std::vector<colref_t *> mvars;                                  // The merge-by columns; the constructors keep one per entry of fm.
851         scalarexp_t *slack;                                             // Slack expression; NULL when none is set.
852
853         table_def *table_layout;                                // The output schema.
854         int merge_fieldpos;                                             // Position of the merge field in the output schema,
855                                                                                         // a convenience for manipulation.
856
857         int disorder;           // Max disorder seen in the input / allowed in the output.
858
859
860         // Partition definition for merges that combine streams partitioned over multiple interfaces; NULL until set or looked up.
861         partn_def_t* partn_def;
862
863
864   void append_to_where(cnf_elem *c){
865                 fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n",  node_name.c_str());
866                 exit(1);
867         }
868
869
870
//              Kind tag identifying this plan-node class.
        std::string node_type(){
                return std::string("mrg_qpn");
        }
//              Always answers false for a merge node.
        bool makes_transform(){
                return false;
        }
//              A merge node lists no external libraries: the result is always
//              an empty vector.
        std::vector<std::string> external_libs(){
                return std::vector<std::string>();
        }
877
//              Declared here; the body is not part of this section.
//              NOTE(review): presumably resolves this node's references
//              against Schema -- confirm in the implementation.
878         void bind_to_schema(table_list *Schema);
879         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
880                 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
881                 exit(1);
882         }
883
//              Query-text and code-generation entry points.  Only the
//              declarations appear here; the bodies are not part of this section.
//              NOTE(review): judging by the names, these produce the query text
//              and the generated functor / operator source -- confirm.
884         std::string to_query_string();
885         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
886         std::string generate_functor_name();
887         std::string generate_operator(int i, std::string params);
888         std::string get_include_file(){
889                 if(disorder>1)
890                         return("#include <merge_operator_oop.h>\n");
891                 return("#include <merge_operator.h>\n");
892         };
893
//              Declarations only; the bodies are not part of this section.
//              NOTE(review): the first two presumably gather complex literals
//              and handle parameters that reference Ext_fcns -- confirm.
894         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
895     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
896
//              NOTE(review): presumably returns the output schema (compare
//              table_layout) -- confirm.
897         table_def *get_fields();
//              A merge node reports no keys: the result is always empty and
//              partial_keys is left untouched.
        std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
                return std::vector<string>();
        }
902
//              Tables this node reads from / produces.  Declarations only; the
//              bodies are not part of this section.
903         std::vector<tablevar_t *> get_input_tbls();
904         std::vector<tablevar_t *> get_output_tbls();
905
//              Plan-splitting and operator-view hooks (declarations only).
//              NOTE(review): hfta_returned is a non-const reference and looks
//              like an output argument -- confirm.
906         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
907         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
908 //              Ensure that any refs to interface params have been split away.
909         int count_ifp_refs(std::set<std::string> &ifpnames);
910
911 //                      No predicates, return an empty clause
912     std::vector<cnf_elem *> get_where_clause(){
913                  std::vector<cnf_elem *> t;
914                 return(t);
915         };
916     std::vector<cnf_elem *> get_filter_clause(){
917                 return get_where_clause();
918         }
919
920         mrg_qpn(){
921                 partn_def = NULL;
922         };
923
924         void set_disorder(int d){
925                 disorder = d;
926         }
927
928         mrg_qpn(query_summary_class *qs,table_list *Schema){
929                 disorder = 1;
930
931 //                              Grab the elements of the query node.
932                 fm = qs->fta_tree->get_from()->get_table_list();
933                 mvars = qs->mvars;
934                 slack = qs->slack;
935
936 //                      sanity check
937                 if(fm.size() != mvars.size()){
938                         fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn.  fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
939                         exit(1);
940                 }
941
942 //                              Get the parameters
943                 param_tbl = qs->param_tbl;
944
945 //                              Need to set the node name now, so that the
946 //                              schema (table_layout) can be properly named.
947 //                              TODO: Setting the name of the table might best be done
948 //                              via the set_node_name method, because presumably
949 //                              thats when the node name is really known.
950 //                              This should propogate to the table_def table_layout
951                 node_name=qs->query_name;
952
953 /*
954 int ff;
955 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
956 for(ff=0;ff<fm.size();++ff){
957 printf("%s ",fm[ff]->to_string().c_str());
958 }
959 printf("\n");
960 */
961
962
963 //              Create the output schema.
964 //              strip temporal properites form all fields except the merge field.
965                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
966                 field_entry_list *fel = new field_entry_list();
967                 int f;
968                 for(f=0;f<flva.size();++f){
969                         field_entry *fe;
970                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
971                         if(flva[f]->get_name() == mvars[0]->get_field()){
972                                 merge_fieldpos = f;
973 //                              if(slack != NULL) dt.reset_temporal();
974                         }else{
975                                 dt.reset_temporal();
976                         }
977
978                         param_list *plist = new param_list();
979                         std::vector<std::string> param_strings = dt.get_param_keys();
980                         int p;
981                         for(p=0;p<param_strings.size();++p){
982                                 std::string v = dt.get_param_val(param_strings[p]);
983                                 if(v != "")
984                                         plist->append(param_strings[p].c_str(),v.c_str());
985                                 else
986                                         plist->append(param_strings[p].c_str());
987                         }
988
989
990                         fe=new field_entry(
991                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
992                         fel->append_field(fe);
993                 }
994
995
996
997
998                 table_layout = new table_def(
999                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1000                 );
1001
1002                 partn_def = NULL;
1003         };
1004
1005
1006 /////////////////////////////////////////////
1007 ///             Created for de-siloing.  to be removed?  or is it otherwise useful?
1008 //              Merge existing set of sources (de-siloing)
1009         mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
1010                 int i,f;
1011
1012                 disorder = 1;
1013
1014 //                              Construct the fm list
1015                 for(f=0;f<src_names.size();++f){
1016                         int tbl_ref = Schema->get_table_ref(src_names[f]);
1017                         if(tbl_ref < 0){
1018                                 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
1019                                 exit(1);
1020                         }
1021                         table_def *src_tbl = Schema->get_table(tbl_ref);
1022                         tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
1023                         string range_name = "_t" + int_to_string(f);
1024                         fm_t->set_range_var(range_name);
1025                         fm_t->set_schema_ref(tbl_ref);
1026                         fm.push_back(fm_t);
1027                 }
1028
1029 //              Create the output schema.
1030 //              strip temporal properites form all fields except the merge field.
1031                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1032                 field_entry_list *fel = new field_entry_list();
1033                 bool temporal_found = false;
1034                 for(f=0;f<flva.size();++f){
1035                         field_entry *fe;
1036                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
1037                         if(dt.is_temporal() && !temporal_found){
1038                                 merge_fieldpos = f;
1039                                 temporal_found = true;
1040                         }else{
1041                                 dt.reset_temporal();
1042                         }
1043
1044                         param_list *plist = new param_list();
1045                         std::vector<std::string> param_strings = dt.get_param_keys();
1046                         int p;
1047                         for(p=0;p<param_strings.size();++p){
1048                                 std::string v = dt.get_param_val(param_strings[p]);
1049                                 if(v != "")
1050                                         plist->append(param_strings[p].c_str(),v.c_str());
1051                                 else
1052                                         plist->append(param_strings[p].c_str());
1053                         }
1054
1055                         fe=new field_entry(
1056                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
1057                                 flva[f]->get_unpack_fcns()
1058                         );
1059                         fel->append_field(fe);
1060                 }
1061
1062                 if(! temporal_found){
1063                         fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
1064                         exit(1);
1065                 }
1066
1067                 node_name=n_name;
1068                 table_layout = new table_def(
1069                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1070                 );
1071
1072                 partn_def = NULL;
1073                 param_tbl = new param_table();
1074
1075 //                      Construct mvars
1076                 for(f=0;f<fm.size();++f){
1077                         std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
1078                         data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
1079                              flva[merge_fieldpos]->get_modifier_list());
1080
1081                         colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
1082                                 flv_f[merge_fieldpos]->get_name().c_str());
1083                         mvars.push_back(mcr);
1084                 }
1085
1086 //              literal_t *s_lit = new literal_t("5",LITERAL_INT);
1087 //              slack = new scalarexp_t(s_lit);
1088                 slack = NULL;
1089
1090         };
1091 //                      end de-siloing
1092 ////////////////////////////////////////
1093
//              Declaration only; the body is not part of this section.  The
//              selection and aggregation merge constructors call it with the
//              merge field's expression and name; the gb_table argument is NULL
//              in the selection case.
//              NOTE(review): presumably sets the slack member -- confirm.
1094         void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
1095
1096
1097 //                      Merge filter_join LFTAs (declaration only).
1098
1099         mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1100
1101 //                      Merge selection LFTAs.
1102
1103         mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
1104
1105                 disorder = 1;
1106
1107                 param_tbl = spx->param_tbl;
1108                 int i;
1109                 node_name = n_name;
1110                 field_entry_list *fel = new field_entry_list();
1111                 merge_fieldpos = -1;
1112
1113
1114
1115
1116                 for(i=0;i<spx->select_list.size();++i){
1117                         data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
1118                         if(dt->is_temporal()){
1119                                 if(merge_fieldpos < 0){
1120                                         merge_fieldpos = i;
1121                                 }else{
1122                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
1123                                         dt->reset_temporal();
1124                                 }
1125                         }
1126
1127                         field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
1128                         fel->append_field(fe);
1129                         delete dt;
1130                 }
1131                 if(merge_fieldpos<0){
1132                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1133                                 exit(1);
1134                 }
1135                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1136
1137 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1138                 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
1139                                 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
1140 //      if(this->slack == NULL)
1141 //              fprintf(stderr,"Zero slack.\n");
1142 //      else
1143 //              fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1144
1145                 for(i=0;i<sources.size();i++){
1146                         std::string rvar = "_m"+int_to_string(i);
1147                         mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
1148                         mvars[i]->set_tablevar_ref(i);
1149                         fm.push_back(new tablevar_t(sources[i].c_str()));
1150                         fm[i]->set_range_var(rvar);
1151                 }
1152
1153                 param_tbl = new param_table();
1154                 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
1155                 int pi;
1156                 for(pi=0;pi<param_names.size();pi++){
1157                         data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
1158                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1159                                                         spx->param_tbl->handle_access(param_names[pi]));
1160                 }
1161                 definitions = spx->definitions;
1162
1163         }
1164
1165 //              Merge aggregation LFTAs
1166
1167         mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
1168
1169                 disorder = 1;
1170
1171                 param_tbl = sgah->param_tbl;
1172                 int i;
1173                 node_name = n_name;
1174                 field_entry_list *fel = new field_entry_list();
1175                 merge_fieldpos = -1;
1176                 for(i=0;i<sgah->select_list.size();++i){
1177                         data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
1178                         if(dt->is_temporal()){
1179                                 if(merge_fieldpos < 0){
1180                                         merge_fieldpos = i;
1181                                 }else{
1182                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
1183                                         dt->reset_temporal();
1184                                 }
1185                         }
1186
1187                         field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
1188                         fel->append_field(fe);
1189                         delete dt;
1190                 }
1191                 if(merge_fieldpos<0){
1192                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1193                         exit(1);
1194                 }
1195                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1196
1197 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1198                 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
1199                                 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
1200                                 &(sgah->gb_tbl));
1201                 if(this->slack == NULL)
1202                         fprintf(stderr,"Zero slack.\n");
1203                 else
1204                         fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1205
1206
1207                 for(i=0;i<sources.size();i++){
1208                         std::string rvar = "_m"+int_to_string(i);
1209                         mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
1210                         mvars[i]->set_tablevar_ref(i);
1211                         fm.push_back(new tablevar_t(sources[i].c_str()));
1212                         fm[i]->set_range_var(rvar);
1213                 }
1214
1215                 param_tbl = new param_table();
1216                 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
1217                 int pi;
1218                 for(pi=0;pi<param_names.size();pi++){
1219                         data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
1220                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1221                                                         sgah->param_tbl->handle_access(param_names[pi]));
1222                 }
1223                 definitions = sgah->definitions;
1224
1225         }
1226
1227         qp_node *make_copy(std::string suffix){
1228                 mrg_qpn *ret = new mrg_qpn();
1229                 ret->slack = slack;
1230                 ret->disorder = disorder;
1231
1232                 ret->param_tbl = new param_table();
1233                 std::vector<std::string> param_names = param_tbl->get_param_names();
1234                 int pi;
1235                 for(pi=0;pi<param_names.size();pi++){
1236                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1237                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1238                                                         param_tbl->handle_access(param_names[pi]));
1239                 }
1240                 ret->definitions = definitions;
1241
1242                 ret->node_name = node_name + suffix;
1243                 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
1244                 ret->merge_fieldpos = merge_fieldpos;
1245
1246                 return ret;
1247         };
1248
//              Declaration only; the body is not part of this section.
1249         std::vector<mrg_qpn *> split_sources();
1250
1251         // The following method is used for distributed query optimization.
1252         double get_rate_estimate();
1253
1254
1255         // get partition definition for merges that combine streams partitioned over multiple interfaces
1256         // return NULL for regular merges
1257         partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1258                 if (partn_def)
1259                         return partn_def;
1260
1261                 int err;
1262                 string err_str;
1263                 string partn_name;
1264
1265                 vector<tablevar_t *> input_tables = get_input_tbls();
1266                 for (int i = 0; i <  input_tables.size(); ++i) {
1267                         tablevar_t * table = input_tables[i];
1268
1269                         vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
1270                         if (partn_names.size() != 1)    // can't have more than one value of partition attribute
1271                                 return NULL;
1272                         string new_partn_name = partn_names[0];
1273
1274                         // need to make sure that all ifaces belong to the same partition
1275                         if (!i)
1276                                 partn_name = new_partn_name;
1277                         else if (new_partn_name != partn_name)
1278                                 return NULL;
1279                 }
1280
1281                 // now find partition definition corresponding to partn_name
1282                 partn_def = partn_parse_result->get_partn_def(partn_name);
1283                 return partn_def;
1284         };
1285
1286         void set_partn_definition(partn_def_t* def) {
1287                 partn_def = def;
1288         }
1289
1290         bool is_multihost_merge() {
1291
1292                 bool is_multihost = false;
1293
1294                 // each input table must be have machine attribute be non-empty
1295                 // and there should be at least 2 different values of machine attributes
1296                 vector<tablevar_t *> input_tables = get_input_tbls();
1297                 string host = input_tables[0]->get_machine();
1298                 for  (int i = 1; i < input_tables.size(); ++i) {
1299                         string new_host = input_tables[i]->get_machine();
1300                         if (new_host == "")
1301                                 return false;
1302                         if (new_host != host)
1303                                 is_multihost = true;
1304                 }
1305                 return is_multihost;
1306         }
1307
//              Declaration only; the body is not part of this section.
//              NOTE(review): q_sources is passed by value (the vector is
//              copied on every call) -- confirm that is intended.
1308         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1309 };
1310
1311
1312 //              eq_temporal, hash join query plan node.
1313 //              represent the following query fragment
1314 //                      select scalar_expression_1, ..., scalar_expression_k
1315 //                      from T0 t0, T1 t1
1316 //                      where predicate
1317 //
1318 //              the predicates and the scalar expressions can reference
1319 //              attributes of t0 and t1 and also functions.
1320 //              The predicate must contain CNF elements to enable the
1321 //              efficient evaluation of the query.
1322 //              1) at least one predicate of the form
1323 //                      (temporal se in t0) = (temporal se in t1)
1324 //              2) at least one predicate of the form
1325 //                      (non-temporal se in t0) = (non-temporal se in t1)
1326 //
1327 class join_eq_hash_qpn: public qp_node{
1328 public:
1329         std::vector<tablevar_t *> from;                                 // Source tables.
1330         std::vector<select_element *> select_list;      // Select list.
1331         std::vector<cnf_elem *> prefilter[2];           // Source prefilters, one list per array slot.
1332         std::vector<cnf_elem *> temporal_eq;            // Predicates that define the temporal window.
1333         std::vector<cnf_elem *> hash_eq;                        // Predicates that define the hash key.
1334         std::vector<cnf_elem *> postfilter;                     // Final filter on hash matches.
1335
1336         std::vector<cnf_elem *> where;                          // All the filters,
1337                                                                                                 // useful for summary analysis.
1338
//              Expression lists returned by get_hash_r() / get_hash_l().
//              NOTE(review): presumably the right / left sides of the hash-key
//              equalities -- confirm.
1339         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1340
1341         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1342         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1343
//              Kind tag identifying this plan-node class.
        std::string node_type(){
                return std::string("join_eq_hash_qpn");
        }
//              Always answers true for this node type.
        bool makes_transform(){
                return true;
        }
//              This node type lists no external libraries: the result is
//              always an empty vector.
        std::vector<std::string> external_libs(){
                return std::vector<std::string>();
        }
1350
//              Declared here; the body is not part of this section.
//              NOTE(review): presumably resolves this node's references
//              against Schema -- confirm in the implementation.
1351         void bind_to_schema(table_list *Schema);
1352         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1353                 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
1354                 exit(1);
1355         }
1356
1357   void append_to_where(cnf_elem *c){
1358                 fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str());
1359                 exit(1);
1360         }
1361
1362
//              Query-text and code-generation entry points.  Only the
//              declarations appear here; the bodies are not part of this section.
//              NOTE(review): judging by the names, these produce the query text
//              and the generated functor / operator source -- confirm.
1363         std::string to_query_string();
1364         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1365         std::string generate_functor_name();
1366         std::string generate_operator(int i, std::string params);
//              Include directive (newline terminated) for the operator header
//              used by this node type.
        std::string get_include_file(){
                return std::string("#include <join_eq_hash_operator.h>\n");
        }
1368
1369     std::vector<select_element *> get_select_list(){return select_list;};
1370     std::vector<scalarexp_t *> get_select_se_list(){
1371                 std::vector<scalarexp_t *> ret;
1372                 int i;
1373                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1374                 return ret;
1375         };
1376 //                      Used for LFTA only
1377     std::vector<cnf_elem *> get_where_clause(){
1378                  std::vector<cnf_elem *> t;
1379                 return(t);
1380         };
1381     std::vector<cnf_elem *> get_filter_clause(){
1382                 return get_where_clause();
1383         }
1384
        // Declared only; no body is given inside this class definition.
        // Both table builders take the external function list as input.
        cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
    std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);

        // Declared only; returns a table_def pointer describing this node's fields.
        table_def *get_fields();
1389
1390 //              It might be feasible to find keys in an equijoin expression.
1391         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1392                 std::vector<string> ret;
1393                 return ret;
1394         }
1395
        // Input / output table variables of this node (declarations only).
        std::vector<tablevar_t *> get_input_tbls();
        std::vector<tablevar_t *> get_output_tbls();

        // Plan-splitting and opview-extraction hooks (declarations only).
        // hfta_returned is an output parameter (passed by non-const reference).
        std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
        virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
//              Ensure that any refs to interface params have been split away.
        int count_ifp_refs(std::set<std::string> &ifpnames);
1403
1404         join_eq_hash_qpn(){
1405         };
1406         join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
1407                 int w;
1408 //                              Get the table name.
1409 //                              NOTE the colrefs have the table ref (an int)
1410 //                              embedded in them.  Would it make sense
1411 //                              to grab the whole table list?
1412                 from = qs->fta_tree->get_from()->get_table_list();
1413                 if(from.size() != 2){
1414                         char tmpstr[200];
1415                         sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1416                         err_str = tmpstr;
1417                         error_code = 1;
1418                 }
1419
1420 //                              Get the select list.
1421                 select_list = qs->fta_tree->get_sl_vec();
1422
1423 //                              Get the selection predicate.
1424                 where = qs->wh_cnf;
1425                 for(w=0;w<where.size();++w){
1426                         analyze_cnf(where[w]);
1427                         std::vector<int> pred_tbls;
1428                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1429 //                              Prefilter if refs only one tablevar
1430                         if(pred_tbls.size()==1){
1431                                 prefilter[pred_tbls[0]].push_back(where[w]);
1432                                 continue;
1433                         }
1434 //                              refs nothing -- might be sampling, do it as postfilter.
1435                         if(pred_tbls.size()==0){
1436                                 postfilter.push_back(where[w]);
1437                                 continue;
1438                         }
1439 //                              See if it can be a hash or temporal predicate.
1440 //                              NOTE: synchronize with the temporality checking
1441 //                              done at join_eq_hash_qpn::get_fields
1442                         if(where[w]->is_atom && where[w]->eq_pred){
1443                                 std::vector<int> sel_tbls, ser_tbls;
1444                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1445                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1446                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1447 //                                              make channel 0 SE on LHS.
1448                                         if(sel_tbls[0] != 0)
1449                                                 where[w]->pr->swap_scalar_operands();
1450
1451                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1452                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1453                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1454                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1455                                                         temporal_eq.push_back(where[w]);
1456                                         else
1457                                                         hash_eq.push_back(where[w]);
1458                                         continue;
1459
1460                                 }
1461                         }
1462 //                              All tests failed, fallback is postfilter.
1463                         postfilter.push_back(where[w]);
1464                 }
1465
1466                 if(temporal_eq.size()==0){
1467                         err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
1468                         error_code = 1;
1469                 }
1470
1471 //                              Get the parameters
1472                 param_tbl = qs->param_tbl;
1473
1474         };
1475
        // the following method is used for distributed query optimization
        // (declaration only -- no body is given inside this class definition)
        double get_rate_estimate();
1478
1479
1480         qp_node* make_copy(std::string suffix){
1481                 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
1482
1483                 ret->param_tbl = new param_table();
1484                 std::vector<std::string> param_names = param_tbl->get_param_names();
1485                 int pi;
1486                 for(pi=0;pi<param_names.size();pi++){
1487                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1488                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1489                                                         param_tbl->handle_access(param_names[pi]));
1490                 }
1491                 ret->definitions = definitions;
1492
1493                 ret->node_name = node_name + suffix;
1494
1495                 // make shallow copy of all fields
1496                 ret->where = where;
1497                 ret->from = from;
1498                 ret->select_list = select_list;
1499                 ret->prefilter[0] = prefilter[0];
1500                 ret->prefilter[1] = prefilter[1];
1501                 ret->postfilter = postfilter;
1502                 ret->temporal_eq = temporal_eq;
1503                 ret->hash_eq = hash_eq;
1504
1505                 return ret;
1506         };
        // Declared only; no body is given inside this class definition.
        // Takes the source plan nodes (by value) and the schema.
        void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1508
1509 };
1510
1511
1512 // ---------------------------------------------
//              filter join query plan node.
1514 //              represent the following query fragment
1515 //                      select scalar_expression_1, ..., scalar_expression_k
1516 //                      FILTER_JOIN(col, range) from T0 t0, T1 t1
1517 //                      where predicate
1518 //
1519 //              t0 is the output range variable, t1 is the filtering range
1520 //              variable.  Both must alias a PROTOCOL.
1521 //              The scalar expressions in the select clause may
1522 //              reference t0 only.
1523 //              The predicates are classified as follows
1524 //              prefilter predicates:
1525 //                a cheap predicate in t0 such that there is an equivalent
1526 //                predicate in t1.  Cost decisions about pushing to
1527 //                lfta prefilter made later.
1528 //              t0 predicates (other than prefilter predicates)
//                      -- cheap vs. expensive sorted out at generate time,
//                              the constructor isn't called with the function list.
//              t1 predicates (other than prefilter predicates).
1532 //              equi-join predicates of the form:
1533 //                      (se in t0) = (se in t1)
1534 //
1535 //              There must be at least one equi-join predicate.
1536 //              No join predicates other than equi-join predicates
1537 //                are allowed.
1538 //              Warn on temporal equi-join predicates.
1539 //              t1 predicates should not be expensive ... warn?
1540 //
1541 class filter_join_qpn: public qp_node{
1542 public:
1543         std::vector<tablevar_t *> from;                                 //      Source tables
1544                 colref_t *temporal_var;                 // join window in FROM
1545                 unsigned int temporal_range;    // metadata.
1546         std::vector<select_element *> select_list;      //      Select list
1547         std::vector<cnf_elem *> shared_pred;            // prefilter preds
1548         std::vector<cnf_elem *> pred_t0;                        // main (R) preds
1549         std::vector<cnf_elem *> pred_t1;                        // filtering (S) preds
1550         std::vector<cnf_elem *> hash_eq;                        // define hash key
1551         std::vector<cnf_elem *> postfilter;                     // ref's no table.
1552
1553         std::vector<cnf_elem *> where;                          // all the filters
1554                                                                                                 // useful for summary analysis
1555
1556         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1557         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1558         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1559
1560
1561         bool use_bloom;                 // true => bloom filter, false => limited hash
1562
1563         std::string node_type(){return("filter_join");  };
1564     bool makes_transform(){return true;};
1565         std::vector<std::string> external_libs(){
1566                 std::vector<std::string> ret;
1567                 return ret;
1568         }
1569
1570         void bind_to_schema(table_list *Schema);
1571         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
1572
1573         std::string to_query_string();
1574         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
1575                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
1576                 exit(1);
1577         }
1578         std::string generate_functor_name(){
1579                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
1580                 exit(1);
1581         }
1582         std::string generate_operator(int i, std::string params){
1583                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
1584                 exit(1);
1585         }
1586         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1587
1588     std::vector<select_element *> get_select_list(){return select_list;};
1589     std::vector<scalarexp_t *> get_select_se_list(){
1590                 std::vector<scalarexp_t *> ret;
1591                 int i;
1592                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1593                 return ret;
1594         };
1595 //                      Used for LFTA only
1596         void append_to_where(cnf_elem *c){
1597                 where.push_back(c);
1598         }
1599
1600     std::vector<cnf_elem *> get_where_clause(){return where;}
1601     std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
1602
1603         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1604     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1605
1606         table_def *get_fields();
1607 //              It should be feasible to find keys in a filter join
1608         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1609                 std::vector<string> ret;
1610                 return ret;
1611         }
1612
1613         std::vector<tablevar_t *> get_input_tbls();
1614         std::vector<tablevar_t *> get_output_tbls();
1615
1616         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1617         int resolve_if_params(ifq_t *ifdb, std::string &err);
1618
1619         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1620 //              Ensure that any refs to interface params have been split away.
1621         int count_ifp_refs(std::set<std::string> &ifpnames);
1622
1623 //              CONSTRUCTOR
1624         filter_join_qpn(){
1625         };
1626         filter_join_qpn(query_summary_class *qs,table_list *Schema){
1627                 int i,w;
1628 //                              Get the table name.
1629 //                              NOTE the colrefs have the table ref (an int)
1630 //                              embedded in them.  Would it make sense
1631 //                              to grab the whole table list?
1632                 from = qs->fta_tree->get_from()->get_table_list();
1633                 temporal_var = qs->fta_tree->get_from()->get_colref();
1634                 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
1635                 if(from.size() != 2){
1636                         char tmpstr[200];
1637                         sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1638                         err_str += tmpstr;
1639                         error_code = 1;
1640                 }
1641                 if(from[0]->get_interface() != from[1]->get_interface()){
1642                         err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n";
1643                         error_code = 1;
1644                 }
1645
1646 //                              Get the select list.
1647                 select_list = qs->fta_tree->get_sl_vec();
1648 //                              Verify that only t0 is referenced.
1649                 bool bad_ref = false;
1650                 for(i=0;i<select_list.size();i++){
1651                         vector<int> sel_tbls;
1652                         get_tablevar_ref_se(select_list[i]->se,sel_tbls);
1653                         if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
1654                                 bad_ref = true;
1655                 }
1656                 if(bad_ref){
1657                         err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
1658                         error_code = 1;
1659                 }
1660
1661
1662 //                              Get the selection predicate.
1663                 where = qs->wh_cnf;
1664                 std::vector<cnf_elem *> t0_only, t1_only;
1665                 for(w=0;w<where.size();++w){
1666                         analyze_cnf(where[w]);
1667                         std::vector<int> pred_tbls;
1668                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1669 //                              Collect the list of preds by src var,
1670 //                              extract the shared preds later.
1671                         if(pred_tbls.size()==1){
1672                                 if(pred_tbls[0] == 0){
1673                                         t0_only.push_back(where[w]);
1674                                 }else{
1675                                         t1_only.push_back(where[w]);
1676                                 }
1677                                 continue;
1678                         }
1679 //                              refs nothing -- might be sampling, do it as postfilter.
1680                         if(pred_tbls.size()==0){
1681                                 postfilter.push_back(where[w]);
1682                                 continue;
1683                         }
1684 //                              See if it can be a hash or temporal predicate.
1685 //                              NOTE: synchronize with the temporality checking
1686 //                              done at join_eq_hash_qpn::get_fields
1687                         if(where[w]->is_atom && where[w]->eq_pred){
1688                                 std::vector<int> sel_tbls, ser_tbls;
1689                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1690                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1691                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1692 //                                              make channel 0 SE on LHS.
1693                                         if(sel_tbls[0] != 0)
1694                                                 where[w]->pr->swap_scalar_operands();
1695
1696                                         hash_eq.push_back(where[w]);
1697
1698                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1699                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1700                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1701                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1702                                                 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
1703                                         continue;
1704
1705                                 }
1706                         }
1707 //                              All tests failed, fallback is postfilter.
1708                         err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
1709                         error_code = 3;
1710                 }
1711 //              Classify the t0_only and t1_only preds.
1712                 set<int> matched_pred;
1713                 int v;
1714                 for(w=0;w<t0_only.size();w++){
1715                         for(v=0;v<t1_only.size();++v)
1716                                 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
1717                                         break;
1718                         if(v<t1_only.size()){
1719                                 shared_pred.push_back(t0_only[w]);
1720                                 matched_pred.insert(v);
1721                         }else{
1722                                 pred_t0.push_back(t0_only[w]);
1723                         }
1724                 }
1725                 for(v=0;v<t1_only.size();++v){
1726                         if(matched_pred.count(v) == 0)
1727                                 pred_t1.push_back(t1_only[v]);
1728                 }
1729
1730
1731 //                              Get the parameters
1732                 param_tbl = qs->param_tbl;
1733                 definitions = qs->definitions;
1734
1735 //                              Determine the algorithm
1736                 if(this->get_val_of_def("algorithm") == "hash"){
1737                         use_bloom = false;
1738                 }else{
1739                         use_bloom = true;
1740                 }
1741         };
1742
1743         // the following method is used for distributed query optimization
1744         double get_rate_estimate();
1745
1746
1747         qp_node* make_copy(std::string suffix){
1748                 filter_join_qpn *ret = new filter_join_qpn();
1749
1750                 ret->param_tbl = new param_table();
1751                 std::vector<std::string> param_names = param_tbl->get_param_names();
1752                 int pi;
1753                 for(pi=0;pi<param_names.size();pi++){
1754                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1755                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1756                                                         param_tbl->handle_access(param_names[pi]));
1757                 }
1758                 ret->definitions = definitions;
1759
1760                 ret->node_name = node_name + suffix;
1761
1762                 // make shallow copy of all fields
1763                 ret->where = where;
1764                 ret->from = from;
1765                 ret->temporal_range = temporal_range;
1766                 ret->temporal_var = temporal_var;
1767                 ret->select_list = select_list;
1768                 ret->shared_pred = shared_pred;
1769                 ret->pred_t0 = pred_t0;
1770                 ret->pred_t1 = pred_t1;
1771                 ret->postfilter = postfilter;
1772                 ret->hash_eq = hash_eq;
1773
1774                 return ret;
1775         };
1776         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1777
1778 };
1779
1780
// Kind of compression an output_file_qpn applies to the files it writes.
// The enumerators keep their implicit values 0, 1, 2.
enum output_file_type_enum {
        regular,        // no compression
        gzip,           // output_file_qpn links -lz and uses the zfile operator header
        bzip            // output_file_qpn links -lbz2
};
1782
1783 class output_file_qpn: public qp_node{
1784 public:
1785         std::string source_op_name;                                     //      Source table
1786         std::vector<field_entry *> fields;
1787         ospec_str *output_spec;
1788         vector<tablevar_t *> fm;
1789         std::string hfta_query_name;
1790         std::string filestream_id;
1791         bool eat_input;
1792         std::vector<std::string> params;
1793         bool do_gzip;
1794         output_file_type_enum compression_type;
1795
1796         int n_streams;          // Number of output streams
1797         int n_hfta_clones;      // number of hfta clones
1798         int parallel_idx;       // which close this produces output for.
1799         std::vector<int> hash_flds;     // fields used to hash the output.
1800
1801         std::string node_type(){return("output_file_qpn");      };
1802     bool makes_transform(){return false;};
1803         std::vector<std::string> external_libs(){
1804                 std::vector<std::string> ret;
1805                 switch(compression_type){
1806                 case gzip:
1807                         ret.push_back("-lz");
1808                 break;
1809                 case bzip:
1810                         ret.push_back("-lbz2");
1811                 break;
1812                 default:
1813                 break;
1814                 }
1815                 return ret;
1816         }
1817
1818   void append_to_where(cnf_elem *c){
1819                 fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n",  node_name.c_str());
1820                 exit(1);
1821         }
1822
1823
1824
1825         void bind_to_schema(table_list *Schema){}
1826         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1827                 col_id_set ret;
1828                 return ret;
1829         }
1830
1831         std::string to_query_string(){return "// output_file_operator \n";}
1832         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1833         std::string generate_functor_name();
1834         std::string generate_operator(int i, std::string params);
1835         std::string get_include_file(){
1836                 switch(compression_type){
1837                 case gzip:
1838                         return("#include <zfile_output_operator.h>\n");
1839                 default:
1840                         return("#include <file_output_operator.h>\n");
1841                 }
1842                 return("#include <file_output_operator.h>\n");
1843         };
1844
1845     std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
1846     std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
1847         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
1848     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
1849
1850         table_def *get_fields(){
1851                 field_entry_list *fel = new field_entry_list();
1852                 int i;
1853                 for(i=0;i<fields.size();++i)
1854                         fel->append_field(fields[i]);
1855                 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1856         }
1857
1858 //              TODO! either bypass the output operator in stream_query,
1859 //                      or propagate key information when the output operator is constructed.
1860         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1861                 std::vector<string> ret;
1862                 return ret;
1863         }
1864
1865         std::vector<tablevar_t *> get_input_tbls();
1866         std::vector<tablevar_t *> get_output_tbls();
1867
1868         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
1869                 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
1870         }
1871         std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
1872                 std::vector<table_exp_t *> ret; return ret;
1873         }
1874 //              Ensure that any refs to interface params have been split away.
1875         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
1876         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
1877
1878
1879         output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
1880                 source_op_name = src_op;
1881                 node_name = source_op_name + "_output";
1882                 filestream_id = fs_id;
1883                 fields = src_tbl_def->get_fields();
1884                 output_spec = ospec;
1885                 fm.push_back(new tablevar_t(source_op_name.c_str()));
1886                 hfta_query_name = qn;
1887                 eat_input = ei;
1888
1889                 do_gzip = false;
1890                 compression_type = regular;
1891                 if(ospec->operator_type == "zfile")
1892                         compression_type = gzip;
1893
1894                 n_streams = 1;
1895                 parallel_idx = 0;
1896                 n_hfta_clones = 1;
1897
1898                 char buf[1000];
1899                 strncpy(buf, output_spec->operator_param.c_str(),1000);
1900                 buf[999] = '\0';
1901                 char *words[100];
1902                 int nwords = split_string(buf, ':', words,100);
1903                 int i;
1904                 for(i=0;i<nwords;i++){
1905                         params.push_back(words[i]);
1906                 }
1907                 for(i=0;i<params.size();i++){
1908                         if(params[i] == "gzip")
1909                                 do_gzip = true;
1910                 }
1911         }
1912
1913 //              Set output splitting parameters
1914         bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
1915                 n_streams = ns;
1916                 n_hfta_clones = np;
1917                 parallel_idx = ix;
1918
1919                 if(split_flds != ""){
1920                         string err_flds = "";
1921                         char *tmpstr = strdup(split_flds.c_str());
1922                         char *words[100];
1923                         int nwords = split_string(tmpstr,':',words,100);
1924                         int i,j;
1925                         for(i=0;i<nwords;++i){
1926                                 string target = words[i];
1927                                 for(j=0;j<fields.size();++j){
1928                                         if(fields[j]->get_name() == target){
1929                                                 hash_flds.push_back(j);
1930                                                 break;
1931                                         }
1932                                 }
1933                                 if(j==fields.size()){
1934                                         err_flds += " "+target;
1935                                 }
1936                         }
1937                         if(err_flds != ""){
1938                                 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
1939                                 return true;
1940                         }
1941                 }
1942                 return false;
1943         }
1944
1945         // the following method is used for distributed query optimization
1946         double get_rate_estimate(){return 1.0;}
1947
1948
1949         qp_node* make_copy(std::string suffix){
1950 //              output_file_qpn *ret = new output_file_qpn();
1951                 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
1952                 return ret;
1953         }
1954
1955         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
1956
1957 };
1958
1959
1960
1961 //
1962
1963 // ---------------------------------------------
1964
1965
1966 //              Select, group-by, aggregate, sampling.
1967 //              Representing
1968 //                      Select SE_1, ..., SE_k
1969 //                      From T
1970 //                      Where predicate
1971 //                      Group By gb1, ..., gb_n
1972 //                      [Subgroup gb_i1, .., gb_ik]
1973 //                      Cleaning_when  predicate
1974 //                      Cleaning_by predicate
1975 //                      Having predicate
1976 //
1977 //              For now, must have group-by variables and aggregates.
1978 //              The scalar expressions which are output must be a function
//              of the group-by variables and the aggregates.
//              The group-by variables can be references to columns of T,
1981 //              or they can be scalar expressions.
1982 class sgahcwcb_qpn: public qp_node{
1983 public:
1984         tablevar_t *table_name;                         // source table
1985         std::vector<cnf_elem *> where;          // selection predicate
1986         std::vector<cnf_elem *> having;         // post-aggregation predicate
1987         std::vector<select_element *> select_list;      // se's of output
1988         gb_table gb_tbl;                        // Table of all group-by attributes.
1989         std::set<int> sg_tbl;           // Names of the superGB attributes
1990         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
1991         std::set<std::string> states_refd;      // states ref'd by stateful fcns.
1992         std::vector<cnf_elem *> cleanby;
1993         std::vector<cnf_elem *> cleanwhen;
1994
1995         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
1996
1997         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
1998
1999         std::string node_type(){return("sgahcwcb_qpn"); };
2000     bool makes_transform(){return true;};
2001         std::vector<std::string> external_libs(){
2002                 std::vector<std::string> ret;
2003                 return ret;
2004         }
2005
2006         void bind_to_schema(table_list *Schema);
2007         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2008                 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
2009                 exit(1);
2010         }
2011
2012         std::string to_query_string();
2013         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2014         std::string generate_functor_name();
2015
2016         std::string generate_operator(int i, std::string params);
2017         std::string get_include_file(){return("#include <clean_operator.h>\n");};
2018
2019     std::vector<select_element *> get_select_list(){return select_list;};
2020     std::vector<scalarexp_t *> get_select_se_list(){
2021                 std::vector<scalarexp_t *> ret;
2022                 int i;
2023                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
2024                 return ret;
2025         };
2026     std::vector<cnf_elem *> get_where_clause(){return where;};
2027     std::vector<cnf_elem *> get_filter_clause(){return where;};
2028     std::vector<cnf_elem *> get_having_clause(){return having;};
2029     gb_table *get_gb_tbl(){return &gb_tbl;};
2030     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
2031         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2032     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2033
2034 //                              table which represents output tuple.
2035         table_def *get_fields();
2036 //                      TODO Key extraction should be feasible but I'll defer the issue.
2037         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2038                 std::vector<string> ret;
2039                 return ret;
2040         }
2041
2042         std::vector<tablevar_t *> get_input_tbls();
2043         std::vector<tablevar_t *> get_output_tbls();
2044
2045   void append_to_where(cnf_elem *c){
2046                 where.push_back(c);
2047         }
2048
2049
2050         sgahcwcb_qpn(){
2051         };
2052         sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
2053 //                              Get the table name.
2054 //                              NOTE the colrefs have the tablevar ref (an int)
2055 //                              embedded in them.  Would it make sense
2056 //                              to grab the whole table list?
2057                 tablevar_list_t *fm = qs->fta_tree->get_from();
2058                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
2059                 if(tbl_vec.size() != 1){
2060                         char tmpstr[200];
2061                         sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
2062                         err_str=tmpstr;
2063                         error_code = 1;
2064                 }
2065                 table_name = (tbl_vec[0]);
2066
2067 //                              Get the select list.
2068                 select_list = qs->fta_tree->get_sl_vec();
2069
2070 //                              Get the selection and having predicates.
2071                 where = qs->wh_cnf;
2072                 having = qs->hav_cnf;
2073                 cleanby = qs->cb_cnf;
2074                 cleanwhen = qs->cw_cnf;
2075
2076 //                              Build a new GB var table (don't share, might need to modify)
2077                 int g;
2078                 for(g=0;g<qs->gb_tbl->size();g++){
2079                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
2080                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
2081                                 qs->gb_tbl->get_reftype(g)
2082                         );
2083                 }
2084
2085 //                              Build a new aggregate table. (don't share, might need
2086 //                              to modify).
2087                 int a;
2088                 for(a=0;a<qs->aggr_tbl->size();a++){
2089                         aggr_tbl.add_aggr(
2090 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
2091                                 qs->aggr_tbl->duplicate(a)
2092                         );
2093                 }
2094
2095                 sg_tbl = qs->sg_tbl;
2096                 states_refd = qs->states_refd;
2097
2098
2099 //                              Get the parameters
2100                 param_tbl = qs->param_tbl;
2101
2102         };
2103
2104
2105
2106         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2107         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2108 //              Ensure that any refs to interface params have been split away.
2109 //                      CURRENTLY not allowed by split_node_for_fta
2110         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2111         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
2112
2113         // the following method is used for distributed query optimization
2114         double get_rate_estimate();
2115
2116         qp_node* make_copy(std::string suffix){
2117                 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
2118
2119                 ret->param_tbl = new param_table();
2120                 std::vector<std::string> param_names = param_tbl->get_param_names();
2121                 int pi;
2122                 for(pi=0;pi<param_names.size();pi++){
2123                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
2124                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2125                                                         param_tbl->handle_access(param_names[pi]));
2126                 }
2127                 ret->definitions = definitions;
2128
2129                 ret->node_name = node_name + suffix;
2130
2131                 // make shallow copy of all fields
2132                 ret->where = where;
2133                 ret->having = having;
2134                 ret->select_list = select_list;
2135                 ret->gb_tbl = gb_tbl;
2136                 ret->aggr_tbl = aggr_tbl;
2137                 ret->sg_tbl = sg_tbl;
2138                 ret->states_refd = states_refd;
2139                 ret->cleanby = cleanby;
2140                 ret->cleanwhen = cleanwhen;
2141
2142                 return ret;
2143         };
2144
2145         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2146 };
2147
2148
// Declaration only; the definition is not in this header section.
// Takes a query summary and a schema and returns a vector of qp_node pointers.
// NOTE(review): presumably the plan nodes built for the summarized query, and
// presumably heap-allocated -- confirm both against the definition.
std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);
2150
2151
2152
// Declaration only; the definition is not in this header section.
// Takes s by non-const reference and returns nothing, so it presumably
// rewrites s in place -- TODO confirm against the definition.
void untaboo(string &s);
2154
// Declaration only; the definition is not in this header section.
// Takes a table name and a select list and returns a table_def pointer.
// NOTE(review): presumably the table definition describing the select list's
// output, named tname -- confirm against the definition.
table_def *create_attributes(string tname, vector<select_element *> &select_list);
2156
2157
2158
2159 #endif