Add support for query key extraction
[com/gs-lite.git] / src / ftacmp / query_plan.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15 #ifndef __QUERY_PLAN_H__
16 #define __QUERY_PLAN_H__
17
18 #include<vector>
19 #include<string>
20 #include<map>
21 using namespace std;
22
23 #include"analyze_fta.h"
24 #include"iface_q.h"
25 #include"parse_partn.h"
26 #include"generate_utils.h"
27
28 //              Identify the format of the input, output streams.
29 #define UNKNOWNFORMAT 0
30 #define NETFORMAT 1
31 #define HOSTFORMAT 2
32
33 ///////////////////////////////////////////////////
34 //      representation of an output operator specification
35
//	Plain record describing one output operator attached to a query.
//	No constructor is provided, so the two int members hold indeterminate
//	values until the code that fills the record assigns them.
struct ospec_str{
	std::string query;
	std::string operator_type;
	std::string operator_param;
	std::string output_directory;
	int         bucketwidth;
	std::string partitioning_flds;
	int         n_partitions;
};
45
46
47 ////////////////////////////////////////////////////
48 //      Input representation of a query
49
50 struct query_node{
51         int idx;
52         std::set<int> reads_from;
53         std::set<int> sources_to;
54         std::vector<std::string> refd_tbls;
55         std::vector<var_pair_t *> params;
56         std::string name;
57         std::string file;
58         std::string mangler;            // for UDOPs
59         bool touched;
60         table_exp_t *parse_tree;
61         int n_consumers;
62         bool is_udop;
63         bool is_externally_visible;
64         bool inferred_visible_node;
65
66         set<int> subtree_roots;
67
68         query_node(){
69                 idx = -1;
70                 touched = false;
71                 parse_tree = NULL;
72                 n_consumers = 0;
73                 is_externally_visible = false;
74                 inferred_visible_node = false;
75                 mangler="";
76         };
77         query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
78                 idx = i;
79                 touched = false;
80                 name = qnm;
81                 file = flnm;
82                 parse_tree = pt;
83                 n_consumers = 0;
84                 is_udop = false;
85                 is_externally_visible = pt->get_visible();
86                 inferred_visible_node = false;
87                 mangler="";
88
89                 tablevar_list_t *fm = parse_tree->get_from();
90                 refd_tbls =  fm->get_table_names();
91
92                 params  = pt->query_params;
93         };
94         query_node(int ix, std::string udop_name,table_list *Schema){
95                 idx = ix;
96                 touched = false;
97                 name = udop_name;
98                 file = udop_name;
99                 parse_tree = NULL;
100                 n_consumers = 0;
101                 is_udop = true;
102                 is_externally_visible = true;
103                 inferred_visible_node = false;
104                 mangler="";
105
106                 int sid = Schema->find_tbl(udop_name);
107                 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
108                 int i;
109                 for(i=0;i<subq.size();++i){
110                         refd_tbls.push_back(subq[i]->name);
111                 }
112         };
113 };
114
//	Record for one hfta: its name, the indices of the query nodes it
//	contains, its read/source edges, and generation flags.
struct hfta_node{
	std::string name;
	std::string source_name;
	std::vector<int> query_node_indices;
	std::set<int> reads_from;
	std::set<int> sources_to;
	bool is_udop;
	bool inferred_visible_node;
	int n_parallel;
	int parallel_idx;
	bool do_generation;	// false means, ignore it.

//		Defaults: not a UDOP, not inferred-visible, a single parallel
//		instance with index 0, and generation enabled.
	hfta_node()
		: is_udop(false),
		  inferred_visible_node(false),
		  n_parallel(1),
		  parallel_idx(0),
		  do_generation(true)
	{}
};
135
136
137
138
139
140
141 #define SPX_QUERY 1
142 #define SGAH_QUERY 2
143
144 // the following selectivity estimates are used by our primitive rate estimators
145 #define SPX_SELECTIVITY 1.0
146 #define SGAH_SELECTIVITY 0.1
147 #define RSGAH_SELECTIVITY 0.1
148 #define SGAHCWCB_SELECTIVITY 0.1
149 #define MRG_SELECTIVITY 1.0
150 #define JOIN_EQ_HASH_SELECTIVITY 1.0
151
152 // If the output rate of the interface is not given, we are going to use
153 // this default value.
154 #define DEFAULT_INTERFACE_RATE 100
155
156
157 //                      Define query plan nodes
158 //                      These nodes are intended for query modeling
159 //                      and transformation rather than for code generation.
160
161
162 //                      Query plan node base class.
163 //                      It has an ID, can return its type,
164 //                      and can be linked into lists with the predecessors
165 //                      and successors.
166 //                      To add : serialize, unserialize?
167
168 class qp_node{
169 public:
170   int id;
171   std::vector<int> predecessors;
172   std::vector<int> successors;
173   std::string node_name;
174
175 //              For error reporting without exiting the program.
176   int error_code;
177   std::string err_str;
178
179 //                      These should be moved to the containing stream_query object.
180   std::map<std::string, std::string> definitions;
181   param_table *param_tbl;
182
183 //              The value of a field in terms of protocol fields (if any).
184   std::map<std::string, scalarexp_t *> protocol_map;
185
186   qp_node(){
187         error_code = 0;
188         id = -1;
189         param_tbl = new param_table();
190   };
191   qp_node(int i){
192         error_code = 0;
193         id = i;
194         param_tbl = new param_table();
195   };
196
197   int get_id(){return(id);};
198   void set_id(int i){id = i;    };
199
200   int get_error_code(){return error_code;};
201   std::string get_error_str(){return err_str;};
202
203   virtual std::string node_type() = 0;
204
205 //              For code generation, does the operator xform its input.
206   virtual bool makes_transform() = 0;
207
208 //              For linking, what external libraries does the operator depend on?
209   virtual std::vector<std::string> external_libs() = 0;
210
211   void set_node_name(std::string n){node_name = n;};
212   std::string get_node_name(){return node_name;};
213
214   void set_definitions(std::map<std::string, std::string> &def){
215           definitions = def;
216   };
217   std::map<std::string, std::string> get_definitions(){return definitions;};
218
219
220 //              call to create the mapping from field name to se in protocol fields.
221 //              Pass in qp_node of data sources, in order.
222   virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
223 //              get the protocol map.  the parameter is the return value.
224   std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
225
226 //              Each qp node must be able to return a description
227 //              of the tuples it creates.
228 //              TODO: the get_output_tbls method should subsume the get_fields
229 //                      method, but in fact it really just returns the
230 //                      operator name.
231   virtual table_def *get_fields() = 0;  // Should be vector?
232 //              get keys from the operator.  Currently, only works on group-by queries.
233 //              partial_keys set to true if there is a suspicion that the list is partial.
234         virtual std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys) = 0;
235 //              Get the from clause
236   virtual std::vector<tablevar_t *> get_input_tbls() = 0;
237 //              this is a confused function, it acutally return the output
238 //              table name.
239   virtual std::vector<tablevar_t *> get_output_tbls() = 0;
240
241   std::string get_val_of_def(std::string def){
242         if(definitions.count(def) > 0) return definitions[def];
243         return("");
244   };
245   void set_definition(std::string def, std::string val){
246         definitions[def]=val;
247   }
248
249 //              Associate colrefs in SEs with tables
250 //              at code generation time.
251   virtual void bind_to_schema(table_list *Schema) = 0;
252
253 //              Get colrefs of the operator, currently only meaningful for lfta
254 //              operators, and only interested in colrefs with extraction fcns
255   virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
256
257   virtual std::string to_query_string() = 0;
258   virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;
259   virtual std::string generate_functor_name() = 0;
260
261   virtual std::string generate_operator(int i, std::string params) = 0;
262   virtual std::string get_include_file() = 0;
263
264   virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
265   virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
266
267 //              Split this node into LFTA and HFTA nodes.
268 //              Four possible outcomes:
269 //              1) the qp_node reads from a protocol, but does not need to
270 //                      split (can be evaluated as an LFTA).
271 //                      The lfta node is the only element in the return vector,
272 //                      and hfta_returned is false.
273 //              2) the qp_node reads from no protocol, and therefore cannot be split.
274 //                      THe hfta node is the only element in the return vector,
275 //                      and hfta_returned is true.
276 //              3) reads from at least one protocol, but cannot be split : failure.
277 //                      return vector is empty, the error conditions are written
278 //                      in the qp_node.
279 //              4) The qp_node splits into an hfta node and one or more LFTA nodes.
280 //                      the return vector has two or more elements, and hfta_returned
281 //                      is true.  The last element is the HFTA.
282         virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;
283
284
285 //              Ensure that any refs to interface params have been split away.
286         virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
287
288
289
290 //              Tag the data sources which are views,
291 //              return the (optimized) source queries and
292 //              record the view access in opview_set
293         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
294
295   param_table *get_param_tbl(){return param_tbl;};
296
297 //                      The "where" clause is a pre-filter
298   virtual  std::vector<cnf_elem *> get_where_clause() = 0;
299 //                      To be more explicit, use get_filter_preds
300   virtual  std::vector<cnf_elem *> get_filter_clause() = 0;
301
302 //              Add an extra predicate.  Currently only used for LFTAs.
303   virtual void append_to_where(cnf_elem *c) = 0;
304
305   void add_predecessor(int i){predecessors.push_back(i);};
306   void remove_predecessor(int i){
307         std::vector<int>::iterator vi;
308         for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
309                 if((*vi) == i){
310                         predecessors.erase(vi);
311                         return;
312                 }
313         }
314   };
315   void add_successor(int i){successors.push_back(i);};
316   std::vector<int> get_predecessors(){return predecessors;};
317   int n_predecessors(){return predecessors.size();};
318   std::vector<int> get_successors(){return successors;};
319   int n_successors(){return successors.size();};
320   void clear_predecessors(){predecessors.clear();};
321   void clear_successors(){successors.clear();};
322
323   // the following method is used for distributed query optimization
324   double get_rate_estimate();
325
326
327   // used for cloning query nodes
328   virtual qp_node* make_copy(std::string suffix) = 0;
329 };
330
331
332
333 //              Select, project, transform (xform) query plan node.
334 //              represent the following query fragment
335 //                      select scalar_expression_1, ..., scalar_expression_k
336 //                      from S
337 //                      where predicate
338 //
339 //              the predicates and the scalar expressions can reference
340 //              attributes of S and also functions.
341 class spx_qpn: public qp_node{
342 public:
343         tablevar_t *table_name;                                 //      Source table
344         std::vector<cnf_elem *> where;                  // selection predicate
345         std::vector<select_element *> select_list;      //      Select list
346
347
348
349         std::string node_type(){return("spx_qpn");      };
350     bool makes_transform(){return true;};
351         std::vector<std::string> external_libs(){
352                 std::vector<std::string> ret;
353                 return ret;
354         }
355
356   void append_to_where(cnf_elem *c){
357                 where.push_back(c);
358         }
359
360
361         void bind_to_schema(table_list *Schema);
362         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
363
364         std::string to_query_string();
365         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
366         std::string generate_functor_name();
367         std::string generate_operator(int i, std::string params);
368         std::string get_include_file(){return("#include <selection_operator.h>\n");};
369
370     std::vector<select_element *> get_select_list(){return select_list;};
371     std::vector<scalarexp_t *> get_select_se_list(){
372                 std::vector<scalarexp_t *> ret;
373                 int i;
374                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
375                 return ret;
376         };
377     std::vector<cnf_elem *> get_where_clause(){return where;};
378     std::vector<cnf_elem *> get_filter_clause(){return where;};
379         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
380     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
381
382         table_def *get_fields();
383         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
384                 std::vector<string> ret;
385                 return ret;
386         }
387
388         std::vector<tablevar_t *> get_input_tbls();
389         std::vector<tablevar_t *> get_output_tbls();
390
391         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
392         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
393 //              Ensure that any refs to interface params have been split away.
394         int count_ifp_refs(std::set<std::string> &ifpnames);
395         int resolve_if_params(ifq_t *ifdb, std::string &err);
396
397         spx_qpn(){
398         };
399         spx_qpn(query_summary_class *qs,table_list *Schema){
400 //                              Get the table name.
401 //                              NOTE the colrefs have the table ref (an int)
402 //                              embedded in them.  Would it make sense
403 //                              to grab the whole table list?
404                 tablevar_list_t *fm = qs->fta_tree->get_from();
405                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
406                 if(tbl_vec.size() != 1){
407                         char tmpstr[200];
408                         sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
409                         err_str = tmpstr;
410                         error_code = 1;
411                 }
412                 table_name = (tbl_vec[0]);
413
414 //                              Get the select list.
415                 select_list = qs->fta_tree->get_sl_vec();
416
417 //                              Get the selection predicate.
418                 where = qs->wh_cnf;
419
420
421 //                              Get the parameters
422                 param_tbl = qs->param_tbl;
423
424
425
426         };
427
428         // the following method is used for distributed query optimization
429         double get_rate_estimate();
430
431
432         qp_node* make_copy(std::string suffix){
433                 spx_qpn *ret = new spx_qpn();
434
435                 ret->param_tbl = new param_table();
436                 std::vector<std::string> param_names = param_tbl->get_param_names();
437                 int pi;
438                 for(pi=0;pi<param_names.size();pi++){
439                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
440                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
441                                                         param_tbl->handle_access(param_names[pi]));
442                 }
443                 ret->definitions = definitions;
444                 ret->node_name = node_name + suffix;
445
446                 // make shallow copy of all fields
447                 ret->where = where;
448                 ret->select_list = select_list;
449
450                 return ret;
451         };
452         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
453
454 };
455
456
457
458 //              Select, group-by, aggregate.
459 //              Representing
460 //                      Select SE_1, ..., SE_k
461 //                      From T
462 //                      Where predicate
463 //                      Group By gb1, ..., gb_n
464 //                      Having predicate
465 //
466 //              NOTE : the sampling operator is sgahcwcb_qpn.
467 //
468 //              For now, must have group-by variables and aggregates.
469 //              The scalar expressions which are output must be a function
470 //              of the group-by variables and the aggregates.
471 //              The group-by variables can be references to columns of T,
472 //              or they can be scalar expressions.
473 class sgah_qpn: public qp_node{
474 public:
475         tablevar_t *table_name;                         // source table
476         std::vector<cnf_elem *> where;          // selection predicate
477         std::vector<cnf_elem *> having;         // post-aggregation predicate
478         std::vector<select_element *> select_list;      // se's of output
479         gb_table gb_tbl;                        // Table of all group-by attributes.
480         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
481
482         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
483
484         int lfta_disorder;              // maximum disorder in the steam between lfta, hfta
485         int hfta_disorder;              // maximum disorder in the  hfta
486
487 //              rollup, cube, and grouping_sets cannot be readily reconstructed by
488 //              analyzing the patterns, so explicitly record them here.
489 //              used only so that to_query_string produces something meaningful.
490         std::vector<std::string> gb_entry_type;
491         std::vector<int> gb_entry_count;
492
493         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
494
495         std::string node_type(){return("sgah_qpn");     };
496     bool makes_transform(){return true;};
497         std::vector<std::string> external_libs(){
498                 std::vector<std::string> ret;
499                 return ret;
500         }
501
502         void bind_to_schema(table_list *Schema);
503         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
504
505         std::string to_query_string();
506         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
507         std::string generate_functor_name();
508
509         std::string generate_operator(int i, std::string params);
510         std::string get_include_file(){
511                         if(hfta_disorder <= 1){
512                                 return("#include <groupby_operator.h>\n");
513                         }else{
514                                 return("#include <groupby_operator_oop.h>\n");
515                         }
516         };
517
518     std::vector<select_element *> get_select_list(){return select_list;};
519     std::vector<scalarexp_t *> get_select_se_list(){
520                 std::vector<scalarexp_t *> ret;
521                 int i;
522                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
523                 return ret;
524         };
525     std::vector<cnf_elem *> get_where_clause(){return where;};
526
527   void append_to_where(cnf_elem *c){
528                 where.push_back(c);
529         }
530
531     std::vector<cnf_elem *> get_filter_clause(){return where;};
532     std::vector<cnf_elem *> get_having_clause(){return having;};
533     gb_table *get_gb_tbl(){return &gb_tbl;};
534     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
535         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
536     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
537
538 //                              table which represents output tuple.
539         table_def *get_fields();
540         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
541         std::vector<tablevar_t *> get_input_tbls();
542         std::vector<tablevar_t *> get_output_tbls();
543
544
545         sgah_qpn(){
546                 lfta_disorder = 1;
547                 hfta_disorder = 1;
548         };
549         sgah_qpn(query_summary_class *qs,table_list *Schema){
550                 lfta_disorder = 1;
551                 hfta_disorder = 1;
552
553 //                              Get the table name.
554 //                              NOTE the colrefs have the tablevar ref (an int)
555 //                              embedded in them.  Would it make sense
556 //                              to grab the whole table list?
557                 tablevar_list_t *fm = qs->fta_tree->get_from();
558                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
559                 if(tbl_vec.size() != 1){
560                         char tmpstr[200];
561                         sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
562                         err_str=tmpstr;
563                         error_code = 1;
564                 }
565                 table_name = (tbl_vec[0]);
566
567 //                              Get the select list.
568                 select_list = qs->fta_tree->get_sl_vec();
569
570 //                              Get the selection and having predicates.
571                 where = qs->wh_cnf;
572                 having = qs->hav_cnf;
573
574 //                              Build a new GB var table (don't share, might need to modify)
575                 int g;
576                 for(g=0;g<qs->gb_tbl->size();g++){
577                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
578                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
579                                 qs->gb_tbl->get_reftype(g)
580                         );
581                 }
582                 gb_tbl.set_pattern_info(qs->gb_tbl);
583 //              gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
584 //              gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
585 //              gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
586
587 //                              Build a new aggregate table. (don't share, might need
588 //                              to modify).
589                 int a;
590                 for(a=0;a<qs->aggr_tbl->size();a++){
591                         aggr_tbl.add_aggr(
592 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
593                                 qs->aggr_tbl->duplicate(a)
594                         );
595                 }
596
597
598 //                              Get the parameters
599                 param_tbl = qs->param_tbl;
600
601         };
602
603
604
605         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
606         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
607 //              Ensure that any refs to interface params have been split away.
608         int count_ifp_refs(std::set<std::string> &ifpnames);
609         int resolve_if_params(ifq_t *ifdb, std::string &err);
610
611         // the following method is used for distributed query optimization
612         double get_rate_estimate();
613
614
615         qp_node* make_copy(std::string suffix){
616                 sgah_qpn *ret = new sgah_qpn();
617
618                 ret->param_tbl = new param_table();
619                 std::vector<std::string> param_names = param_tbl->get_param_names();
620                 int pi;
621                 for(pi=0;pi<param_names.size();pi++){
622                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
623                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
624                                                         param_tbl->handle_access(param_names[pi]));
625                 }
626                 ret->definitions = definitions;
627
628                 ret->node_name = node_name + suffix;
629
630                 // make shallow copy of all fields
631                 ret->where = where;
632                 ret->having = having;
633                 ret->select_list = select_list;
634                 ret->gb_tbl = gb_tbl;
635                 ret->aggr_tbl = aggr_tbl;
636
637                 return ret;
638         };
639
640 //              Split aggregation into two HFTA components - sub and superaggregation
641 //              If unable to split the aggreagates, split into selection and aggregation
642 //              If resulting low-level query is empty (e.g. when aggregates cannot be split and
643 //              where clause is empty) empty vector willb e returned
644         virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
645
646         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
647
648 };
649
650
651
652
653 //              Select, group-by, aggregate. with running aggregates
654 //              Representing
655 //                      Select SE_1, ..., SE_k
656 //                      From T
657 //                      Where predicate
658 //                      Group By gb1, ..., gb_n
659 //                      Closing When predicate
660 //                      Having predicate
661 //
662 //              NOTE : the sampling operator is sgahcwcb_qpn.
663 //
664 //              For now, must have group-by variables and aggregates.
665 //              The scalar expressions which are output must be a function
666 //              of the group-by variables and the aggregates.
667 //              The group-by variables can be references to columns of T,
668 //              or they can be scalar expressions.
669 class rsgah_qpn: public qp_node{
670 public:
671         tablevar_t *table_name;                         // source table
672         std::vector<cnf_elem *> where;          // selection predicate
673         std::vector<cnf_elem *> having;         // post-aggregation predicate
674         std::vector<cnf_elem *> closing_when;           // group closing predicate
675         std::vector<select_element *> select_list;      // se's of output
676         gb_table gb_tbl;                        // Table of all group-by attributes.
677         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
678
679         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
680
681         int lfta_disorder;              // maximum disorder allowed in stream between lfta, hfta
682         int hfta_disorder;              // maximum disorder allowed in hfta
683
684         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
685
686
687         std::string node_type(){return("rsgah_qpn");    };
688     bool makes_transform(){return true;};
689         std::vector<std::string> external_libs(){
690                 std::vector<std::string> ret;
691                 return ret;
692         }
693
694         void bind_to_schema(table_list *Schema);
695         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
696                 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
697                 exit(1);
698         }
699
700         std::string to_query_string();
701         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
702         std::string generate_functor_name();
703
704         std::string generate_operator(int i, std::string params);
705         std::string get_include_file(){return("#include <running_gb_operator.h>\n");};
706
707     std::vector<select_element *> get_select_list(){return select_list;};
708     std::vector<scalarexp_t *> get_select_se_list(){
709                 std::vector<scalarexp_t *> ret;
710                 int i;
711                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
712                 return ret;
713         };
714     std::vector<cnf_elem *> get_where_clause(){return where;};
715   void append_to_where(cnf_elem *c){
716                 where.push_back(c);
717         }
718
719     std::vector<cnf_elem *> get_filter_clause(){return where;};
720     std::vector<cnf_elem *> get_having_clause(){return having;};
721     std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};
722     gb_table *get_gb_tbl(){return &gb_tbl;};
723     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
724         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
725     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
726
727 //                              table which represents output tuple.
728         table_def *get_fields();
729         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
730
731         std::vector<tablevar_t *> get_input_tbls();
732         std::vector<tablevar_t *> get_output_tbls();
733
734
735         rsgah_qpn(){
736                 lfta_disorder = 1;
737                 hfta_disorder = 1;
738         };
739         rsgah_qpn(query_summary_class *qs,table_list *Schema){
740                 lfta_disorder = 1;
741                 hfta_disorder = 1;
742
743 //                              Get the table name.
744 //                              NOTE the colrefs have the tablevar ref (an int)
745 //                              embedded in them.  Would it make sense
746 //                              to grab the whole table list?
747                 tablevar_list_t *fm = qs->fta_tree->get_from();
748                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
749                 if(tbl_vec.size() != 1){
750                         char tmpstr[200];
751                         sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
752                         err_str=tmpstr;
753                         error_code = 1;
754                 }
755                 table_name = (tbl_vec[0]);
756
757 //                              Get the select list.
758                 select_list = qs->fta_tree->get_sl_vec();
759
760 //                              Get the selection and having predicates.
761                 where = qs->wh_cnf;
762                 having = qs->hav_cnf;
763                 closing_when = qs->closew_cnf;
764
765 //                              Build a new GB var table (don't share, might need to modify)
766                 int g;
767                 for(g=0;g<qs->gb_tbl->size();g++){
768                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
769                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
770                                 qs->gb_tbl->get_reftype(g)
771                         );
772                 }
773
774 //                              Build a new aggregate table. (don't share, might need
775 //                              to modify).
776                 int a;
777                 for(a=0;a<qs->aggr_tbl->size();a++){
778                         aggr_tbl.add_aggr(
779 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
780                                 qs->aggr_tbl->duplicate(a)
781                         );
782                 }
783
784
785 //                              Get the parameters
786                 param_tbl = qs->param_tbl;
787
788         };
789
790
791
792         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
793         std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
794         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
795 //              Ensure that any refs to interface params have been split away.
796         int count_ifp_refs(std::set<std::string> &ifpnames);
797         int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;}
798
799         // the following method is used for distributed query optimization
800         double get_rate_estimate();
801
802         qp_node* make_copy(std::string suffix){
803                 rsgah_qpn *ret = new rsgah_qpn();
804
805                 ret->param_tbl = new param_table();
806                 std::vector<std::string> param_names = param_tbl->get_param_names();
807                 int pi;
808                 for(pi=0;pi<param_names.size();pi++){
809                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
810                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
811                                                         param_tbl->handle_access(param_names[pi]));
812                 }
813                 ret->definitions = definitions;
814
815                 ret->node_name = node_name + suffix;
816
817                 // make shallow copy of all fields
818                 ret->where = where;
819                 ret->having = having;
820                 ret->closing_when = closing_when;
821                 ret->select_list = select_list;
822                 ret->gb_tbl = gb_tbl;
823                 ret->aggr_tbl = aggr_tbl;
824
825                 return ret;
826         };
827         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
828 };
829
830
831 //              forward reference
832 class filter_join_qpn;
833
834
835 //              (temporal) Merge query plan node.
836 //              represent the following query fragment
837 //                      Merge c1:c2
838 //                      from T1 _t1, T2 _t2
839 //
840 //              T1 and T2 must have compatible schemas,
841 //              that is the same types in the same slots.
842 //              c1 and c2 must be colrefs from T1 and T2,
843 //              both ref'ing the same slot.  Their types
844 //              must be temporal and the same kind of temporal.
845 //              in the output, no other field is temporal.
846 //              the field names of the output are drawn from T1.
847 class mrg_qpn: public qp_node{
848 public:
849         std::vector<tablevar_t *> fm;                                   //      Source table
850         std::vector<colref_t *> mvars;                                  // the merge-by columns.
851         scalarexp_t *slack;
852
853         table_def *table_layout;                                // the output schema
854         int merge_fieldpos;                                             // position of merge field,
855                                                                                         // convenience for manipulation.
856
857         int disorder;           // max disorder seen in the input / allowed in the output
858
859
860         // partition definition for merges that combine streams partitioned over multiple interfaces
861         partn_def_t* partn_def;
862
863
864   void append_to_where(cnf_elem *c){
865                 fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n",  node_name.c_str());
866                 exit(1);
867         }
868
869
870
871         std::string node_type(){return("mrg_qpn");      };
872     bool makes_transform(){return false;};
873         std::vector<std::string> external_libs(){
874                 std::vector<std::string> ret;
875                 return ret;
876         }
877
878         void bind_to_schema(table_list *Schema);
879         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
880                 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
881                 exit(1);
882         }
883
884         std::string to_query_string();
885         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
886         std::string generate_functor_name();
887         std::string generate_operator(int i, std::string params);
888         std::string get_include_file(){
889                 if(disorder>1)
890                         return("#include <merge_operator_oop.h>\n");
891                 return("#include <merge_operator.h>\n");
892         };
893
894         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
895     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
896
897         table_def *get_fields();
898         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
899                 std::vector<string> ret;
900                 return ret;
901         }
902
903         std::vector<tablevar_t *> get_input_tbls();
904         std::vector<tablevar_t *> get_output_tbls();
905
906         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
907         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
908 //              Ensure that any refs to interface params have been split away.
909         int count_ifp_refs(std::set<std::string> &ifpnames);
910
911 //                      No predicates, return an empty clause
912     std::vector<cnf_elem *> get_where_clause(){
913                  std::vector<cnf_elem *> t;
914                 return(t);
915         };
916     std::vector<cnf_elem *> get_filter_clause(){
917                 return get_where_clause();
918         }
919
920         mrg_qpn(){
921                 partn_def = NULL;
922         };
923
924         void set_disorder(int d){
925                 disorder = d;
926         }
927
928         mrg_qpn(query_summary_class *qs,table_list *Schema){
929                 disorder = 1;
930
931 //                              Grab the elements of the query node.
932                 fm = qs->fta_tree->get_from()->get_table_list();
933                 mvars = qs->mvars;
934                 slack = qs->slack;
935
936 //                      sanity check
937                 if(fm.size() != mvars.size()){
938                         fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn.  fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
939                         exit(1);
940                 }
941
942 //                              Get the parameters
943                 param_tbl = qs->param_tbl;
944
945 //                              Need to set the node name now, so that the
946 //                              schema (table_layout) can be properly named.
947 //                              TODO: Setting the name of the table might best be done
948 //                              via the set_node_name method, because presumably
949 //                              thats when the node name is really known.
950 //                              This should propogate to the table_def table_layout
951                 node_name=qs->query_name;
952
953 /*
954 int ff;
955 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
956 for(ff=0;ff<fm.size();++ff){
957 printf("%s ",fm[ff]->to_string().c_str());
958 }
959 printf("\n");
960 */
961
962
963 //              Create the output schema.
964 //              strip temporal properites form all fields except the merge field.
965                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
966                 field_entry_list *fel = new field_entry_list();
967                 int f;
968                 for(f=0;f<flva.size();++f){
969                         field_entry *fe;
970                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
971                         if(flva[f]->get_name() == mvars[0]->get_field()){
972                                 merge_fieldpos = f;
973 //                              if(slack != NULL) dt.reset_temporal();
974                         }else{
975                                 dt.reset_temporal();
976                         }
977
978                         param_list *plist = new param_list();
979                         std::vector<std::string> param_strings = dt.get_param_keys();
980                         int p;
981                         for(p=0;p<param_strings.size();++p){
982                                 std::string v = dt.get_param_val(param_strings[p]);
983                                 if(v != "")
984                                         plist->append(param_strings[p].c_str(),v.c_str());
985                                 else
986                                         plist->append(param_strings[p].c_str());
987                         }
988
989
990                         fe=new field_entry(
991                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
992                         fel->append_field(fe);
993                 }
994
995
996
997
998                 table_layout = new table_def(
999                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1000                 );
1001
1002                 partn_def = NULL;
1003         };
1004
1005
1006 /////////////////////////////////////////////
1007 ///             Created for de-siloing.  to be removed?  or is it otherwise useful?
1008 //              Merge existing set of sources (de-siloing)
1009         mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
1010                 int i,f;
1011
1012                 disorder = 1;
1013
1014 //                              Construct the fm list
1015                 for(f=0;f<src_names.size();++f){
1016                         int tbl_ref = Schema->get_table_ref(src_names[f]);
1017                         if(tbl_ref < 0){
1018                                 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
1019                                 exit(1);
1020                         }
1021                         table_def *src_tbl = Schema->get_table(tbl_ref);
1022                         tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
1023                         string range_name = "_t" + int_to_string(f);
1024                         fm_t->set_range_var(range_name);
1025                         fm_t->set_schema_ref(tbl_ref);
1026                         fm.push_back(fm_t);
1027                 }
1028
1029 //              Create the output schema.
1030 //              strip temporal properites form all fields except the merge field.
1031                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1032                 field_entry_list *fel = new field_entry_list();
1033                 bool temporal_found = false;
1034                 for(f=0;f<flva.size();++f){
1035                         field_entry *fe;
1036                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
1037                         if(dt.is_temporal() && !temporal_found){
1038                                 merge_fieldpos = f;
1039                                 temporal_found = true;
1040                         }else{
1041                                 dt.reset_temporal();
1042                         }
1043
1044                         param_list *plist = new param_list();
1045                         std::vector<std::string> param_strings = dt.get_param_keys();
1046                         int p;
1047                         for(p=0;p<param_strings.size();++p){
1048                                 std::string v = dt.get_param_val(param_strings[p]);
1049                                 if(v != "")
1050                                         plist->append(param_strings[p].c_str(),v.c_str());
1051                                 else
1052                                         plist->append(param_strings[p].c_str());
1053                         }
1054
1055                         fe=new field_entry(
1056                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
1057                                 flva[f]->get_unpack_fcns()
1058                         );
1059                         fel->append_field(fe);
1060                 }
1061
1062                 if(! temporal_found){
1063                         fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
1064                         exit(1);
1065                 }
1066
1067                 node_name=n_name;
1068                 table_layout = new table_def(
1069                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1070                 );
1071
1072                 partn_def = NULL;
1073                 param_tbl = new param_table();
1074
1075 //                      Construct mvars
1076                 for(f=0;f<fm.size();++f){
1077                         std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
1078                         data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
1079                              flva[merge_fieldpos]->get_modifier_list());
1080
1081                         colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
1082                                 flv_f[merge_fieldpos]->get_name().c_str());
1083                         mvars.push_back(mcr);
1084                 }
1085
1086 //              literal_t *s_lit = new literal_t("5",LITERAL_INT);
1087 //              slack = new scalarexp_t(s_lit);
1088                 slack = NULL;
1089
1090         };
1091 //                      end de-siloing
1092 ////////////////////////////////////////
1093
1094         void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
1095
1096
1097 //                      Merge filter_join LFTAs.
1098
1099         mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1100
1101 //                      Merge selection LFTAs.
1102
1103         mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
1104
1105                 disorder = 1;
1106
1107                 param_tbl = spx->param_tbl;
1108                 int i;
1109                 node_name = n_name;
1110                 field_entry_list *fel = new field_entry_list();
1111                 merge_fieldpos = -1;
1112
1113
1114
1115
1116                 for(i=0;i<spx->select_list.size();++i){
1117                         data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
1118                         if(dt->is_temporal()){
1119                                 if(merge_fieldpos < 0){
1120                                         merge_fieldpos = i;
1121                                 }else{
1122                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
1123                                         dt->reset_temporal();
1124                                 }
1125                         }
1126
1127                         field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
1128                         fel->append_field(fe);
1129                         delete dt;
1130                 }
1131                 if(merge_fieldpos<0){
1132                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1133                                 exit(1);
1134                 }
1135                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1136
1137 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1138                 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
1139                                 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
1140 //      if(this->slack == NULL)
1141 //              fprintf(stderr,"Zero slack.\n");
1142 //      else
1143 //              fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1144
1145                 for(i=0;i<sources.size();i++){
1146                         std::string rvar = "_m"+int_to_string(i);
1147                         mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
1148                         mvars[i]->set_tablevar_ref(i);
1149                         fm.push_back(new tablevar_t(sources[i].c_str()));
1150                         fm[i]->set_range_var(rvar);
1151                 }
1152
1153                 param_tbl = new param_table();
1154                 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
1155                 int pi;
1156                 for(pi=0;pi<param_names.size();pi++){
1157                         data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
1158                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1159                                                         spx->param_tbl->handle_access(param_names[pi]));
1160                 }
1161                 definitions = spx->definitions;
1162
1163         }
1164
1165 //              Merge aggregation LFTAs
1166
1167         mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
1168
1169                 disorder = 1;
1170
1171                 param_tbl = sgah->param_tbl;
1172                 int i;
1173                 node_name = n_name;
1174                 field_entry_list *fel = new field_entry_list();
1175                 merge_fieldpos = -1;
1176                 for(i=0;i<sgah->select_list.size();++i){
1177                         data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
1178                         if(dt->is_temporal()){
1179                                 if(merge_fieldpos < 0){
1180                                         merge_fieldpos = i;
1181                                 }else{
1182                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
1183                                         dt->reset_temporal();
1184                                 }
1185                         }
1186
1187                         field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
1188                         fel->append_field(fe);
1189                         delete dt;
1190                 }
1191                 if(merge_fieldpos<0){
1192                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1193                         exit(1);
1194                 }
1195                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1196
1197 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1198                 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
1199                                 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
1200                                 &(sgah->gb_tbl));
1201                 if(this->slack == NULL)
1202                         fprintf(stderr,"Zero slack.\n");
1203                 else
1204                         fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1205
1206
1207                 for(i=0;i<sources.size();i++){
1208                         std::string rvar = "_m"+int_to_string(i);
1209                         mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
1210                         mvars[i]->set_tablevar_ref(i);
1211                         fm.push_back(new tablevar_t(sources[i].c_str()));
1212                         fm[i]->set_range_var(rvar);
1213                 }
1214
1215                 param_tbl = new param_table();
1216                 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
1217                 int pi;
1218                 for(pi=0;pi<param_names.size();pi++){
1219                         data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
1220                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1221                                                         sgah->param_tbl->handle_access(param_names[pi]));
1222                 }
1223                 definitions = sgah->definitions;
1224
1225         }
1226
1227         qp_node *make_copy(std::string suffix){
1228                 mrg_qpn *ret = new mrg_qpn();
1229                 ret->slack = slack;
1230                 ret->disorder = disorder;
1231
1232                 ret->param_tbl = new param_table();
1233                 std::vector<std::string> param_names = param_tbl->get_param_names();
1234                 int pi;
1235                 for(pi=0;pi<param_names.size();pi++){
1236                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1237                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1238                                                         param_tbl->handle_access(param_names[pi]));
1239                 }
1240                 ret->definitions = definitions;
1241
1242                 ret->node_name = node_name + suffix;
1243                 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
1244                 ret->merge_fieldpos = merge_fieldpos;
1245
1246                 return ret;
1247         };
1248
1249         std::vector<mrg_qpn *> split_sources();
1250
1251         // the following method is used for distributed query optimization
1252         double get_rate_estimate();
1253
1254
1255         // get partition definition for merges that combine streams partitioned over multiple interfaces
1256         // return NULL for regular merges
1257         partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1258                 if (partn_def)
1259                         return partn_def;
1260
1261                 int err;
1262                 string err_str;
1263                 string partn_name;
1264
1265                 vector<tablevar_t *> input_tables = get_input_tbls();
1266                 for (int i = 0; i <  input_tables.size(); ++i) {
1267                         tablevar_t * table = input_tables[i];
1268
1269                         vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
1270                         if (partn_names.size() != 1)    // can't have more than one value of partition attribute
1271                                 return NULL;
1272                         string new_partn_name = partn_names[0];
1273
1274                         // need to make sure that all ifaces belong to the same partition
1275                         if (!i)
1276                                 partn_name = new_partn_name;
1277                         else if (new_partn_name != partn_name)
1278                                 return NULL;
1279                 }
1280
1281                 // now find partition definition corresponding to partn_name
1282                 partn_def = partn_parse_result->get_partn_def(partn_name);
1283                 return partn_def;
1284         };
1285
1286         void set_partn_definition(partn_def_t* def) {
1287                 partn_def = def;
1288         }
1289
1290         bool is_multihost_merge() {
1291
1292                 bool is_multihost = false;
1293
1294                 // each input table must be have machine attribute be non-empty
1295                 // and there should be at least 2 different values of machine attributes
1296                 vector<tablevar_t *> input_tables = get_input_tbls();
1297                 string host = input_tables[0]->get_machine();
1298                 for  (int i = 1; i < input_tables.size(); ++i) {
1299                         string new_host = input_tables[i]->get_machine();
1300                         if (new_host == "")
1301                                 return false;
1302                         if (new_host != host)
1303                                 is_multihost = true;
1304                 }
1305                 return is_multihost;
1306         }
1307
1308         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1309 };
1310
1311
1312 //              eq_temporal, hash join query plan node.
1313 //              represent the following query fragment
1314 //                      select scalar_expression_1, ..., scalar_expression_k
1315 //                      from T0 t0, T1 t1
1316 //                      where predicate
1317 //
1318 //              the predicates and the scalar expressions can reference
1319 //              attributes of t0 and t1 and also functions.
1320 //              The predicate must contain CNF elements to enable the
1321 //              efficient evaluation of the query.
1322 //              1) at least one predicate of the form
1323 //                      (temporal se in t0) = (temporal se in t1)
1324 //              2) at least one predicate of the form
1325 //                      (non-temporal se in t0) = (non-temporal se in t1)
1326 //
1327 class join_eq_hash_qpn: public qp_node{
1328 public:
1329         std::vector<tablevar_t *> from;                                 //      Source tables
1330         std::vector<select_element *> select_list;      //      Select list
1331         std::vector<cnf_elem *> prefilter[2];           // source prefilters
1332         std::vector<cnf_elem *> temporal_eq;            // define temporal window
1333         std::vector<cnf_elem *> hash_eq;                        // define hash key
1334         std::vector<cnf_elem *> postfilter;                     // final filter on hash matches.
1335
1336         std::vector<cnf_elem *> where;                          // all the filters
1337                                                                                                 // useful for summary analysis
1338
1339         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1340
1341         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1342         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1343
//      Name tag identifying this kind of query plan node.
std::string node_type(){
        return std::string("join_eq_hash_qpn");
}
//      This node type always reports that it makes a transform.
bool makes_transform(){
        const bool transforms = true;
        return transforms;
}
//      This node reports no external libraries: the list is always empty.
std::vector<std::string> external_libs(){
        return std::vector<std::string>();
}
1350
1351         void bind_to_schema(table_list *Schema);
1352         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1353                 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
1354                 exit(1);
1355         }
1356
1357   void append_to_where(cnf_elem *c){
1358                 fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str());
1359                 exit(1);
1360         }
1361
1362
1363         std::string to_query_string();
1364         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1365         std::string generate_functor_name();
1366         std::string generate_operator(int i, std::string params);
//      The #include line (newline terminated) returned for this node's operator.
std::string get_include_file(){
        const char *include_line = "#include <join_eq_hash_operator.h>\n";
        return std::string(include_line);
}
1368
1369     std::vector<select_element *> get_select_list(){return select_list;};
1370     std::vector<scalarexp_t *> get_select_se_list(){
1371                 std::vector<scalarexp_t *> ret;
1372                 int i;
1373                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1374                 return ret;
1375         };
1376 //                      Used for LFTA only
1377     std::vector<cnf_elem *> get_where_clause(){
1378                  std::vector<cnf_elem *> t;
1379                 return(t);
1380         };
1381     std::vector<cnf_elem *> get_filter_clause(){
1382                 return get_where_clause();
1383         }
1384
1385         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1386     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1387
1388         table_def *get_fields();
1389
1390 //              It might be feasible to find keys in an equijoin expression.
1391         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1392                 std::vector<string> ret;
1393                 return ret;
1394         }
1395
1396         std::vector<tablevar_t *> get_input_tbls();
1397         std::vector<tablevar_t *> get_output_tbls();
1398
1399         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1400         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1401 //              Ensure that any refs to interface params have been split away.
1402         int count_ifp_refs(std::set<std::string> &ifpnames);
1403
1404         join_eq_hash_qpn(){
1405         };
1406         join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
1407                 int w;
1408 //                              Get the table name.
1409 //                              NOTE the colrefs have the table ref (an int)
1410 //                              embedded in them.  Would it make sense
1411 //                              to grab the whole table list?
1412                 from = qs->fta_tree->get_from()->get_table_list();
1413                 if(from.size() != 2){
1414                         char tmpstr[200];
1415                         sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1416                         err_str = tmpstr;
1417                         error_code = 1;
1418                 }
1419
1420 //                              Get the select list.
1421                 select_list = qs->fta_tree->get_sl_vec();
1422
1423 //                              Get the selection predicate.
1424                 where = qs->wh_cnf;
1425                 for(w=0;w<where.size();++w){
1426                         analyze_cnf(where[w]);
1427                         std::vector<int> pred_tbls;
1428                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1429 //                              Prefilter if refs only one tablevar
1430                         if(pred_tbls.size()==1){
1431                                 prefilter[pred_tbls[0]].push_back(where[w]);
1432                                 continue;
1433                         }
1434 //                              refs nothing -- might be sampling, do it as postfilter.
1435                         if(pred_tbls.size()==0){
1436                                 postfilter.push_back(where[w]);
1437                                 continue;
1438                         }
1439 //                              See if it can be a hash or temporal predicate.
1440 //                              NOTE: synchronize with the temporality checking
1441 //                              done at join_eq_hash_qpn::get_fields
1442                         if(where[w]->is_atom && where[w]->eq_pred){
1443                                 std::vector<int> sel_tbls, ser_tbls;
1444                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1445                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1446                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1447 //                                              make channel 0 SE on LHS.
1448                                         if(sel_tbls[0] != 0)
1449                                                 where[w]->pr->swap_scalar_operands();
1450
1451                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1452                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1453                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1454                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1455                                                         temporal_eq.push_back(where[w]);
1456                                         else
1457                                                         hash_eq.push_back(where[w]);
1458                                         continue;
1459
1460                                 }
1461                         }
1462 //                              All tests failed, fallback is postfilter.
1463                         postfilter.push_back(where[w]);
1464                 }
1465
1466                 if(temporal_eq.size()==0){
1467                         err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
1468                         error_code = 1;
1469                 }
1470
1471 //                              Get the parameters
1472                 param_tbl = qs->param_tbl;
1473
1474         };
1475
1476         // the following method is used for distributed query optimization
1477         double get_rate_estimate();
1478
1479
1480         qp_node* make_copy(std::string suffix){
1481                 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
1482
1483                 ret->param_tbl = new param_table();
1484                 std::vector<std::string> param_names = param_tbl->get_param_names();
1485                 int pi;
1486                 for(pi=0;pi<param_names.size();pi++){
1487                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1488                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1489                                                         param_tbl->handle_access(param_names[pi]));
1490                 }
1491                 ret->definitions = definitions;
1492
1493                 ret->node_name = node_name + suffix;
1494
1495                 // make shallow copy of all fields
1496                 ret->where = where;
1497                 ret->from = from;
1498                 ret->select_list = select_list;
1499                 ret->prefilter[0] = prefilter[0];
1500                 ret->prefilter[1] = prefilter[1];
1501                 ret->postfilter = postfilter;
1502                 ret->temporal_eq = temporal_eq;
1503                 ret->hash_eq = hash_eq;
1504
1505                 return ret;
1506         };
1507         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1508
1509 };
1510
1511
1512 // ---------------------------------------------
1513 //              eq_temporal, hash join query plan node.
1514 //              represent the following query fragment
1515 //                      select scalar_expression_1, ..., scalar_expression_k
1516 //                      FILTER_JOIN(col, range) from T0 t0, T1 t1
1517 //                      where predicate
1518 //
1519 //              t0 is the output range variable, t1 is the filtering range
1520 //              variable.  Both must alias a PROTOCOL.
1521 //              The scalar expressions in the select clause may
1522 //              reference t0 only.
1523 //              The predicates are classified as follows
1524 //              prefilter predicates:
1525 //                a cheap predicate in t0 such that there is an equivalent
1526 //                predicate in t1.  Cost decisions about pushing to
1527 //                lfta prefilter made later.
1528 //              t0 predicates (other than prefilter predicates)
1529 //                      -- cheap vs. expensive sorted out at genereate time,
1530 //                              the constructor isn't called with the function list.
1531 //              t1 predicates (other than prefiler predicates).
1532 //              equi-join predicates of the form:
1533 //                      (se in t0) = (se in t1)
1534 //
1535 //              There must be at least one equi-join predicate.
1536 //              No join predicates other than equi-join predicates
1537 //                are allowed.
1538 //              Warn on temporal equi-join predicates.
1539 //              t1 predicates should not be expensive ... warn?
1540 //
1541 class filter_join_qpn: public qp_node{
1542 public:
1543         std::vector<tablevar_t *> from;                                 //      Source tables
1544                 colref_t *temporal_var;                 // join window in FROM
1545                 unsigned int temporal_range;    // metadata.
1546         std::vector<select_element *> select_list;      //      Select list
1547         std::vector<cnf_elem *> shared_pred;            // prefilter preds
1548         std::vector<cnf_elem *> pred_t0;                        // main (R) preds
1549         std::vector<cnf_elem *> pred_t1;                        // filtering (S) preds
1550         std::vector<cnf_elem *> hash_eq;                        // define hash key
1551         std::vector<cnf_elem *> postfilter;                     // ref's no table.
1552
1553         std::vector<cnf_elem *> where;                          // all the filters
1554                                                                                                 // useful for summary analysis
1555
1556         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1557         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1558         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1559
1560
1561         bool use_bloom;                 // true => bloom filter, false => limited hash
1562
1563         std::string node_type(){return("filter_join");  };
1564     bool makes_transform(){return true;};
1565         std::vector<std::string> external_libs(){
1566                 std::vector<std::string> ret;
1567                 return ret;
1568         }
1569
1570         void bind_to_schema(table_list *Schema);
1571         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
1572
1573         std::string to_query_string();
1574         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
1575                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
1576                 exit(1);
1577         }
1578         std::string generate_functor_name(){
1579                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
1580                 exit(1);
1581         }
1582         std::string generate_operator(int i, std::string params){
1583                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
1584                 exit(1);
1585         }
1586         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1587
1588     std::vector<select_element *> get_select_list(){return select_list;};
1589     std::vector<scalarexp_t *> get_select_se_list(){
1590                 std::vector<scalarexp_t *> ret;
1591                 int i;
1592                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1593                 return ret;
1594         };
1595 //                      Used for LFTA only
1596         void append_to_where(cnf_elem *c){
1597                 where.push_back(c);
1598         }
1599
1600     std::vector<cnf_elem *> get_where_clause(){return where;}
1601     std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
1602
1603         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1604     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1605
1606         table_def *get_fields();
1607 //              It should be feasible to find keys in a filter join
1608         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1609                 std::vector<string> ret;
1610                 return ret;
1611         }
1612
1613         std::vector<tablevar_t *> get_input_tbls();
1614         std::vector<tablevar_t *> get_output_tbls();
1615
1616         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1617         int resolve_if_params(ifq_t *ifdb, std::string &err);
1618
1619         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1620 //              Ensure that any refs to interface params have been split away.
1621         int count_ifp_refs(std::set<std::string> &ifpnames);
1622
1623
1624         filter_join_qpn(){
1625         };
1626         filter_join_qpn(query_summary_class *qs,table_list *Schema){
1627                 int i,w;
1628 //                              Get the table name.
1629 //                              NOTE the colrefs have the table ref (an int)
1630 //                              embedded in them.  Would it make sense
1631 //                              to grab the whole table list?
1632                 from = qs->fta_tree->get_from()->get_table_list();
1633                 temporal_var = qs->fta_tree->get_from()->get_colref();
1634                 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
1635                 if(from.size() != 2){
1636                         char tmpstr[200];
1637                         sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1638                         err_str += tmpstr;
1639                         error_code = 1;
1640                 }
1641
1642 //                              Get the select list.
1643                 select_list = qs->fta_tree->get_sl_vec();
1644 //                              Verify that only t0 is referenced.
1645                 bool bad_ref = false;
1646                 for(i=0;i<select_list.size();i++){
1647                         vector<int> sel_tbls;
1648                         get_tablevar_ref_se(select_list[i]->se,sel_tbls);
1649                         if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
1650                                 bad_ref = true;
1651                 }
1652                 if(bad_ref){
1653                         err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
1654                         error_code = 1;
1655                 }
1656
1657
1658 //                              Get the selection predicate.
1659                 where = qs->wh_cnf;
1660                 std::vector<cnf_elem *> t0_only, t1_only;
1661                 for(w=0;w<where.size();++w){
1662                         analyze_cnf(where[w]);
1663                         std::vector<int> pred_tbls;
1664                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1665 //                              Collect the list of preds by src var,
1666 //                              extract the shared preds later.
1667                         if(pred_tbls.size()==1){
1668                                 if(pred_tbls[0] == 0){
1669                                         t0_only.push_back(where[w]);
1670                                 }else{
1671                                         t1_only.push_back(where[w]);
1672                                 }
1673                                 continue;
1674                         }
1675 //                              refs nothing -- might be sampling, do it as postfilter.
1676                         if(pred_tbls.size()==0){
1677                                 postfilter.push_back(where[w]);
1678                                 continue;
1679                         }
1680 //                              See if it can be a hash or temporal predicate.
1681 //                              NOTE: synchronize with the temporality checking
1682 //                              done at join_eq_hash_qpn::get_fields
1683                         if(where[w]->is_atom && where[w]->eq_pred){
1684                                 std::vector<int> sel_tbls, ser_tbls;
1685                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1686                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1687                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1688 //                                              make channel 0 SE on LHS.
1689                                         if(sel_tbls[0] != 0)
1690                                                 where[w]->pr->swap_scalar_operands();
1691
1692                                         hash_eq.push_back(where[w]);
1693
1694                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1695                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1696                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1697                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1698                                                 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
1699                                         continue;
1700
1701                                 }
1702                         }
1703 //                              All tests failed, fallback is postfilter.
1704                         err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
1705                         error_code = 3;
1706                 }
1707 //              Classify the t0_only and t1_only preds.
1708                 set<int> matched_pred;
1709                 int v;
1710                 for(w=0;w<t0_only.size();w++){
1711                         for(v=0;v<t1_only.size();++v)
1712                                 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
1713                                         break;
1714                         if(v<t1_only.size()){
1715                                 shared_pred.push_back(t0_only[w]);
1716                                 matched_pred.insert(v);
1717                         }else{
1718                                 pred_t0.push_back(t0_only[w]);
1719                         }
1720                 }
1721                 for(v=0;v<t1_only.size();++v){
1722                         if(matched_pred.count(v) == 0)
1723                                 pred_t1.push_back(t1_only[v]);
1724                 }
1725
1726
1727 //                              Get the parameters
1728                 param_tbl = qs->param_tbl;
1729                 definitions = qs->definitions;
1730
1731 //                              Determine the algorithm
1732                 if(this->get_val_of_def("algorithm") == "hash"){
1733                         use_bloom = false;
1734                 }else{
1735                         use_bloom = true;
1736                 }
1737         };
1738
1739         // the following method is used for distributed query optimization
1740         double get_rate_estimate();
1741
1742
1743         qp_node* make_copy(std::string suffix){
1744                 filter_join_qpn *ret = new filter_join_qpn();
1745
1746                 ret->param_tbl = new param_table();
1747                 std::vector<std::string> param_names = param_tbl->get_param_names();
1748                 int pi;
1749                 for(pi=0;pi<param_names.size();pi++){
1750                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1751                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1752                                                         param_tbl->handle_access(param_names[pi]));
1753                 }
1754                 ret->definitions = definitions;
1755
1756                 ret->node_name = node_name + suffix;
1757
1758                 // make shallow copy of all fields
1759                 ret->where = where;
1760                 ret->from = from;
1761                 ret->temporal_range = temporal_range;
1762                 ret->temporal_var = temporal_var;
1763                 ret->select_list = select_list;
1764                 ret->shared_pred = shared_pred;
1765                 ret->pred_t0 = pred_t0;
1766                 ret->pred_t1 = pred_t1;
1767                 ret->postfilter = postfilter;
1768                 ret->hash_eq = hash_eq;
1769
1770                 return ret;
1771         };
1772         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1773
1774 };
1775
1776
// Compression applied by a file output operator.
enum output_file_type_enum {
        regular,        // no compression
        gzip,
        bzip
};
1778
1779 class output_file_qpn: public qp_node{
1780 public:
1781         std::string source_op_name;                                     //      Source table
1782         std::vector<field_entry *> fields;
1783         ospec_str *output_spec;
1784         vector<tablevar_t *> fm;
1785         std::string hfta_query_name;
1786         std::string filestream_id;
1787         bool eat_input;
1788         std::vector<std::string> params;
1789         bool do_gzip;
1790         output_file_type_enum compression_type;
1791
1792         int n_streams;          // Number of output streams
1793         int n_hfta_clones;      // number of hfta clones
1794         int parallel_idx;       // which close this produces output for.
1795         std::vector<int> hash_flds;     // fields used to hash the output.
1796
1797         std::string node_type(){return("output_file_qpn");      };
1798     bool makes_transform(){return false;};
1799         std::vector<std::string> external_libs(){
1800                 std::vector<std::string> ret;
1801                 switch(compression_type){
1802                 case gzip:
1803                         ret.push_back("-lz");
1804                 break;
1805                 case bzip:
1806                         ret.push_back("-lbz2");
1807                 break;
1808                 default:
1809                 break;
1810                 }
1811                 return ret;
1812         }
1813
1814   void append_to_where(cnf_elem *c){
1815                 fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n",  node_name.c_str());
1816                 exit(1);
1817         }
1818
1819
1820
1821         void bind_to_schema(table_list *Schema){}
1822         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1823                 col_id_set ret;
1824                 return ret;
1825         }
1826
1827         std::string to_query_string(){return "// output_file_operator \n";}
1828         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1829         std::string generate_functor_name();
1830         std::string generate_operator(int i, std::string params);
1831         std::string get_include_file(){
1832                 switch(compression_type){
1833                 case gzip:
1834                         return("#include <zfile_output_operator.h>\n");
1835                 default:
1836                         return("#include <file_output_operator.h>\n");
1837                 }
1838                 return("#include <file_output_operator.h>\n");
1839         };
1840
1841     std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
1842     std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
1843         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
1844     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
1845
1846         table_def *get_fields(){
1847                 field_entry_list *fel = new field_entry_list();
1848                 int i;
1849                 for(i=0;i<fields.size();++i)
1850                         fel->append_field(fields[i]);
1851                 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1852         }
1853
1854 //              TODO! either bypass the output operator in stream_query,
1855 //                      or propagate key information when the output operator is constructed.
1856         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1857                 std::vector<string> ret;
1858                 return ret;
1859         }
1860
1861         std::vector<tablevar_t *> get_input_tbls();
1862         std::vector<tablevar_t *> get_output_tbls();
1863
1864         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
1865                 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
1866         }
1867         std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
1868                 std::vector<table_exp_t *> ret; return ret;
1869         }
1870 //              Ensure that any refs to interface params have been split away.
1871         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
1872         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
1873
1874
1875         output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
1876                 source_op_name = src_op;
1877                 node_name = source_op_name + "_output";
1878                 filestream_id = fs_id;
1879                 fields = src_tbl_def->get_fields();
1880                 output_spec = ospec;
1881                 fm.push_back(new tablevar_t(source_op_name.c_str()));
1882                 hfta_query_name = qn;
1883                 eat_input = ei;
1884
1885                 do_gzip = false;
1886                 compression_type = regular;
1887                 if(ospec->operator_type == "zfile")
1888                         compression_type = gzip;
1889
1890                 n_streams = 1;
1891                 parallel_idx = 0;
1892                 n_hfta_clones = 1;
1893
1894                 char buf[1000];
1895                 strncpy(buf, output_spec->operator_param.c_str(),1000);
1896                 buf[999] = '\0';
1897                 char *words[100];
1898                 int nwords = split_string(buf, ':', words,100);
1899                 int i;
1900                 for(i=0;i<nwords;i++){
1901                         params.push_back(words[i]);
1902                 }
1903                 for(i=0;i<params.size();i++){
1904                         if(params[i] == "gzip")
1905                                 do_gzip = true;
1906                 }
1907         }
1908
1909 //              Set output splitting parameters
1910         bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
1911                 n_streams = ns;
1912                 n_hfta_clones = np;
1913                 parallel_idx = ix;
1914
1915                 if(split_flds != ""){
1916                         string err_flds = "";
1917                         char *tmpstr = strdup(split_flds.c_str());
1918                         char *words[100];
1919                         int nwords = split_string(tmpstr,':',words,100);
1920                         int i,j;
1921                         for(i=0;i<nwords;++i){
1922                                 string target = words[i];
1923                                 for(j=0;j<fields.size();++j){
1924                                         if(fields[j]->get_name() == target){
1925                                                 hash_flds.push_back(j);
1926                                                 break;
1927                                         }
1928                                 }
1929                                 if(j==fields.size()){
1930                                         err_flds += " "+target;
1931                                 }
1932                         }
1933                         if(err_flds != ""){
1934                                 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
1935                                 return true;
1936                         }
1937                 }
1938                 return false;
1939         }
1940
1941         // the following method is used for distributed query optimization
1942         double get_rate_estimate(){return 1.0;}
1943
1944
1945         qp_node* make_copy(std::string suffix){
1946 //              output_file_qpn *ret = new output_file_qpn();
1947                 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
1948                 return ret;
1949         }
1950
1951         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
1952
1953 };
1954
1955
1956
1957 //
1958
1959 // ---------------------------------------------
1960
1961
1962 //              Select, group-by, aggregate, sampling.
1963 //              Representing
1964 //                      Select SE_1, ..., SE_k
1965 //                      From T
1966 //                      Where predicate
1967 //                      Group By gb1, ..., gb_n
1968 //                      [Subgroup gb_i1, .., gb_ik]
1969 //                      Cleaning_when  predicate
1970 //                      Cleaning_by predicate
1971 //                      Having predicate
1972 //
1973 //              For now, must have group-by variables and aggregates.
1974 //              The scalar expressions which are output must be a function
1975 //              of the group-by variables and the aggregates.
1976 //              The group-by variables can be references to columns of T,
1977 //              or they can be scalar expressions.
1978 class sgahcwcb_qpn: public qp_node{
1979 public:
1980         tablevar_t *table_name;                         // source table
1981         std::vector<cnf_elem *> where;          // selection predicate
1982         std::vector<cnf_elem *> having;         // post-aggregation predicate
1983         std::vector<select_element *> select_list;      // se's of output
1984         gb_table gb_tbl;                        // Table of all group-by attributes.
1985         std::set<int> sg_tbl;           // Names of the superGB attributes
1986         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
1987         std::set<std::string> states_refd;      // states ref'd by stateful fcns.
1988         std::vector<cnf_elem *> cleanby;
1989         std::vector<cnf_elem *> cleanwhen;
1990
1991         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
1992
1993         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
1994
1995         std::string node_type(){return("sgahcwcb_qpn"); };
1996     bool makes_transform(){return true;};
1997         std::vector<std::string> external_libs(){
1998                 std::vector<std::string> ret;
1999                 return ret;
2000         }
2001
2002         void bind_to_schema(table_list *Schema);
2003         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2004                 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
2005                 exit(1);
2006         }
2007
2008         std::string to_query_string();
2009         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2010         std::string generate_functor_name();
2011
2012         std::string generate_operator(int i, std::string params);
2013         std::string get_include_file(){return("#include <clean_operator.h>\n");};
2014
2015     std::vector<select_element *> get_select_list(){return select_list;};
2016     std::vector<scalarexp_t *> get_select_se_list(){
2017                 std::vector<scalarexp_t *> ret;
2018                 int i;
2019                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
2020                 return ret;
2021         };
2022     std::vector<cnf_elem *> get_where_clause(){return where;};
2023     std::vector<cnf_elem *> get_filter_clause(){return where;};
2024     std::vector<cnf_elem *> get_having_clause(){return having;};
2025     gb_table *get_gb_tbl(){return &gb_tbl;};
2026     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
2027         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2028     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2029
2030 //                              table which represents output tuple.
2031         table_def *get_fields();
2032 //                      TODO Key extraction should be feasible but I'll defer the issue.
2033         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2034                 std::vector<string> ret;
2035                 return ret;
2036         }
2037
2038         std::vector<tablevar_t *> get_input_tbls();
2039         std::vector<tablevar_t *> get_output_tbls();
2040
2041   void append_to_where(cnf_elem *c){
2042                 where.push_back(c);
2043         }
2044
2045
2046         sgahcwcb_qpn(){
2047         };
2048         sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
2049 //                              Get the table name.
2050 //                              NOTE the colrefs have the tablevar ref (an int)
2051 //                              embedded in them.  Would it make sense
2052 //                              to grab the whole table list?
2053                 tablevar_list_t *fm = qs->fta_tree->get_from();
2054                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
2055                 if(tbl_vec.size() != 1){
2056                         char tmpstr[200];
2057                         sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
2058                         err_str=tmpstr;
2059                         error_code = 1;
2060                 }
2061                 table_name = (tbl_vec[0]);
2062
2063 //                              Get the select list.
2064                 select_list = qs->fta_tree->get_sl_vec();
2065
2066 //                              Get the selection and having predicates.
2067                 where = qs->wh_cnf;
2068                 having = qs->hav_cnf;
2069                 cleanby = qs->cb_cnf;
2070                 cleanwhen = qs->cw_cnf;
2071
2072 //                              Build a new GB var table (don't share, might need to modify)
2073                 int g;
2074                 for(g=0;g<qs->gb_tbl->size();g++){
2075                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
2076                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
2077                                 qs->gb_tbl->get_reftype(g)
2078                         );
2079                 }
2080
2081 //                              Build a new aggregate table. (don't share, might need
2082 //                              to modify).
2083                 int a;
2084                 for(a=0;a<qs->aggr_tbl->size();a++){
2085                         aggr_tbl.add_aggr(
2086 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
2087                                 qs->aggr_tbl->duplicate(a)
2088                         );
2089                 }
2090
2091                 sg_tbl = qs->sg_tbl;
2092                 states_refd = qs->states_refd;
2093
2094
2095 //                              Get the parameters
2096                 param_tbl = qs->param_tbl;
2097
2098         };
2099
2100
2101
2102         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2103         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2104 //              Ensure that any refs to interface params have been split away.
2105 //                      CURRENTLY not allowed by split_node_for_fta
2106         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2107         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
2108
2109         // the following method is used for distributed query optimization
2110         double get_rate_estimate();
2111
2112         qp_node* make_copy(std::string suffix){
2113                 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
2114
2115                 ret->param_tbl = new param_table();
2116                 std::vector<std::string> param_names = param_tbl->get_param_names();
2117                 int pi;
2118                 for(pi=0;pi<param_names.size();pi++){
2119                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
2120                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2121                                                         param_tbl->handle_access(param_names[pi]));
2122                 }
2123                 ret->definitions = definitions;
2124
2125                 ret->node_name = node_name + suffix;
2126
2127                 // make shallow copy of all fields
2128                 ret->where = where;
2129                 ret->having = having;
2130                 ret->select_list = select_list;
2131                 ret->gb_tbl = gb_tbl;
2132                 ret->aggr_tbl = aggr_tbl;
2133                 ret->sg_tbl = sg_tbl;
2134                 ret->states_refd = states_refd;
2135                 ret->cleanby = cleanby;
2136                 ret->cleanwhen = cleanwhen;
2137
2138                 return ret;
2139         };
2140
2141         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2142 };
2143
2144
2145 std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);
2146
2147
2148
// Declaration only; the definition is not part of this header.
// s is taken by non-const reference, so the callee may rewrite it in place.
// NOTE(review): exact rewriting rules are not visible here -- confirm at
// the definition.
void untaboo(std::string &s);
2150
2151 table_def *create_attributes(string tname, vector<select_element *> &select_list);
2152
2153
2154
2155 #endif