8c7d32a4e3a82faed1bb76c1be7432fe978c7347
[com/gs-lite.git] / src / ftacmp / query_plan.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15 #ifndef __QUERY_PLAN_H__
16 #define __QUERY_PLAN_H__
17
18 #include<vector>
19 #include<string>
20 #include<map>
21 using namespace std;
22
23 #include"analyze_fta.h"
24 #include"iface_q.h"
25 #include"parse_partn.h"
26 #include"generate_utils.h"
27
//              Identify the format of the input and output streams.
//              NOTE(review): NETFORMAT / HOSTFORMAT presumably distinguish
//              network vs. host byte order -- confirm against the code
//              generators that consume these tags.
#define UNKNOWNFORMAT 0
#define NETFORMAT 1
#define HOSTFORMAT 2
32
33 ///////////////////////////////////////////////////
34 //      representation of an output operator specification
35
//      One output-operator specification (plain aggregate, no constructor).
//      NOTE(review): field meanings below are inferred from member names;
//      confirm against the parser that fills this struct.
//      NOTE(review): bucketwidth and n_partitions have no initializer, so a
//      default-initialized ospec_str leaves them indeterminate -- value-
//      initialize (e.g. `ospec_str o = ospec_str();`) or set them before use.
struct ospec_str{
        string query;                   // query the operator is attached to (presumably)
        string operator_type;
        string operator_param;
        string output_directory;
        int bucketwidth;
        string partitioning_flds;
        int n_partitions;
};
45
46
47 ////////////////////////////////////////////////////
48 //      Input representation of a query
49
50 struct query_node{
51         int idx;
52         std::set<int> reads_from;
53         std::set<int> sources_to;
54         std::vector<std::string> refd_tbls;
55         std::vector<var_pair_t *> params;
56         std::string name;
57         std::string file;
58         std::string mangler;            // for UDOPs
59         bool touched;
60         table_exp_t *parse_tree;
61         int n_consumers;
62         bool is_udop;
63         bool is_externally_visible;
64         bool inferred_visible_node;
65
66         set<int> subtree_roots;
67
68         query_node(){
69                 idx = -1;
70                 touched = false;
71                 parse_tree = NULL;
72                 n_consumers = 0;
73                 is_externally_visible = false;
74                 inferred_visible_node = false;
75                 mangler="";
76         };
77         query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
78                 idx = i;
79                 touched = false;
80                 name = qnm;
81                 file = flnm;
82                 parse_tree = pt;
83                 n_consumers = 0;
84                 is_udop = false;
85                 is_externally_visible = pt->get_visible();
86                 inferred_visible_node = false;
87                 mangler="";
88
89                 tablevar_list_t *fm = parse_tree->get_from();
90                 refd_tbls =  fm->get_table_names();
91
92                 params  = pt->query_params;
93         };
94         query_node(int ix, std::string udop_name,table_list *Schema){
95                 idx = ix;
96                 touched = false;
97                 name = udop_name;
98                 file = udop_name;
99                 parse_tree = NULL;
100                 n_consumers = 0;
101                 is_udop = true;
102                 is_externally_visible = true;
103                 inferred_visible_node = false;
104                 mangler="";
105
106                 int sid = Schema->find_tbl(udop_name);
107                 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
108                 int i;
109                 for(i=0;i<subq.size();++i){
110                         refd_tbls.push_back(subq[i]->name);
111                 }
112         };
113 };
114
struct hfta_node{
        std::string name;                       // node name
        std::string source_name;
        std::vector<int> query_node_indices;    // query_nodes folded into this hfta
        std::set<int> reads_from;
        std::set<int> sources_to;
        bool is_udop;
        bool inferred_visible_node;
        int n_parallel;                         // degree of parallelism
        int parallel_idx;                       // index of this copy among the parallel ones
        bool do_generation;     // false means, ignore it.

//              By default: a single, generated, non-UDOP node.
//              Initializers are listed in declaration order.
        hfta_node()
                : is_udop(false),
                  inferred_visible_node(false),
                  n_parallel(1),
                  parallel_idx(0),
                  do_generation(true)
        {
        }
};
135
136
137
138
139
140
// Query-type tags: SPX = select/project/transform (spx_qpn),
// SGAH = select/group-by/aggregate/having (sgah_qpn).
#define SPX_QUERY 1
#define SGAH_QUERY 2

// The following selectivity estimates (output rate / input rate) are used
// by our primitive rate estimators.
#define SPX_SELECTIVITY 1.0
#define SGAH_SELECTIVITY 0.1
#define RSGAH_SELECTIVITY 0.1
#define SGAHCWCB_SELECTIVITY 0.1
#define MRG_SELECTIVITY 1.0
#define JOIN_EQ_HASH_SELECTIVITY 1.0

// If the output rate of an interface is not given, this default value is
// used instead.  NOTE(review): units (tuples/sec?) are not stated here --
// confirm against the rate estimator.
#define DEFAULT_INTERFACE_RATE 100
155
156
157 //                      Define query plan nodes
158 //                      These nodes are intended for query modeling
159 //                      and transformation rather than for code generation.
160
161
162 //                      Query plan node base class.
163 //                      It has an ID, can return its type,
164 //                      and can be linked into lists with the predecessors
165 //                      and successors.
166 //                      To add : serialize, unserialize?
167
168 class qp_node{
169 public:
170   int id;
171   std::vector<int> predecessors;
172   std::vector<int> successors;
173   std::string node_name;
174
175 //              For error reporting without exiting the program.
176   int error_code;
177   std::string err_str;
178
179 //                      These should be moved to the containing stream_query object.
180   std::map<std::string, std::string> definitions;
181   param_table *param_tbl;
182
183 //              The value of a field in terms of protocol fields (if any).
184   std::map<std::string, scalarexp_t *> protocol_map;
185
186   qp_node(){
187         error_code = 0;
188         id = -1;
189         param_tbl = new param_table();
190   };
191   qp_node(int i){
192         error_code = 0;
193         id = i;
194         param_tbl = new param_table();
195   };
196
197   int get_id(){return(id);};
198   void set_id(int i){id = i;    };
199
200   int get_error_code(){return error_code;};
201   std::string get_error_str(){return err_str;};
202
203   virtual std::string node_type() = 0;
204
205 //              For code generation, does the operator xform its input.
206   virtual bool makes_transform() = 0;
207
208 //              For linking, what external libraries does the operator depend on?
209   virtual std::vector<std::string> external_libs() = 0;
210
211   void set_node_name(std::string n){node_name = n;};
212   std::string get_node_name(){return node_name;};
213
214   void set_definitions(std::map<std::string, std::string> &def){
215           definitions = def;
216   };
217   std::map<std::string, std::string> get_definitions(){return definitions;};
218
219
220 //              call to create the mapping from field name to se in protocol fields.
221 //              Pass in qp_node of data sources, in order.
222   virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
223 //              get the protocol map.  the parameter is the return value.
224   std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
225
226 //              Each qp node must be able to return a description
227 //              of the tuples it creates.
228 //              TODO: the get_output_tls method should subsume the get_fields
229 //                      method, but in fact it really just returns the
230 //                      operator name.
231   virtual table_def *get_fields() = 0;  // Should be vector?
232 //              Get the from clause
233   virtual std::vector<tablevar_t *> get_input_tbls() = 0;
234 //              this is a confused function, it acutally return the output
235 //              table name.
236   virtual std::vector<tablevar_t *> get_output_tbls() = 0;
237
238   std::string get_val_of_def(std::string def){
239         if(definitions.count(def) > 0) return definitions[def];
240         return("");
241   };
242   void set_definition(std::string def, std::string val){
243         definitions[def]=val;
244   }
245
246 //              Associate colrefs in SEs with tables
247 //              at code generation time.
248   virtual void bind_to_schema(table_list *Schema) = 0;
249
250 //              Get colrefs of the operator, currently only meaningful for lfta
251 //              operators, and only interested in colrefs with extraction fcns
252   virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
253
254   virtual std::string to_query_string() = 0;
255   virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;
256   virtual std::string generate_functor_name() = 0;
257
258   virtual std::string generate_operator(int i, std::string params) = 0;
259   virtual std::string get_include_file() = 0;
260
261   virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
262   virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
263
264 //              Split this node into LFTA and HFTA nodes.
265 //              Four possible outcomes:
266 //              1) the qp_node reads from a protocol, but does not need to
267 //                      split (can be evaluated as an LFTA).
268 //                      The lfta node is the only element in the return vector,
269 //                      and hfta_returned is false.
270 //              2) the qp_node reads from no protocol, and therefore cannot be split.
271 //                      THe hfta node is the only element in the return vector,
272 //                      and hfta_returned is true.
273 //              3) reads from at least one protocol, but cannot be split : failure.
274 //                      return vector is empty, the error conditions are written
275 //                      in the qp_node.
276 //              4) The qp_node splits into an hfta node and one or more LFTA nodes.
277 //                      the return vector has two or more elements, and hfta_returned
278 //                      is true.  The last element is the HFTA.
279         virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;
280
281
282 //              Ensure that any refs to interface params have been split away.
283         virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
284
285
286
287 //              Tag the data sources which are views,
288 //              return the (optimized) source queries and
289 //              record the view access in opview_set
290         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
291
292   param_table *get_param_tbl(){return param_tbl;};
293
294 //                      The "where" clause is a pre-filter
295   virtual  std::vector<cnf_elem *> get_where_clause() = 0;
296 //                      To be more explicit, use get_filter_preds
297   virtual  std::vector<cnf_elem *> get_filter_clause() = 0;
298
299   void add_predecessor(int i){predecessors.push_back(i);};
300   void remove_predecessor(int i){
301         std::vector<int>::iterator vi;
302         for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
303                 if((*vi) == i){
304                         predecessors.erase(vi);
305                         return;
306                 }
307         }
308   };
309   void add_successor(int i){successors.push_back(i);};
310   std::vector<int> get_predecessors(){return predecessors;};
311   int n_predecessors(){return predecessors.size();};
312   std::vector<int> get_successors(){return successors;};
313   int n_successors(){return successors.size();};
314   void clear_predecessors(){predecessors.clear();};
315   void clear_successors(){successors.clear();};
316
317   // the following method is used for distributed query optimization
318   double get_rate_estimate();
319
320
321   // used for cloning query nodes
322   virtual qp_node* make_copy(std::string suffix) = 0;
323 };
324
325
326
327 //              Select, project, transform (xform) query plan node.
328 //              represent the following query fragment
329 //                      select scalar_expression_1, ..., scalar_expression_k
330 //                      from S
331 //                      where predicate
332 //
333 //              the predicates and the scalar expressions can reference
334 //              attributes of S and also functions.
335 class spx_qpn: public qp_node{
336 public:
337         tablevar_t *table_name;                                 //      Source table
338         std::vector<cnf_elem *> where;                  // selection predicate
339         std::vector<select_element *> select_list;      //      Select list
340
341
342
343         std::string node_type(){return("spx_qpn");      };
344     bool makes_transform(){return true;};
345         std::vector<std::string> external_libs(){
346                 std::vector<std::string> ret;
347                 return ret;
348         }
349
350         void bind_to_schema(table_list *Schema);
351         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
352
353         std::string to_query_string();
354         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
355         std::string generate_functor_name();
356         std::string generate_operator(int i, std::string params);
357         std::string get_include_file(){return("#include <selection_operator.h>\n");};
358
359     std::vector<select_element *> get_select_list(){return select_list;};
360     std::vector<scalarexp_t *> get_select_se_list(){
361                 std::vector<scalarexp_t *> ret;
362                 int i;
363                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
364                 return ret;
365         };
366     std::vector<cnf_elem *> get_where_clause(){return where;};
367     std::vector<cnf_elem *> get_filter_clause(){return where;};
368         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
369     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
370
371         table_def *get_fields();
372         std::vector<tablevar_t *> get_input_tbls();
373         std::vector<tablevar_t *> get_output_tbls();
374
375         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
376         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
377 //              Ensure that any refs to interface params have been split away.
378         int count_ifp_refs(std::set<std::string> &ifpnames);
379         int resolve_if_params(ifq_t *ifdb, std::string &err);
380
381         spx_qpn(){
382         };
383         spx_qpn(query_summary_class *qs,table_list *Schema){
384 //                              Get the table name.
385 //                              NOTE the colrefs have the table ref (an int)
386 //                              embedded in them.  Would it make sense
387 //                              to grab the whole table list?
388                 tablevar_list_t *fm = qs->fta_tree->get_from();
389                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
390                 if(tbl_vec.size() != 1){
391                         char tmpstr[200];
392                         sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
393                         err_str = tmpstr;
394                         error_code = 1;
395                 }
396                 table_name = (tbl_vec[0]);
397
398 //                              Get the select list.
399                 select_list = qs->fta_tree->get_sl_vec();
400
401 //                              Get the selection predicate.
402                 where = qs->wh_cnf;
403
404
405 //                              Get the parameters
406                 param_tbl = qs->param_tbl;
407
408
409
410         };
411
412         // the following method is used for distributed query optimization
413         double get_rate_estimate();
414
415
416         qp_node* make_copy(std::string suffix){
417                 spx_qpn *ret = new spx_qpn();
418
419                 ret->param_tbl = new param_table();
420                 std::vector<std::string> param_names = param_tbl->get_param_names();
421                 int pi;
422                 for(pi=0;pi<param_names.size();pi++){
423                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
424                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
425                                                         param_tbl->handle_access(param_names[pi]));
426                 }
427                 ret->definitions = definitions;
428                 ret->node_name = node_name + suffix;
429
430                 // make shallow copy of all fields
431                 ret->where = where;
432                 ret->select_list = select_list;
433
434                 return ret;
435         };
436         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
437
438 };
439
440
441
442 //              Select, group-by, aggregate.
443 //              Representing
444 //                      Select SE_1, ..., SE_k
445 //                      From T
446 //                      Where predicate
447 //                      Group By gb1, ..., gb_n
448 //                      Having predicate
449 //
450 //              NOTE : the samlping operator is sgahcwcb_qpn.
451 //
452 //              For now, must have group-by variables and aggregates.
453 //              The scalar expressions which are output must be a function
454 //              of the groub-by variables and the aggregates.
455 //              The group-by variables can be references to columsn of T,
456 //              or they can be scalar expressions.
457 class sgah_qpn: public qp_node{
458 public:
459         tablevar_t *table_name;                         // source table
460         std::vector<cnf_elem *> where;          // selection predicate
461         std::vector<cnf_elem *> having;         // post-aggregation predicate
462         std::vector<select_element *> select_list;      // se's of output
463         gb_table gb_tbl;                        // Table of all group-by attributes.
464         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
465
466         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
467
468         int lfta_disorder;              // maximum disorder in the steam between lfta, hfta
469         int hfta_disorder;              // maximum disorder in the  hfta
470
471 //              rollup, cube, and grouping_sets cannot be readily reconstructed by
472 //              analyzing the patterns, so explicitly record them here.
473 //              used only so that to_query_string produces something meaningful.
474         std::vector<std::string> gb_entry_type;
475         std::vector<int> gb_entry_count;
476
477         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
478
479         std::string node_type(){return("sgah_qpn");     };
480     bool makes_transform(){return true;};
481         std::vector<std::string> external_libs(){
482                 std::vector<std::string> ret;
483                 return ret;
484         }
485
486         void bind_to_schema(table_list *Schema);
487         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
488
489         std::string to_query_string();
490         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
491         std::string generate_functor_name();
492
493         std::string generate_operator(int i, std::string params);
494         std::string get_include_file(){
495                         if(hfta_disorder <= 1){
496                                 return("#include <groupby_operator.h>\n");
497                         }else{
498                                 return("#include <groupby_operator_oop.h>\n");
499                         }
500         };
501
502     std::vector<select_element *> get_select_list(){return select_list;};
503     std::vector<scalarexp_t *> get_select_se_list(){
504                 std::vector<scalarexp_t *> ret;
505                 int i;
506                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
507                 return ret;
508         };
509     std::vector<cnf_elem *> get_where_clause(){return where;};
510     std::vector<cnf_elem *> get_filter_clause(){return where;};
511     std::vector<cnf_elem *> get_having_clause(){return having;};
512     gb_table *get_gb_tbl(){return &gb_tbl;};
513     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
514         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
515     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
516
517 //                              table which represents output tuple.
518         table_def *get_fields();
519         std::vector<tablevar_t *> get_input_tbls();
520         std::vector<tablevar_t *> get_output_tbls();
521
522
523         sgah_qpn(){
524                 lfta_disorder = 1;
525                 hfta_disorder = 1;
526         };
527         sgah_qpn(query_summary_class *qs,table_list *Schema){
528                 lfta_disorder = 1;
529                 hfta_disorder = 1;
530
531 //                              Get the table name.
532 //                              NOTE the colrefs have the tablevar ref (an int)
533 //                              embedded in them.  Would it make sense
534 //                              to grab the whole table list?
535                 tablevar_list_t *fm = qs->fta_tree->get_from();
536                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
537                 if(tbl_vec.size() != 1){
538                         char tmpstr[200];
539                         sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
540                         err_str=tmpstr;
541                         error_code = 1;
542                 }
543                 table_name = (tbl_vec[0]);
544
545 //                              Get the select list.
546                 select_list = qs->fta_tree->get_sl_vec();
547
548 //                              Get the selection and having predicates.
549                 where = qs->wh_cnf;
550                 having = qs->hav_cnf;
551
552 //                              Build a new GB var table (don't share, might need to modify)
553                 int g;
554                 for(g=0;g<qs->gb_tbl->size();g++){
555                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
556                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
557                                 qs->gb_tbl->get_reftype(g)
558                         );
559                 }
560                 gb_tbl.set_pattern_info(qs->gb_tbl);
561 //              gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
562 //              gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
563 //              gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
564
565 //                              Build a new aggregate table. (don't share, might need
566 //                              to modify).
567                 int a;
568                 for(a=0;a<qs->aggr_tbl->size();a++){
569                         aggr_tbl.add_aggr(
570 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
571                                 qs->aggr_tbl->duplicate(a)
572                         );
573                 }
574
575
576 //                              Get the parameters
577                 param_tbl = qs->param_tbl;
578
579         };
580
581
582
583         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
584         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
585 //              Ensure that any refs to interface params have been split away.
586         int count_ifp_refs(std::set<std::string> &ifpnames);
587         int resolve_if_params(ifq_t *ifdb, std::string &err);
588
589         // the following method is used for distributed query optimization
590         double get_rate_estimate();
591
592
593         qp_node* make_copy(std::string suffix){
594                 sgah_qpn *ret = new sgah_qpn();
595
596                 ret->param_tbl = new param_table();
597                 std::vector<std::string> param_names = param_tbl->get_param_names();
598                 int pi;
599                 for(pi=0;pi<param_names.size();pi++){
600                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
601                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
602                                                         param_tbl->handle_access(param_names[pi]));
603                 }
604                 ret->definitions = definitions;
605
606                 ret->node_name = node_name + suffix;
607
608                 // make shallow copy of all fields
609                 ret->where = where;
610                 ret->having = having;
611                 ret->select_list = select_list;
612                 ret->gb_tbl = gb_tbl;
613                 ret->aggr_tbl = aggr_tbl;
614
615                 return ret;
616         };
617
618 //              Split aggregation into two HFTA components - sub and superaggregation
619 //              If unable to split the aggreagates, split into selection and aggregation
620 //              If resulting low-level query is empty (e.g. when aggregates cannot be split and
621 //              where clause is empty) empty vector willb e returned
622         virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
623
624         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
625
626 };
627
628
629
630
//              Select, group-by, aggregate, with running aggregates.
//              Representing
//                      Select SE_1, ..., SE_k
//                      From T
//                      Where predicate
//                      Group By gb1, ..., gb_n
//                      Closing When predicate
//                      Having predicate
//
//              NOTE : the sampling operator is sgahcwcb_qpn.
//
//              For now, must have group-by variables and aggregates.
//              The scalar expressions which are output must be a function
//              of the group-by variables and the aggregates.
//              The group-by variables can be references to columns of T,
//              or they can be scalar expressions.
647 class rsgah_qpn: public qp_node{
648 public:
649         tablevar_t *table_name;                         // source table
650         std::vector<cnf_elem *> where;          // selection predicate
651         std::vector<cnf_elem *> having;         // post-aggregation predicate
652         std::vector<cnf_elem *> closing_when;           // group closing predicate
653         std::vector<select_element *> select_list;      // se's of output
654         gb_table gb_tbl;                        // Table of all group-by attributes.
655         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
656
657         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
658
659         int lfta_disorder;              // maximum disorder allowed in stream between lfta, hfta
660         int hfta_disorder;              // maximum disorder allowed in hfta
661
662         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
663
664
665         std::string node_type(){return("rsgah_qpn");    };
666     bool makes_transform(){return true;};
667         std::vector<std::string> external_libs(){
668                 std::vector<std::string> ret;
669                 return ret;
670         }
671
672         void bind_to_schema(table_list *Schema);
673         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
674                 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
675                 exit(1);
676         }
677
678         std::string to_query_string();
679         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
680         std::string generate_functor_name();
681
682         std::string generate_operator(int i, std::string params);
683         std::string get_include_file(){return("#include <running_gb_operator.h>\n");};
684
685     std::vector<select_element *> get_select_list(){return select_list;};
686     std::vector<scalarexp_t *> get_select_se_list(){
687                 std::vector<scalarexp_t *> ret;
688                 int i;
689                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
690                 return ret;
691         };
692     std::vector<cnf_elem *> get_where_clause(){return where;};
693     std::vector<cnf_elem *> get_filter_clause(){return where;};
694     std::vector<cnf_elem *> get_having_clause(){return having;};
695     std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};
696     gb_table *get_gb_tbl(){return &gb_tbl;};
697     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
698         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
699     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
700
701 //                              table which represents output tuple.
702         table_def *get_fields();
703         std::vector<tablevar_t *> get_input_tbls();
704         std::vector<tablevar_t *> get_output_tbls();
705
706
707         rsgah_qpn(){
708                 lfta_disorder = 1;
709                 hfta_disorder = 1;
710         };
711         rsgah_qpn(query_summary_class *qs,table_list *Schema){
712                 lfta_disorder = 1;
713                 hfta_disorder = 1;
714
715 //                              Get the table name.
716 //                              NOTE the colrefs have the tablevar ref (an int)
717 //                              embedded in them.  Would it make sense
718 //                              to grab the whole table list?
719                 tablevar_list_t *fm = qs->fta_tree->get_from();
720                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
721                 if(tbl_vec.size() != 1){
722                         char tmpstr[200];
723                         sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
724                         err_str=tmpstr;
725                         error_code = 1;
726                 }
727                 table_name = (tbl_vec[0]);
728
729 //                              Get the select list.
730                 select_list = qs->fta_tree->get_sl_vec();
731
732 //                              Get the selection and having predicates.
733                 where = qs->wh_cnf;
734                 having = qs->hav_cnf;
735                 closing_when = qs->closew_cnf;
736
737 //                              Build a new GB var table (don't share, might need to modify)
738                 int g;
739                 for(g=0;g<qs->gb_tbl->size();g++){
740                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
741                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
742                                 qs->gb_tbl->get_reftype(g)
743                         );
744                 }
745
746 //                              Build a new aggregate table. (don't share, might need
747 //                              to modify).
748                 int a;
749                 for(a=0;a<qs->aggr_tbl->size();a++){
750                         aggr_tbl.add_aggr(
751 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
752                                 qs->aggr_tbl->duplicate(a)
753                         );
754                 }
755
756
757 //                              Get the parameters
758                 param_tbl = qs->param_tbl;
759
760         };
761
762
763
764         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
765         std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
766         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
767 //              Ensure that any refs to interface params have been split away.
768         int count_ifp_refs(std::set<std::string> &ifpnames);
769         int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;}
770
771         // the following method is used for distributed query optimization
772         double get_rate_estimate();
773
774         qp_node* make_copy(std::string suffix){
775                 rsgah_qpn *ret = new rsgah_qpn();
776
777                 ret->param_tbl = new param_table();
778                 std::vector<std::string> param_names = param_tbl->get_param_names();
779                 int pi;
780                 for(pi=0;pi<param_names.size();pi++){
781                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
782                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
783                                                         param_tbl->handle_access(param_names[pi]));
784                 }
785                 ret->definitions = definitions;
786
787                 ret->node_name = node_name + suffix;
788
789                 // make shallow copy of all fields
790                 ret->where = where;
791                 ret->having = having;
792                 ret->closing_when = closing_when;
793                 ret->select_list = select_list;
794                 ret->gb_tbl = gb_tbl;
795                 ret->aggr_tbl = aggr_tbl;
796
797                 return ret;
798         };
799         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
800 };
801
802
//              forward reference: filter_join_qpn is named in an mrg_qpn
//              constructor signature before its class is defined.
class filter_join_qpn;
805
806
807 //              (temporal) Merge query plan node.
808 //              represent the following query fragment
809 //                      Merge c1:c2
810 //                      from T1 _t1, T2 _t2
811 //
812 //              T1 and T2 must have compatible schemas,
813 //              that is the same types in the same slots.
814 //              c1 and c2 must be colrefs from T1 and T2,
815 //              both ref'ing the same slot.  Their types
816 //              must be temporal and the same kind of temporal.
817 //              in the output, no other field is temporal.
//              the field names of the output are drawn from T1.
819 class mrg_qpn: public qp_node{
820 public:
821         std::vector<tablevar_t *> fm;                                   //      Source table
822         std::vector<colref_t *> mvars;                                  // the merge-by columns.
823         scalarexp_t *slack;
824
825         table_def *table_layout;                                // the output schema
826         int merge_fieldpos;                                             // position of merge field,
827                                                                                         // convenience for manipulation.
828
829         int disorder;           // max disorder seen in the input / allowed in the output
830
831
832         // partition definition for merges that combine streams partitioned over multiple interfaces
833         partn_def_t* partn_def;
834
835
836
837         std::string node_type(){return("mrg_qpn");      };
838     bool makes_transform(){return false;};
839         std::vector<std::string> external_libs(){
840                 std::vector<std::string> ret;
841                 return ret;
842         }
843
844         void bind_to_schema(table_list *Schema);
845         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
846                 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
847                 exit(1);
848         }
849
850         std::string to_query_string();
851         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
852         std::string generate_functor_name();
853         std::string generate_operator(int i, std::string params);
854         std::string get_include_file(){
855                 if(disorder>1)
856                         return("#include <merge_operator_oop.h>\n");
857                 return("#include <merge_operator.h>\n");
858         };
859
860         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
861     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
862
863         table_def *get_fields();
864         std::vector<tablevar_t *> get_input_tbls();
865         std::vector<tablevar_t *> get_output_tbls();
866
867         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
868         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
869 //              Ensure that any refs to interface params have been split away.
870         int count_ifp_refs(std::set<std::string> &ifpnames);
871
872 //                      No predicates, return an empty clause
873     std::vector<cnf_elem *> get_where_clause(){
874                  std::vector<cnf_elem *> t;
875                 return(t);
876         };
877     std::vector<cnf_elem *> get_filter_clause(){
878                 return get_where_clause();
879         }
880
881         mrg_qpn(){
882                 partn_def = NULL;
883         };
884
885         void set_disorder(int d){
886                 disorder = d;
887         }
888
889         mrg_qpn(query_summary_class *qs,table_list *Schema){
890                 disorder = 1;
891
892 //                              Grab the elements of the query node.
893                 fm = qs->fta_tree->get_from()->get_table_list();
894                 mvars = qs->mvars;
895                 slack = qs->slack;
896
897 //                      sanity check
898                 if(fm.size() != mvars.size()){
899                         fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn.  fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
900                         exit(1);
901                 }
902
903 //                              Get the parameters
904                 param_tbl = qs->param_tbl;
905
906 //                              Need to set the node name now, so that the
907 //                              schema (table_layout) can be properly named.
908 //                              TODO: Setting the name of the table might best be done
909 //                              via the set_node_name method, because presumably
910 //                              thats when the node name is really known.
911 //                              This should propogate to the table_def table_layout
912                 node_name=qs->query_name;
913
914 /*
915 int ff;
916 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
917 for(ff=0;ff<fm.size();++ff){
918 printf("%s ",fm[ff]->to_string().c_str());
919 }
920 printf("\n");
921 */
922
923
924 //              Create the output schema.
925 //              strip temporal properites form all fields except the merge field.
926                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
927                 field_entry_list *fel = new field_entry_list();
928                 int f;
929                 for(f=0;f<flva.size();++f){
930                         field_entry *fe;
931                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
932                         if(flva[f]->get_name() == mvars[0]->get_field()){
933                                 merge_fieldpos = f;
934 //                              if(slack != NULL) dt.reset_temporal();
935                         }else{
936                                 dt.reset_temporal();
937                         }
938
939                         param_list *plist = new param_list();
940                         std::vector<std::string> param_strings = dt.get_param_keys();
941                         int p;
942                         for(p=0;p<param_strings.size();++p){
943                                 std::string v = dt.get_param_val(param_strings[p]);
944                                 if(v != "")
945                                         plist->append(param_strings[p].c_str(),v.c_str());
946                                 else
947                                         plist->append(param_strings[p].c_str());
948                         }
949
950
951                         fe=new field_entry(
952                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
953                         fel->append_field(fe);
954                 }
955
956
957
958
959                 table_layout = new table_def(
960                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
961                 );
962
963                 partn_def = NULL;
964         };
965
966
967 /////////////////////////////////////////////
968 ///             Created for de-siloing.  to be removed?  or is it otherwise useful?
969 //              Merge existing set of sources (de-siloing)
970         mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
971                 int i,f;
972
973                 disorder = 1;
974
975 //                              Construct the fm list
976                 for(f=0;f<src_names.size();++f){
977                         int tbl_ref = Schema->get_table_ref(src_names[f]);
978                         if(tbl_ref < 0){
979                                 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
980                                 exit(1);
981                         }
982                         table_def *src_tbl = Schema->get_table(tbl_ref);
983                         tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
984                         string range_name = "_t" + int_to_string(f);
985                         fm_t->set_range_var(range_name);
986                         fm_t->set_schema_ref(tbl_ref);
987                         fm.push_back(fm_t);
988                 }
989
990 //              Create the output schema.
991 //              strip temporal properites form all fields except the merge field.
992                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
993                 field_entry_list *fel = new field_entry_list();
994                 bool temporal_found = false;
995                 for(f=0;f<flva.size();++f){
996                         field_entry *fe;
997                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
998                         if(dt.is_temporal() && !temporal_found){
999                                 merge_fieldpos = f;
1000                                 temporal_found = true;
1001                         }else{
1002                                 dt.reset_temporal();
1003                         }
1004
1005                         param_list *plist = new param_list();
1006                         std::vector<std::string> param_strings = dt.get_param_keys();
1007                         int p;
1008                         for(p=0;p<param_strings.size();++p){
1009                                 std::string v = dt.get_param_val(param_strings[p]);
1010                                 if(v != "")
1011                                         plist->append(param_strings[p].c_str(),v.c_str());
1012                                 else
1013                                         plist->append(param_strings[p].c_str());
1014                         }
1015
1016                         fe=new field_entry(
1017                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
1018                                 flva[f]->get_unpack_fcns()
1019                         );
1020                         fel->append_field(fe);
1021                 }
1022
1023                 if(! temporal_found){
1024                         fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
1025                         exit(1);
1026                 }
1027
1028                 node_name=n_name;
1029                 table_layout = new table_def(
1030                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1031                 );
1032
1033                 partn_def = NULL;
1034                 param_tbl = new param_table();
1035
1036 //                      Construct mvars
1037                 for(f=0;f<fm.size();++f){
1038                         std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
1039                         data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
1040                              flva[merge_fieldpos]->get_modifier_list());
1041
1042                         colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
1043                                 flv_f[merge_fieldpos]->get_name().c_str());
1044                         mvars.push_back(mcr);
1045                 }
1046
1047 //              literal_t *s_lit = new literal_t("5",LITERAL_INT);
1048 //              slack = new scalarexp_t(s_lit);
1049                 slack = NULL;
1050
1051         };
1052 //                      end de-siloing
1053 ////////////////////////////////////////
1054
1055         void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
1056
1057
1058 //                      Merge filter_join LFTAs.
1059
1060         mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1061
1062 //                      Merge selection LFTAs.
1063
1064         mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
1065
1066                 disorder = 1;
1067
1068                 param_tbl = spx->param_tbl;
1069                 int i;
1070                 node_name = n_name;
1071                 field_entry_list *fel = new field_entry_list();
1072                 merge_fieldpos = -1;
1073
1074
1075
1076
1077                 for(i=0;i<spx->select_list.size();++i){
1078                         data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
1079                         if(dt->is_temporal()){
1080                                 if(merge_fieldpos < 0){
1081                                         merge_fieldpos = i;
1082                                 }else{
1083                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
1084                                         dt->reset_temporal();
1085                                 }
1086                         }
1087
1088                         field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
1089                         fel->append_field(fe);
1090                         delete dt;
1091                 }
1092                 if(merge_fieldpos<0){
1093                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1094                                 exit(1);
1095                 }
1096                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1097
1098 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1099                 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
1100                                 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
1101 //      if(this->slack == NULL)
1102 //              fprintf(stderr,"Zero slack.\n");
1103 //      else
1104 //              fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1105
1106                 for(i=0;i<sources.size();i++){
1107                         std::string rvar = "_m"+int_to_string(i);
1108                         mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
1109                         mvars[i]->set_tablevar_ref(i);
1110                         fm.push_back(new tablevar_t(sources[i].c_str()));
1111                         fm[i]->set_range_var(rvar);
1112                 }
1113
1114                 param_tbl = new param_table();
1115                 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
1116                 int pi;
1117                 for(pi=0;pi<param_names.size();pi++){
1118                         data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
1119                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1120                                                         spx->param_tbl->handle_access(param_names[pi]));
1121                 }
1122                 definitions = spx->definitions;
1123
1124         }
1125
1126 //              Merge aggregation LFTAs
1127
1128         mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
1129
1130                 disorder = 1;
1131
1132                 param_tbl = sgah->param_tbl;
1133                 int i;
1134                 node_name = n_name;
1135                 field_entry_list *fel = new field_entry_list();
1136                 merge_fieldpos = -1;
1137                 for(i=0;i<sgah->select_list.size();++i){
1138                         data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
1139                         if(dt->is_temporal()){
1140                                 if(merge_fieldpos < 0){
1141                                         merge_fieldpos = i;
1142                                 }else{
1143                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
1144                                         dt->reset_temporal();
1145                                 }
1146                         }
1147
1148                         field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
1149                         fel->append_field(fe);
1150                         delete dt;
1151                 }
1152                 if(merge_fieldpos<0){
1153                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1154                         exit(1);
1155                 }
1156                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1157
1158 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1159                 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
1160                                 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
1161                                 &(sgah->gb_tbl));
1162                 if(this->slack == NULL)
1163                         fprintf(stderr,"Zero slack.\n");
1164                 else
1165                         fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1166
1167
1168                 for(i=0;i<sources.size();i++){
1169                         std::string rvar = "_m"+int_to_string(i);
1170                         mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
1171                         mvars[i]->set_tablevar_ref(i);
1172                         fm.push_back(new tablevar_t(sources[i].c_str()));
1173                         fm[i]->set_range_var(rvar);
1174                 }
1175
1176                 param_tbl = new param_table();
1177                 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
1178                 int pi;
1179                 for(pi=0;pi<param_names.size();pi++){
1180                         data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
1181                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1182                                                         sgah->param_tbl->handle_access(param_names[pi]));
1183                 }
1184                 definitions = sgah->definitions;
1185
1186         }
1187
1188         qp_node *make_copy(std::string suffix){
1189                 mrg_qpn *ret = new mrg_qpn();
1190                 ret->slack = slack;
1191                 ret->disorder = disorder;
1192
1193                 ret->param_tbl = new param_table();
1194                 std::vector<std::string> param_names = param_tbl->get_param_names();
1195                 int pi;
1196                 for(pi=0;pi<param_names.size();pi++){
1197                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1198                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1199                                                         param_tbl->handle_access(param_names[pi]));
1200                 }
1201                 ret->definitions = definitions;
1202
1203                 ret->node_name = node_name + suffix;
1204                 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
1205                 ret->merge_fieldpos = merge_fieldpos;
1206
1207                 return ret;
1208         };
1209
1210         std::vector<mrg_qpn *> split_sources();
1211
1212         // the following method is used for distributed query optimization
1213         double get_rate_estimate();
1214
1215
1216         // get partition definition for merges that combine streams partitioned over multiple interfaces
1217         // return NULL for regular merges
1218         partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1219                 if (partn_def)
1220                         return partn_def;
1221
1222                 int err;
1223                 string err_str;
1224                 string partn_name;
1225
1226                 vector<tablevar_t *> input_tables = get_input_tbls();
1227                 for (int i = 0; i <  input_tables.size(); ++i) {
1228                         tablevar_t * table = input_tables[i];
1229
1230                         vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
1231                         if (partn_names.size() != 1)    // can't have more than one value of partition attribute
1232                                 return NULL;
1233                         string new_partn_name = partn_names[0];
1234
1235                         // need to make sure that all ifaces belong to the same partition
1236                         if (!i)
1237                                 partn_name = new_partn_name;
1238                         else if (new_partn_name != partn_name)
1239                                 return NULL;
1240                 }
1241
1242                 // now find partition definition corresponding to partn_name
1243                 partn_def = partn_parse_result->get_partn_def(partn_name);
1244                 return partn_def;
1245         };
1246
1247         void set_partn_definition(partn_def_t* def) {
1248                 partn_def = def;
1249         }
1250
1251         bool is_multihost_merge() {
1252
1253                 bool is_multihost = false;
1254
1255                 // each input table must be have machine attribute be non-empty
1256                 // and there should be at least 2 different values of machine attributes
1257                 vector<tablevar_t *> input_tables = get_input_tbls();
1258                 string host = input_tables[0]->get_machine();
1259                 for  (int i = 1; i < input_tables.size(); ++i) {
1260                         string new_host = input_tables[i]->get_machine();
1261                         if (new_host == "")
1262                                 return false;
1263                         if (new_host != host)
1264                                 is_multihost = true;
1265                 }
1266                 return is_multihost;
1267         }
1268
1269         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1270 };
1271
1272
1273 //              eq_temporal, hash join query plan node.
1274 //              represent the following query fragment
1275 //                      select scalar_expression_1, ..., scalar_expression_k
1276 //                      from T0 t0, T1 t1
1277 //                      where predicate
1278 //
1279 //              the predicates and the scalar expressions can reference
1280 //              attributes of t0 and t1 and also functions.
1281 //              The predicate must contain CNF elements to enable the
1282 //              efficient evaluation of the query.
1283 //              1) at least one predicate of the form
1284 //                      (temporal se in t0) = (temporal se in t1)
1285 //              2) at least one predicate of the form
1286 //                      (non-temporal se in t0) = (non-temporal se in t1)
1287 //
1288 class join_eq_hash_qpn: public qp_node{
1289 public:
1290         std::vector<tablevar_t *> from;                                 //      Source tables
1291         std::vector<select_element *> select_list;      //      Select list
1292         std::vector<cnf_elem *> prefilter[2];           // source prefilters
1293         std::vector<cnf_elem *> temporal_eq;            // define temporal window
1294         std::vector<cnf_elem *> hash_eq;                        // define hash key
1295         std::vector<cnf_elem *> postfilter;                     // final filter on hash matches.
1296
1297         std::vector<cnf_elem *> where;                          // all the filters
1298                                                                                                 // useful for summary analysis
1299
1300         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1301
1302         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1303         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1304
1305         std::string node_type(){return("join_eq_hash_qpn");     };
1306     bool makes_transform(){return true;};
1307         std::vector<std::string> external_libs(){
1308                 std::vector<std::string> ret;
1309                 return ret;
1310         }
1311
1312         void bind_to_schema(table_list *Schema);
1313         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1314                 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
1315                 exit(1);
1316         }
1317
1318         std::string to_query_string();
1319         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1320         std::string generate_functor_name();
1321         std::string generate_operator(int i, std::string params);
1322         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1323
1324     std::vector<select_element *> get_select_list(){return select_list;};
1325     std::vector<scalarexp_t *> get_select_se_list(){
1326                 std::vector<scalarexp_t *> ret;
1327                 int i;
1328                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1329                 return ret;
1330         };
1331 //                      Used for LFTA only
1332     std::vector<cnf_elem *> get_where_clause(){
1333                  std::vector<cnf_elem *> t;
1334                 return(t);
1335         };
1336     std::vector<cnf_elem *> get_filter_clause(){
1337                 return get_where_clause();
1338         }
1339
1340         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1341     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1342
1343         table_def *get_fields();
1344         std::vector<tablevar_t *> get_input_tbls();
1345         std::vector<tablevar_t *> get_output_tbls();
1346
1347         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1348         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1349 //              Ensure that any refs to interface params have been split away.
1350         int count_ifp_refs(std::set<std::string> &ifpnames);
1351
1352         join_eq_hash_qpn(){
1353         };
1354         join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
1355                 int w;
1356 //                              Get the table name.
1357 //                              NOTE the colrefs have the table ref (an int)
1358 //                              embedded in them.  Would it make sense
1359 //                              to grab the whole table list?
1360                 from = qs->fta_tree->get_from()->get_table_list();
1361                 if(from.size() != 2){
1362                         char tmpstr[200];
1363                         sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1364                         err_str = tmpstr;
1365                         error_code = 1;
1366                 }
1367
1368 //                              Get the select list.
1369                 select_list = qs->fta_tree->get_sl_vec();
1370
1371 //                              Get the selection predicate.
1372                 where = qs->wh_cnf;
1373                 for(w=0;w<where.size();++w){
1374                         analyze_cnf(where[w]);
1375                         std::vector<int> pred_tbls;
1376                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1377 //                              Prefilter if refs only one tablevar
1378                         if(pred_tbls.size()==1){
1379                                 prefilter[pred_tbls[0]].push_back(where[w]);
1380                                 continue;
1381                         }
1382 //                              refs nothing -- might be sampling, do it as postfilter.
1383                         if(pred_tbls.size()==0){
1384                                 postfilter.push_back(where[w]);
1385                                 continue;
1386                         }
1387 //                              See if it can be a hash or temporal predicate.
1388 //                              NOTE: synchronize with the temporality checking
1389 //                              done at join_eq_hash_qpn::get_fields
1390                         if(where[w]->is_atom && where[w]->eq_pred){
1391                                 std::vector<int> sel_tbls, ser_tbls;
1392                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1393                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1394                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1395 //                                              make channel 0 SE on LHS.
1396                                         if(sel_tbls[0] != 0)
1397                                                 where[w]->pr->swap_scalar_operands();
1398
1399                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1400                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1401                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1402                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1403                                                         temporal_eq.push_back(where[w]);
1404                                         else
1405                                                         hash_eq.push_back(where[w]);
1406                                         continue;
1407
1408                                 }
1409                         }
1410 //                              All tests failed, fallback is postfilter.
1411                         postfilter.push_back(where[w]);
1412                 }
1413
1414                 if(temporal_eq.size()==0){
1415                         err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
1416                         error_code = 1;
1417                 }
1418
1419 //                              Get the parameters
1420                 param_tbl = qs->param_tbl;
1421
1422         };
1423
1424         // the following method is used for distributed query optimization
1425         double get_rate_estimate();
1426
1427
1428         qp_node* make_copy(std::string suffix){
1429                 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
1430
1431                 ret->param_tbl = new param_table();
1432                 std::vector<std::string> param_names = param_tbl->get_param_names();
1433                 int pi;
1434                 for(pi=0;pi<param_names.size();pi++){
1435                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1436                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1437                                                         param_tbl->handle_access(param_names[pi]));
1438                 }
1439                 ret->definitions = definitions;
1440
1441                 ret->node_name = node_name + suffix;
1442
1443                 // make shallow copy of all fields
1444                 ret->where = where;
1445                 ret->from = from;
1446                 ret->select_list = select_list;
1447                 ret->prefilter[0] = prefilter[0];
1448                 ret->prefilter[1] = prefilter[1];
1449                 ret->postfilter = postfilter;
1450                 ret->temporal_eq = temporal_eq;
1451                 ret->hash_eq = hash_eq;
1452
1453                 return ret;
1454         };
1455         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1456
1457 };
1458
1459
// ---------------------------------------------
//              Filter join query plan node (bloom filter or limited hash).
//              Represents the following query fragment
//                      select scalar_expression_1, ..., scalar_expression_k
//                      FILTER_JOIN(col, range) from T0 t0, T1 t1
//                      where predicate
//
//              t0 is the output range variable, t1 is the filtering range
//              variable.  Both must alias a PROTOCOL.
//              The scalar expressions in the select clause may
//              reference t0 only.
//              The predicates are classified as follows
//              prefilter predicates:
//                a cheap predicate in t0 such that there is an equivalent
//                predicate in t1.  Cost decisions about pushing to
//                lfta prefilter made later.
//              t0 predicates (other than prefilter predicates)
//                      -- cheap vs. expensive sorted out at generate time,
//                              the constructor isn't called with the function list.
//              t1 predicates (other than prefilter predicates).
//              equi-join predicates of the form:
//                      (se in t0) = (se in t1)
//
//              There must be at least one equi-join predicate.
//              No join predicates other than equi-join predicates
//                are allowed.
//              Warn on temporal equi-join predicates.
//              t1 predicates should not be expensive ... warn?
//
1489 class filter_join_qpn: public qp_node{
1490 public:
1491         std::vector<tablevar_t *> from;                                 //      Source tables
1492                 colref_t *temporal_var;                 // join window in FROM
1493                 unsigned int temporal_range;    // metadata.
1494         std::vector<select_element *> select_list;      //      Select list
1495         std::vector<cnf_elem *> shared_pred;            // prefilter preds
1496         std::vector<cnf_elem *> pred_t0;                        // main (R) preds
1497         std::vector<cnf_elem *> pred_t1;                        // filtering (S) preds
1498         std::vector<cnf_elem *> hash_eq;                        // define hash key
1499         std::vector<cnf_elem *> postfilter;                     // ref's no table.
1500
1501         std::vector<cnf_elem *> where;                          // all the filters
1502                                                                                                 // useful for summary analysis
1503
1504         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1505         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1506         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1507
1508
1509         bool use_bloom;                 // true => bloom filter, false => limited hash
1510
1511         std::string node_type(){return("filter_join");  };
1512     bool makes_transform(){return true;};
1513         std::vector<std::string> external_libs(){
1514                 std::vector<std::string> ret;
1515                 return ret;
1516         }
1517
1518         void bind_to_schema(table_list *Schema);
1519         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
1520
1521         std::string to_query_string();
1522         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
1523                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
1524                 exit(1);
1525         }
1526         std::string generate_functor_name(){
1527                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
1528                 exit(1);
1529         }
1530         std::string generate_operator(int i, std::string params){
1531                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
1532                 exit(1);
1533         }
1534         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1535
1536     std::vector<select_element *> get_select_list(){return select_list;};
1537     std::vector<scalarexp_t *> get_select_se_list(){
1538                 std::vector<scalarexp_t *> ret;
1539                 int i;
1540                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1541                 return ret;
1542         };
1543 //                      Used for LFTA only
1544     std::vector<cnf_elem *> get_where_clause(){return where;}
1545     std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
1546
1547         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1548     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1549
1550         table_def *get_fields();
1551         std::vector<tablevar_t *> get_input_tbls();
1552         std::vector<tablevar_t *> get_output_tbls();
1553
1554         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1555         int resolve_if_params(ifq_t *ifdb, std::string &err);
1556
1557         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1558 //              Ensure that any refs to interface params have been split away.
1559         int count_ifp_refs(std::set<std::string> &ifpnames);
1560
1561
1562         filter_join_qpn(){
1563         };
1564         filter_join_qpn(query_summary_class *qs,table_list *Schema){
1565                 int i,w;
1566 //                              Get the table name.
1567 //                              NOTE the colrefs have the table ref (an int)
1568 //                              embedded in them.  Would it make sense
1569 //                              to grab the whole table list?
1570                 from = qs->fta_tree->get_from()->get_table_list();
1571                 temporal_var = qs->fta_tree->get_from()->get_colref();
1572                 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
1573                 if(from.size() != 2){
1574                         char tmpstr[200];
1575                         sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1576                         err_str += tmpstr;
1577                         error_code = 1;
1578                 }
1579
1580 //                              Get the select list.
1581                 select_list = qs->fta_tree->get_sl_vec();
1582 //                              Verify that only t0 is referenced.
1583                 bool bad_ref = false;
1584                 for(i=0;i<select_list.size();i++){
1585                         vector<int> sel_tbls;
1586                         get_tablevar_ref_se(select_list[i]->se,sel_tbls);
1587                         if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
1588                                 bad_ref = true;
1589                 }
1590                 if(bad_ref){
1591                         err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
1592                         error_code = 1;
1593                 }
1594
1595
1596 //                              Get the selection predicate.
1597                 where = qs->wh_cnf;
1598                 std::vector<cnf_elem *> t0_only, t1_only;
1599                 for(w=0;w<where.size();++w){
1600                         analyze_cnf(where[w]);
1601                         std::vector<int> pred_tbls;
1602                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1603 //                              Collect the list of preds by src var,
1604 //                              extract the shared preds later.
1605                         if(pred_tbls.size()==1){
1606                                 if(pred_tbls[0] == 0){
1607                                         t0_only.push_back(where[w]);
1608                                 }else{
1609                                         t1_only.push_back(where[w]);
1610                                 }
1611                                 continue;
1612                         }
1613 //                              refs nothing -- might be sampling, do it as postfilter.
1614                         if(pred_tbls.size()==0){
1615                                 postfilter.push_back(where[w]);
1616                                 continue;
1617                         }
1618 //                              See if it can be a hash or temporal predicate.
1619 //                              NOTE: synchronize with the temporality checking
1620 //                              done at join_eq_hash_qpn::get_fields
1621                         if(where[w]->is_atom && where[w]->eq_pred){
1622                                 std::vector<int> sel_tbls, ser_tbls;
1623                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1624                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1625                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1626 //                                              make channel 0 SE on LHS.
1627                                         if(sel_tbls[0] != 0)
1628                                                 where[w]->pr->swap_scalar_operands();
1629
1630                                         hash_eq.push_back(where[w]);
1631
1632                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1633                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1634                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1635                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1636                                                 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
1637                                         continue;
1638
1639                                 }
1640                         }
1641 //                              All tests failed, fallback is postfilter.
1642                         err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
1643                         error_code = 3;
1644                 }
1645 //              Classify the t0_only and t1_only preds.
1646                 set<int> matched_pred;
1647                 int v;
1648                 for(w=0;w<t0_only.size();w++){
1649                         for(v=0;v<t1_only.size();++v)
1650                                 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
1651                                         break;
1652                         if(v<t1_only.size()){
1653                                 shared_pred.push_back(t0_only[w]);
1654                                 matched_pred.insert(v);
1655                         }else{
1656                                 pred_t0.push_back(t0_only[w]);
1657                         }
1658                 }
1659                 for(v=0;v<t1_only.size();++v){
1660                         if(matched_pred.count(v) == 0)
1661                                 pred_t1.push_back(t1_only[v]);
1662                 }
1663
1664
1665 //                              Get the parameters
1666                 param_tbl = qs->param_tbl;
1667                 definitions = qs->definitions;
1668
1669 //                              Determine the algorithm
1670                 if(this->get_val_of_def("algorithm") == "hash"){
1671                         use_bloom = false;
1672                 }else{
1673                         use_bloom = true;
1674                 }
1675         };
1676
1677         // the following method is used for distributed query optimization
1678         double get_rate_estimate();
1679
1680
1681         qp_node* make_copy(std::string suffix){
1682                 filter_join_qpn *ret = new filter_join_qpn();
1683
1684                 ret->param_tbl = new param_table();
1685                 std::vector<std::string> param_names = param_tbl->get_param_names();
1686                 int pi;
1687                 for(pi=0;pi<param_names.size();pi++){
1688                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1689                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1690                                                         param_tbl->handle_access(param_names[pi]));
1691                 }
1692                 ret->definitions = definitions;
1693
1694                 ret->node_name = node_name + suffix;
1695
1696                 // make shallow copy of all fields
1697                 ret->where = where;
1698                 ret->from = from;
1699                 ret->temporal_range = temporal_range;
1700                 ret->temporal_var = temporal_var;
1701                 ret->select_list = select_list;
1702                 ret->shared_pred = shared_pred;
1703                 ret->pred_t0 = pred_t0;
1704                 ret->pred_t1 = pred_t1;
1705                 ret->postfilter = postfilter;
1706                 ret->hash_eq = hash_eq;
1707
1708                 return ret;
1709         };
1710         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1711
1712 };
1713
1714
//      Compression mode of the files written by output_file_qpn.
//      gzip adds -lz and bzip adds -lbz2 to the link libraries
//      (output_file_qpn::external_libs); regular adds none.
enum output_file_type_enum {regular, gzip, bzip};
1716
1717 class output_file_qpn: public qp_node{
1718 public:
1719         std::string source_op_name;                                     //      Source table
1720         std::vector<field_entry *> fields;
1721         ospec_str *output_spec;
1722         vector<tablevar_t *> fm;
1723         std::string hfta_query_name;
1724         std::string filestream_id;
1725         bool eat_input;
1726         std::vector<std::string> params;
1727         bool do_gzip;
1728         output_file_type_enum compression_type;
1729
1730         int n_streams;          // Number of output streams
1731         int n_hfta_clones;      // number of hfta clones
1732         int parallel_idx;       // which close this produces output for.
1733         std::vector<int> hash_flds;     // fields used to hash the output.
1734
1735         std::string node_type(){return("output_file_qpn");      };
1736     bool makes_transform(){return false;};
1737         std::vector<std::string> external_libs(){
1738                 std::vector<std::string> ret;
1739                 switch(compression_type){
1740                 case gzip:
1741                         ret.push_back("-lz");
1742                 break;
1743                 case bzip:
1744                         ret.push_back("-lbz2");
1745                 break;
1746                 default:
1747                 break;
1748                 }
1749                 return ret;
1750         }
1751
1752         void bind_to_schema(table_list *Schema){}
1753         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1754                 col_id_set ret;
1755                 return ret;
1756         }
1757
1758         std::string to_query_string(){return "// output_file_operator \n";}
1759         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1760         std::string generate_functor_name();
1761         std::string generate_operator(int i, std::string params);
1762         std::string get_include_file(){
1763                 switch(compression_type){
1764                 case gzip:
1765                         return("#include <zfile_output_operator.h>\n");
1766                 default:
1767                         return("#include <file_output_operator.h>\n");
1768                 }
1769                 return("#include <file_output_operator.h>\n");
1770         };
1771
1772     std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
1773     std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
1774         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
1775     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
1776
1777         table_def *get_fields(){
1778                 field_entry_list *fel = new field_entry_list();
1779                 int i;
1780                 for(i=0;i<fields.size();++i)
1781                         fel->append_field(fields[i]);
1782                 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1783         }
1784         std::vector<tablevar_t *> get_input_tbls();
1785         std::vector<tablevar_t *> get_output_tbls();
1786
1787         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
1788                 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
1789         }
1790         std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
1791                 std::vector<table_exp_t *> ret; return ret;
1792         }
1793 //              Ensure that any refs to interface params have been split away.
1794         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
1795         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
1796
1797
1798         output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
1799                 source_op_name = src_op;
1800                 node_name = source_op_name + "_output";
1801                 filestream_id = fs_id;
1802                 fields = src_tbl_def->get_fields();
1803                 output_spec = ospec;
1804                 fm.push_back(new tablevar_t(source_op_name.c_str()));
1805                 hfta_query_name = qn;
1806                 eat_input = ei;
1807
1808                 do_gzip = false;
1809                 compression_type = regular;
1810                 if(ospec->operator_type == "zfile")
1811                         compression_type = gzip;
1812
1813                 n_streams = 1;
1814                 parallel_idx = 0;
1815                 n_hfta_clones = 1;
1816
1817                 char buf[1000];
1818                 strncpy(buf, output_spec->operator_param.c_str(),1000);
1819                 buf[999] = '\0';
1820                 char *words[100];
1821                 int nwords = split_string(buf, ':', words,100);
1822                 int i;
1823                 for(i=0;i<nwords;i++){
1824                         params.push_back(words[i]);
1825                 }
1826                 for(i=0;i<params.size();i++){
1827                         if(params[i] == "gzip")
1828                                 do_gzip = true;
1829                 }
1830         }
1831
1832 //              Set output splitting parameters
1833         bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
1834                 n_streams = ns;
1835                 n_hfta_clones = np;
1836                 parallel_idx = ix;
1837
1838                 if(split_flds != ""){
1839                         string err_flds = "";
1840                         char *tmpstr = strdup(split_flds.c_str());
1841                         char *words[100];
1842                         int nwords = split_string(tmpstr,':',words,100);
1843                         int i,j;
1844                         for(i=0;i<nwords;++i){
1845                                 string target = words[i];
1846                                 for(j=0;j<fields.size();++j){
1847                                         if(fields[j]->get_name() == target){
1848                                                 hash_flds.push_back(j);
1849                                                 break;
1850                                         }
1851                                 }
1852                                 if(j==fields.size()){
1853                                         err_flds += " "+target;
1854                                 }
1855                         }
1856                         if(err_flds != ""){
1857                                 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
1858                                 return true;
1859                         }
1860                 }
1861                 return false;
1862         }
1863
1864         // the following method is used for distributed query optimization
1865         double get_rate_estimate(){return 1.0;}
1866
1867
1868         qp_node* make_copy(std::string suffix){
1869 //              output_file_qpn *ret = new output_file_qpn();
1870                 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
1871                 return ret;
1872         }
1873
1874         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
1875
1876 };
1877
1878
1879
1880 //
1881
1882 // ---------------------------------------------
1883
1884
1885 //              Select, group-by, aggregate, sampling.
1886 //              Representing
1887 //                      Select SE_1, ..., SE_k
1888 //                      From T
1889 //                      Where predicate
1890 //                      Group By gb1, ..., gb_n
1891 //                      [Subgroup gb_i1, .., gb_ik]
1892 //                      Cleaning_when  predicate
1893 //                      Cleaning_by predicate
1894 //                      Having predicate
1895 //
1896 //              For now, must have group-by variables and aggregates.
1897 //              The scalar expressions which are output must be a function
//              of the group-by variables and the aggregates.
//              The group-by variables can be references to columns of T,
1900 //              or they can be scalar expressions.
1901 class sgahcwcb_qpn: public qp_node{
1902 public:
1903         tablevar_t *table_name;                         // source table
1904         std::vector<cnf_elem *> where;          // selection predicate
1905         std::vector<cnf_elem *> having;         // post-aggregation predicate
1906         std::vector<select_element *> select_list;      // se's of output
1907         gb_table gb_tbl;                        // Table of all group-by attributes.
1908         std::set<int> sg_tbl;           // Names of the superGB attributes
1909         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
1910         std::set<std::string> states_refd;      // states ref'd by stateful fcns.
1911         std::vector<cnf_elem *> cleanby;
1912         std::vector<cnf_elem *> cleanwhen;
1913
1914         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
1915
1916         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
1917
1918         std::string node_type(){return("sgahcwcb_qpn"); };
1919     bool makes_transform(){return true;};
1920         std::vector<std::string> external_libs(){
1921                 std::vector<std::string> ret;
1922                 return ret;
1923         }
1924
1925         void bind_to_schema(table_list *Schema);
1926         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1927                 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
1928                 exit(1);
1929         }
1930
1931         std::string to_query_string();
1932         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1933         std::string generate_functor_name();
1934
1935         std::string generate_operator(int i, std::string params);
1936         std::string get_include_file(){return("#include <clean_operator.h>\n");};
1937
1938     std::vector<select_element *> get_select_list(){return select_list;};
1939     std::vector<scalarexp_t *> get_select_se_list(){
1940                 std::vector<scalarexp_t *> ret;
1941                 int i;
1942                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1943                 return ret;
1944         };
1945     std::vector<cnf_elem *> get_where_clause(){return where;};
1946     std::vector<cnf_elem *> get_filter_clause(){return where;};
1947     std::vector<cnf_elem *> get_having_clause(){return having;};
1948     gb_table *get_gb_tbl(){return &gb_tbl;};
1949     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
1950         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1951     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1952
1953 //                              table which represents output tuple.
1954         table_def *get_fields();
1955         std::vector<tablevar_t *> get_input_tbls();
1956         std::vector<tablevar_t *> get_output_tbls();
1957
1958
1959         sgahcwcb_qpn(){
1960         };
1961         sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
1962 //                              Get the table name.
1963 //                              NOTE the colrefs have the tablevar ref (an int)
1964 //                              embedded in them.  Would it make sense
1965 //                              to grab the whole table list?
1966                 tablevar_list_t *fm = qs->fta_tree->get_from();
1967                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
1968                 if(tbl_vec.size() != 1){
1969                         char tmpstr[200];
1970                         sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
1971                         err_str=tmpstr;
1972                         error_code = 1;
1973                 }
1974                 table_name = (tbl_vec[0]);
1975
1976 //                              Get the select list.
1977                 select_list = qs->fta_tree->get_sl_vec();
1978
1979 //                              Get the selection and having predicates.
1980                 where = qs->wh_cnf;
1981                 having = qs->hav_cnf;
1982                 cleanby = qs->cb_cnf;
1983                 cleanwhen = qs->cw_cnf;
1984
1985 //                              Build a new GB var table (don't share, might need to modify)
1986                 int g;
1987                 for(g=0;g<qs->gb_tbl->size();g++){
1988                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
1989                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
1990                                 qs->gb_tbl->get_reftype(g)
1991                         );
1992                 }
1993
1994 //                              Build a new aggregate table. (don't share, might need
1995 //                              to modify).
1996                 int a;
1997                 for(a=0;a<qs->aggr_tbl->size();a++){
1998                         aggr_tbl.add_aggr(
1999 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
2000                                 qs->aggr_tbl->duplicate(a)
2001                         );
2002                 }
2003
2004                 sg_tbl = qs->sg_tbl;
2005                 states_refd = qs->states_refd;
2006
2007
2008 //                              Get the parameters
2009                 param_tbl = qs->param_tbl;
2010
2011         };
2012
2013
2014
2015         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2016         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2017 //              Ensure that any refs to interface params have been split away.
2018 //                      CURRENTLY not allowed by split_node_for_fta
2019         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2020         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
2021
2022         // the following method is used for distributed query optimization
2023         double get_rate_estimate();
2024
2025         qp_node* make_copy(std::string suffix){
2026                 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
2027
2028                 ret->param_tbl = new param_table();
2029                 std::vector<std::string> param_names = param_tbl->get_param_names();
2030                 int pi;
2031                 for(pi=0;pi<param_names.size();pi++){
2032                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
2033                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2034                                                         param_tbl->handle_access(param_names[pi]));
2035                 }
2036                 ret->definitions = definitions;
2037
2038                 ret->node_name = node_name + suffix;
2039
2040                 // make shallow copy of all fields
2041                 ret->where = where;
2042                 ret->having = having;
2043                 ret->select_list = select_list;
2044                 ret->gb_tbl = gb_tbl;
2045                 ret->aggr_tbl = aggr_tbl;
2046                 ret->sg_tbl = sg_tbl;
2047                 ret->states_refd = states_refd;
2048                 ret->cleanby = cleanby;
2049                 ret->cleanwhen = cleanwhen;
2050
2051                 return ret;
2052         };
2053
2054         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2055 };
2056
2057
2058 std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);
2059
2060
2061
2062 void untaboo(string &s);
2063
2064 table_def *create_attributes(string tname, vector<select_element *> &select_list);
2065
2066
2067
2068 #endif