Improvements to aggregation code and function library
[com/gs-lite.git] / src / ftacmp / query_plan.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15 #ifndef __QUERY_PLAN_H__
16 #define __QUERY_PLAN_H__
17
18 #include<vector>
19 #include<string>
20 #include<map>
21 using namespace std;
22
23 #include"analyze_fta.h"
24 #include"iface_q.h"
25 #include"parse_partn.h"
26 #include"generate_utils.h"
27
28 //              Identify the format of the input, output streams.
29 #define UNKNOWNFORMAT 0
30 #define NETFORMAT 1
31 #define HOSTFORMAT 2
32
33 ///////////////////////////////////////////////////
34 //      representation of an output operator specification
35
// Plain record holding one output operator specification.
// There is no constructor: the two int fields (bucketwidth, n_partitions)
// hold indeterminate values until the code that fills the record sets them.
struct ospec_str{
	std::string query;
	std::string operator_type;
	std::string operator_param;
	std::string output_directory;
	int bucketwidth;
	std::string partitioning_flds;
	int n_partitions;
};
45
46
47 ////////////////////////////////////////////////////
48 //      Input representation of a query
49
50 struct query_node{
51         int idx;
52         std::set<int> reads_from;
53         std::set<int> sources_to;
54         std::vector<std::string> refd_tbls;
55         std::vector<var_pair_t *> params;
56         std::string name;
57         std::string file;
58         std::string mangler;            // for UDOPs
59         bool touched;
60         table_exp_t *parse_tree;
61         int n_consumers;
62         bool is_udop;
63         bool is_externally_visible;
64         bool inferred_visible_node;
65
66         set<int> subtree_roots;
67
68         query_node(){
69                 idx = -1;
70                 touched = false;
71                 parse_tree = NULL;
72                 n_consumers = 0;
73                 is_externally_visible = false;
74                 inferred_visible_node = false;
75                 mangler="";
76         };
77         query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
78                 idx = i;
79                 touched = false;
80                 name = qnm;
81                 file = flnm;
82                 parse_tree = pt;
83                 n_consumers = 0;
84                 is_udop = false;
85                 is_externally_visible = pt->get_visible();
86                 inferred_visible_node = false;
87                 mangler="";
88
89                 tablevar_list_t *fm = parse_tree->get_from();
90                 if(fm!=NULL){
91                         refd_tbls =  fm->get_table_names();
92                 }
93
94                 params  = pt->query_params;
95         };
96         query_node(int ix, std::string udop_name,table_list *Schema){
97                 idx = ix;
98                 touched = false;
99                 name = udop_name;
100                 file = udop_name;
101                 parse_tree = NULL;
102                 n_consumers = 0;
103                 is_udop = true;
104                 is_externally_visible = true;
105                 inferred_visible_node = false;
106                 mangler="";
107
108                 int sid = Schema->find_tbl(udop_name);
109                 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
110                 int i;
111                 for(i=0;i<subq.size();++i){
112                         refd_tbls.push_back(subq[i]->name);
113                 }
114         };
115 };
116
// Bookkeeping record for one HFTA.
// A freshly constructed record is a non-UDOP, not inferred-visible,
// unparallelized (n_parallel == 1, parallel_idx == 0) node that is
// selected for generation.
struct hfta_node{
	std::string name;
	std::string source_name;
	std::vector<int> query_node_indices;
	std::set<int> reads_from;
	std::set<int> sources_to;
	bool is_udop;
	bool inferred_visible_node;
	int n_parallel;
	int parallel_idx;
	bool do_generation;	// false means, ignore it.

	hfta_node()
		: is_udop(false),
		  inferred_visible_node(false),
		  n_parallel(1),
		  parallel_idx(0),
		  do_generation(true)
	{}
};
137
138
139
140
141
142
143 #define SPX_QUERY 1
144 #define SGAH_QUERY 2
145
146 // the following selectivity estimates are used by our primitive rate estimators
147 #define SPX_SELECTIVITY 1.0
148 #define SGAH_SELECTIVITY 0.1
149 #define RSGAH_SELECTIVITY 0.1
150 #define SGAHCWCB_SELECTIVITY 0.1
151 #define MRG_SELECTIVITY 1.0
152 #define JOIN_EQ_HASH_SELECTIVITY 1.0
153
// If the output rate of the interface is not given, we are going to use
// this default value.
156 #define DEFAULT_INTERFACE_RATE 100
157
158
159 //                      Define query plan nodes
160 //                      These nodes are intended for query modeling
161 //                      and transformation rather than for code generation.
162
163
164 //                      Query plan node base class.
165 //                      It has an ID, can return its type,
166 //                      and can be linked into lists with the predecessors
167 //                      and successors.
168 //                      To add : serialize, unserialize?
169
170 class qp_node{
171 public:
172   int id;
173   std::vector<int> predecessors;
174   std::vector<int> successors;
175   std::string node_name;
176
177 //              For error reporting without exiting the program.
178   int error_code;
179   std::string err_str;
180
181 //                      These should be moved to the containing stream_query object.
182   std::map<std::string, std::string> definitions;
183   param_table *param_tbl;
184
185 //              The value of a field in terms of protocol fields (if any).
186   std::map<std::string, scalarexp_t *> protocol_map;
187
188   qp_node(){
189         error_code = 0;
190         id = -1;
191         param_tbl = new param_table();
192   };
193   qp_node(int i){
194         error_code = 0;
195         id = i;
196         param_tbl = new param_table();
197   };
198
199   int get_id(){return(id);};
200   void set_id(int i){id = i;    };
201
202   int get_error_code(){return error_code;};
203   std::string get_error_str(){return err_str;};
204
205   virtual std::string node_type() = 0;
206
207 //              For code generation, does the operator xform its input.
208   virtual bool makes_transform() = 0;
209
210 //              For linking, what external libraries does the operator depend on?
211   virtual std::vector<std::string> external_libs() = 0;
212
213   void set_node_name(std::string n){node_name = n;};
214   std::string get_node_name(){return node_name;};
215
216   void set_definitions(std::map<std::string, std::string> &def){
217           definitions = def;
218   };
219   std::map<std::string, std::string> get_definitions(){return definitions;};
220
221
222 //              call to create the mapping from field name to se in protocol fields.
223 //              Pass in qp_node of data sources, in order.
224   virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
225 //              get the protocol map.  the parameter is the return value.
226   std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
227
228 //              Each qp node must be able to return a description
229 //              of the tuples it creates.
230 //              TODO: the get_output_tbls method should subsume the get_fields
231 //                      method, but in fact it really just returns the
232 //                      operator name.
233   virtual table_def *get_fields() = 0;  // Should be vector?
234 //              get keys from the operator.  Currently, only works on group-by queries.
235 //              partial_keys set to true if there is a suspicion that the list is partial.
236         virtual std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys) = 0;
237 //              Get the from clause
238   virtual std::vector<tablevar_t *> get_input_tbls() = 0;
239 //              this is a confused function, it acutally return the output
240 //              table name.
241   virtual std::vector<tablevar_t *> get_output_tbls() = 0;
242
243   std::string get_val_of_def(std::string def){
244         if(definitions.count(def) > 0) return definitions[def];
245         return("");
246   };
247   void set_definition(std::string def, std::string val){
248         definitions[def]=val;
249   }
250
251 //              Associate colrefs in SEs with tables
252 //              at code generation time.
253   virtual void bind_to_schema(table_list *Schema) = 0;
254
255 //              Get colrefs of the operator, currently only meaningful for lfta
256 //              operators, and only interested in colrefs with extraction fcns
257   virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
258
259   virtual std::string to_query_string() = 0;
260   virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;
261   virtual std::string generate_functor_name() = 0;
262
263   virtual std::string generate_operator(int i, std::string params) = 0;
264   virtual std::string get_include_file() = 0;
265
266   virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
267   virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
268
269 //              Split this node into LFTA and HFTA nodes.
270 //              Four possible outcomes:
271 //              1) the qp_node reads from a protocol, but does not need to
272 //                      split (can be evaluated as an LFTA).
273 //                      The lfta node is the only element in the return vector,
274 //                      and hfta_returned is false.
275 //              2) the qp_node reads from no protocol, and therefore cannot be split.
276 //                      THe hfta node is the only element in the return vector,
277 //                      and hfta_returned is true.
278 //              3) reads from at least one protocol, but cannot be split : failure.
279 //                      return vector is empty, the error conditions are written
280 //                      in the qp_node.
281 //              4) The qp_node splits into an hfta node and one or more LFTA nodes.
282 //                      the return vector has two or more elements, and hfta_returned
283 //                      is true.  The last element is the HFTA.
284         virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;
285
286
287 //              Ensure that any refs to interface params have been split away.
288         virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
289
290
291
292 //              Tag the data sources which are views,
293 //              return the (optimized) source queries and
294 //              record the view access in opview_set
295         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
296
297   param_table *get_param_tbl(){return param_tbl;};
298
299 //                      The "where" clause is a pre-filter
300   virtual  std::vector<cnf_elem *> get_where_clause() = 0;
301 //                      To be more explicit, use get_filter_preds, this is used to compute the prefilter
302   virtual  std::vector<cnf_elem *> get_filter_clause() = 0;
303
304 //              Add an extra predicate.  Currently only used for LFTAs.
305   virtual void append_to_where(cnf_elem *c) = 0;
306
307   void add_predecessor(int i){predecessors.push_back(i);};
308   void remove_predecessor(int i){
309         std::vector<int>::iterator vi;
310         for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
311                 if((*vi) == i){
312                         predecessors.erase(vi);
313                         return;
314                 }
315         }
316   };
317   void add_successor(int i){successors.push_back(i);};
318   std::vector<int> get_predecessors(){return predecessors;};
319   int n_predecessors(){return predecessors.size();};
320   std::vector<int> get_successors(){return successors;};
321   int n_successors(){return successors.size();};
322   void clear_predecessors(){predecessors.clear();};
323   void clear_successors(){successors.clear();};
324
325   // the following method is used for distributed query optimization
326   double get_rate_estimate();
327
328
329   // used for cloning query nodes
330   virtual qp_node* make_copy(std::string suffix) = 0;
331 };
332
333
334
335 //              Select, project, transform (xform) query plan node.
336 //              represent the following query fragment
337 //                      select scalar_expression_1, ..., scalar_expression_k
338 //                      from S
339 //                      where predicate
340 //
341 //              the predicates and the scalar expressions can reference
342 //              attributes of S and also functions.
343 class spx_qpn: public qp_node{
344 public:
345         tablevar_t *table_name;                                 //      Source table
346         std::vector<cnf_elem *> where;                  // selection predicate
347         std::vector<select_element *> select_list;      //      Select list
348
349
350
351         std::string node_type(){return("spx_qpn");      };
352     bool makes_transform(){return true;};
353         std::vector<std::string> external_libs(){
354                 std::vector<std::string> ret;
355                 return ret;
356         }
357
358   void append_to_where(cnf_elem *c){
359                 where.push_back(c);
360         }
361
362
363         void bind_to_schema(table_list *Schema);
364         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
365
366         std::string to_query_string();
367         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
368         std::string generate_functor_name();
369         std::string generate_operator(int i, std::string params);
370         std::string get_include_file(){return("#include <selection_operator.h>\n");};
371
372     std::vector<select_element *> get_select_list(){return select_list;};
373     std::vector<scalarexp_t *> get_select_se_list(){
374                 std::vector<scalarexp_t *> ret;
375                 int i;
376                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
377                 return ret;
378         };
379     std::vector<cnf_elem *> get_where_clause(){return where;};
380     std::vector<cnf_elem *> get_filter_clause(){return where;};
381         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
382     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
383
384         table_def *get_fields();
385         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
386                 std::vector<string> ret;
387                 return ret;
388         }
389
390         std::vector<tablevar_t *> get_input_tbls();
391         std::vector<tablevar_t *> get_output_tbls();
392
393         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
394         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
395 //              Ensure that any refs to interface params have been split away.
396         int count_ifp_refs(std::set<std::string> &ifpnames);
397         int resolve_if_params(ifq_t *ifdb, std::string &err);
398
399         spx_qpn(){
400         };
401         spx_qpn(query_summary_class *qs,table_list *Schema){
402 //                              Get the table name.
403 //                              NOTE the colrefs have the table ref (an int)
404 //                              embedded in them.  Would it make sense
405 //                              to grab the whole table list?
406                 tablevar_list_t *fm = qs->fta_tree->get_from();
407                 
408                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
409                 if(tbl_vec.size() != 1){
410                         char tmpstr[200];
411                         sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
412                         err_str = tmpstr;
413                         error_code = 1;
414                 }
415                 table_name = (tbl_vec[0]);
416
417                 int t = tbl_vec[0]->get_schema_ref();
418                 if(! Schema->is_stream(t)){
419                         err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
420                         error_code = 1;
421                 }
422
423 //                              Get the select list.
424                 select_list = qs->fta_tree->get_sl_vec();
425
426 //                              Get the selection predicate.
427                 where = qs->wh_cnf;
428
429
430 //                              Get the parameters
431                 param_tbl = qs->param_tbl;
432
433
434
435         };
436
437         // the following method is used for distributed query optimization
438         double get_rate_estimate();
439
440
441         qp_node* make_copy(std::string suffix){
442                 spx_qpn *ret = new spx_qpn();
443
444                 ret->param_tbl = new param_table();
445                 std::vector<std::string> param_names = param_tbl->get_param_names();
446                 int pi;
447                 for(pi=0;pi<param_names.size();pi++){
448                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
449                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
450                                                         param_tbl->handle_access(param_names[pi]));
451                 }
452                 ret->definitions = definitions;
453                 ret->node_name = node_name + suffix;
454
455                 // make shallow copy of all fields
456                 ret->where = where;
457                 ret->select_list = select_list;
458
459                 return ret;
460         };
461         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
462
463 };
464
465
466
467 //              Select, group-by, aggregate.
468 //              Representing
469 //                      Select SE_1, ..., SE_k
470 //                      From T
471 //                      Where predicate
472 //                      Group By gb1, ..., gb_n
473 //                      Having predicate
474 //
//              NOTE : the sampling operator is sgahcwcb_qpn.
476 //
477 //              For now, must have group-by variables and aggregates.
478 //              The scalar expressions which are output must be a function
//              of the group-by variables and the aggregates.
//              The group-by variables can be references to columns of T,
481 //              or they can be scalar expressions.
482 class sgah_qpn: public qp_node{
483 public:
484         tablevar_t *table_name;                         // source table
485         std::vector<cnf_elem *> where;          // selection predicate
486         std::vector<cnf_elem *> having;         // post-aggregation predicate
487         std::vector<select_element *> select_list;      // se's of output
488         gb_table gb_tbl;                        // Table of all group-by attributes.
489         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
490
491         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
492
493         int lfta_disorder;              // maximum disorder in the steam between lfta, hfta
494         int hfta_disorder;              // maximum disorder in the  hfta
495         int hfta_slow_flush;    // outputs per input, 0 means no slow flush
496
497 //              rollup, cube, and grouping_sets cannot be readily reconstructed by
498 //              analyzing the patterns, so explicitly record them here.
499 //              used only so that to_query_string produces something meaningful.
500         std::vector<std::string> gb_entry_type;
501         std::vector<int> gb_entry_count;
502
503         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
504
505         std::string node_type(){return("sgah_qpn");     };
506     bool makes_transform(){return true;};
507         std::vector<std::string> external_libs(){
508                 std::vector<std::string> ret;
509                 return ret;
510         }
511
512         void bind_to_schema(table_list *Schema);
513         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
514
515         std::string to_query_string();
516         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
517         std::string generate_functor_name();
518
519         std::string generate_operator(int i, std::string params);
520         std::string get_include_file(){
521                 if(hfta_disorder <= 1){
522                         if(hfta_slow_flush>0){
523                                 return("#include <groupby_slowflush_operator.h>\n");
524                         }else{
525                                 return("#include <groupby_operator.h>\n");
526                         }
527                 }else{
528                         return("#include <groupby_operator_oop.h>\n");
529                 }
530         };
531
532     std::vector<select_element *> get_select_list(){return select_list;};
533     std::vector<scalarexp_t *> get_select_se_list(){
534                 std::vector<scalarexp_t *> ret;
535                 int i;
536                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
537                 return ret;
538         };
539     std::vector<cnf_elem *> get_where_clause(){return where;};
540
541   void append_to_where(cnf_elem *c){
542                 where.push_back(c);
543         }
544
545     std::vector<cnf_elem *> get_filter_clause(){return where;};
546     std::vector<cnf_elem *> get_having_clause(){return having;};
547     gb_table *get_gb_tbl(){return &gb_tbl;};
548     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
549         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
550     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
551
552 //                              table which represents output tuple.
553         table_def *get_fields();
554         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
555         std::vector<tablevar_t *> get_input_tbls();
556         std::vector<tablevar_t *> get_output_tbls();
557
558
559         sgah_qpn(){
560                 lfta_disorder = 1;
561                 hfta_disorder = 1;
562                 hfta_slow_flush = 0;
563         };
564         sgah_qpn(query_summary_class *qs,table_list *Schema){
565                 lfta_disorder = 1;
566                 hfta_disorder = 1;
567                 hfta_slow_flush = 0;
568
569 //                              Get the table name.
570 //                              NOTE the colrefs have the tablevar ref (an int)
571 //                              embedded in them.  Would it make sense
572 //                              to grab the whole table list?
573                 tablevar_list_t *fm = qs->fta_tree->get_from();
574                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
575                 if(tbl_vec.size() != 1){
576                         char tmpstr[200];
577                         sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
578                         err_str=tmpstr;
579                         error_code = 1;
580                 }
581                 table_name = (tbl_vec[0]);
582
583                 int t = tbl_vec[0]->get_schema_ref();
584                 if(! Schema->is_stream(t)){
585                         err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
586                         error_code = 1;
587                 }
588
589
590 //                              Get the select list.
591                 select_list = qs->fta_tree->get_sl_vec();
592
593 //                              Get the selection and having predicates.
594                 where = qs->wh_cnf;
595                 having = qs->hav_cnf;
596
597 //                              Build a new GB var table (don't share, might need to modify)
598                 int g;
599                 for(g=0;g<qs->gb_tbl->size();g++){
600                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
601                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
602                                 qs->gb_tbl->get_reftype(g)
603                         );
604                 }
605                 gb_tbl.set_pattern_info(qs->gb_tbl);
606 //              gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
607 //              gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
608 //              gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
609
610 //                              Build a new aggregate table. (don't share, might need
611 //                              to modify).
612                 int a;
613                 for(a=0;a<qs->aggr_tbl->size();a++){
614                         aggr_tbl.add_aggr(
615 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
616                                 qs->aggr_tbl->duplicate(a)
617                         );
618                 }
619
620
621 //                              Get the parameters
622                 param_tbl = qs->param_tbl;
623
624         };
625
626
627
628         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
629         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
630 //              Ensure that any refs to interface params have been split away.
631         int count_ifp_refs(std::set<std::string> &ifpnames);
632         int resolve_if_params(ifq_t *ifdb, std::string &err);
633
634         // the following method is used for distributed query optimization
635         double get_rate_estimate();
636
637
638         qp_node* make_copy(std::string suffix){
639                 sgah_qpn *ret = new sgah_qpn();
640
641                 ret->param_tbl = new param_table();
642                 std::vector<std::string> param_names = param_tbl->get_param_names();
643                 int pi;
644                 for(pi=0;pi<param_names.size();pi++){
645                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
646                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
647                                                         param_tbl->handle_access(param_names[pi]));
648                 }
649                 ret->definitions = definitions;
650                 ret->hfta_slow_flush = hfta_slow_flush;
651
652                 ret->node_name = node_name + suffix;
653
654                 // make shallow copy of all fields
655                 ret->where = where;
656                 ret->having = having;
657                 ret->select_list = select_list;
658                 ret->gb_tbl = gb_tbl;
659                 ret->aggr_tbl = aggr_tbl;
660
661                 return ret;
662         };
663
664 //              Split aggregation into two HFTA components - sub and superaggregation
665 //              If unable to split the aggreagates, split into selection and aggregation
666 //              If resulting low-level query is empty (e.g. when aggregates cannot be split and
667 //              where clause is empty) empty vector willb e returned
668         virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
669
670         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
671
672 };
673
674
675
676
677 //              Select, group-by, aggregate. with running aggregates
678 //              Representing
679 //                      Select SE_1, ..., SE_k
680 //                      From T
681 //                      Where predicate
682 //                      Group By gb1, ..., gb_n
683 //                      Closing When predicate
684 //                      Having predicate
685 //
686 //              NOTE : the sampling operator is sgahcwcb_qpn.
687 //
688 //              For now, must have group-by variables and aggregates.
689 //              The scalar expressions which are output must be a function
//              of the group-by variables and the aggregates.
//              The group-by variables can be references to columns of T,
692 //              or they can be scalar expressions.
693 class rsgah_qpn: public qp_node{
694 public:
695         tablevar_t *table_name;                         // source table
696         std::vector<cnf_elem *> where;          // selection predicate
697         std::vector<cnf_elem *> having;         // post-aggregation predicate
698         std::vector<cnf_elem *> closing_when;           // group closing predicate
699         std::vector<select_element *> select_list;      // se's of output
700         gb_table gb_tbl;                        // Table of all group-by attributes.
701         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
702
703         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
704
705         int lfta_disorder;              // maximum disorder allowed in stream between lfta, hfta
706         int hfta_disorder;              // maximum disorder allowed in hfta
707
708         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
709
710
711         std::string node_type(){return("rsgah_qpn");    };
712     bool makes_transform(){return true;};
713         std::vector<std::string> external_libs(){
714                 std::vector<std::string> ret;
715                 return ret;
716         }
717
718         void bind_to_schema(table_list *Schema);
719         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
720                 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
721                 exit(1);
722         }
723
724         std::string to_query_string();
725         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
726         std::string generate_functor_name();
727
728         std::string generate_operator(int i, std::string params);
729         std::string get_include_file(){return("#include <running_gb_operator.h>\n");};
730
731     std::vector<select_element *> get_select_list(){return select_list;};
732     std::vector<scalarexp_t *> get_select_se_list(){
733                 std::vector<scalarexp_t *> ret;
734                 int i;
735                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
736                 return ret;
737         };
738     std::vector<cnf_elem *> get_where_clause(){return where;};
739   void append_to_where(cnf_elem *c){
740                 where.push_back(c);
741         }
742
743     std::vector<cnf_elem *> get_filter_clause(){return where;};
744     std::vector<cnf_elem *> get_having_clause(){return having;};
745     std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};
746     gb_table *get_gb_tbl(){return &gb_tbl;};
747     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
748         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
749     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
750
751 //                              table which represents output tuple.
752         table_def *get_fields();
753         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
754
755         std::vector<tablevar_t *> get_input_tbls();
756         std::vector<tablevar_t *> get_output_tbls();
757
758
759         rsgah_qpn(){
760                 lfta_disorder = 1;
761                 hfta_disorder = 1;
762         };
763         rsgah_qpn(query_summary_class *qs,table_list *Schema){
764                 lfta_disorder = 1;
765                 hfta_disorder = 1;
766
767 //                              Get the table name.
768 //                              NOTE the colrefs have the tablevar ref (an int)
769 //                              embedded in them.  Would it make sense
770 //                              to grab the whole table list?
771                 tablevar_list_t *fm = qs->fta_tree->get_from();
772                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
773                 if(tbl_vec.size() != 1){
774                         char tmpstr[200];
775                         sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
776                         err_str=tmpstr;
777                         error_code = 1;
778                 }
779                 table_name = (tbl_vec[0]);
780
781                 int t = tbl_vec[0]->get_schema_ref();
782                 if(! Schema->is_stream(t)){
783                         err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
784                         error_code = 1;
785                 }
786
787 //                              Get the select list.
788                 select_list = qs->fta_tree->get_sl_vec();
789
790 //                              Get the selection and having predicates.
791                 where = qs->wh_cnf;
792                 having = qs->hav_cnf;
793                 closing_when = qs->closew_cnf;
794
795 //                              Build a new GB var table (don't share, might need to modify)
796                 int g;
797                 for(g=0;g<qs->gb_tbl->size();g++){
798                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
799                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
800                                 qs->gb_tbl->get_reftype(g)
801                         );
802                 }
803
804 //                              Build a new aggregate table. (don't share, might need
805 //                              to modify).
806                 int a;
807                 for(a=0;a<qs->aggr_tbl->size();a++){
808                         aggr_tbl.add_aggr(
809 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
810                                 qs->aggr_tbl->duplicate(a)
811                         );
812                 }
813
814
815 //                              Get the parameters
816                 param_tbl = qs->param_tbl;
817
818         };
819
820
821
822         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
823         std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
824         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
825 //              Ensure that any refs to interface params have been split away.
826         int count_ifp_refs(std::set<std::string> &ifpnames);
827         int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;}
828
829         // the following method is used for distributed query optimization
830         double get_rate_estimate();
831
832         qp_node* make_copy(std::string suffix){
833                 rsgah_qpn *ret = new rsgah_qpn();
834
835                 ret->param_tbl = new param_table();
836                 std::vector<std::string> param_names = param_tbl->get_param_names();
837                 int pi;
838                 for(pi=0;pi<param_names.size();pi++){
839                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
840                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
841                                                         param_tbl->handle_access(param_names[pi]));
842                 }
843                 ret->definitions = definitions;
844
845                 ret->node_name = node_name + suffix;
846
847                 // make shallow copy of all fields
848                 ret->where = where;
849                 ret->having = having;
850                 ret->closing_when = closing_when;
851                 ret->select_list = select_list;
852                 ret->gb_tbl = gb_tbl;
853                 ret->aggr_tbl = aggr_tbl;
854
855                 return ret;
856         };
857         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
858 };
859
860
861
862 //              Watchlist - from a table read from an external source.
863
864 class watch_tbl_qpn: public qp_node{
865 public:
866         table_def *table_layout;                                // the output schema
867         std::vector<std::string> key_flds;
868
869 //              Parameters related to loading the table
870         std::string filename;
871         int refresh_interval;
872
873         
874         void append_to_where(cnf_elem *c){
875                 fprintf(stderr, "ERROR, append_to_where called on watch_tbl_qpn, not supported, query %s.\n",  node_name.c_str());
876                 exit(1);
877         }
878
879         std::string node_type(){return("watch_tbl_qpn");        };
880     bool makes_transform(){return false;};
881         std::vector<std::string> external_libs(){
882                 std::vector<std::string> ret;
883                 return ret;
884         }
885
886         void bind_to_schema(table_list *Schema){}
887         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
888                 col_id_set ret;
889                 return ret;
890         }
891
892         std::string to_query_string();
893         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
894         std::string generate_functor_name();
895         std::string generate_operator(int i, std::string params);
896         std::string get_include_file(){
897                 return("#include <watchlist_tbl.h>\n");
898         };
899
900         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
901     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
902
903         table_def *get_fields();
904         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
905                 return key_flds;
906         }
907
908         std::vector<tablevar_t *> get_input_tbls();
909         std::vector<tablevar_t *> get_output_tbls();
910
911         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
912         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
913 //              Ensure that any refs to interface params have been split away.
914         int count_ifp_refs(std::set<std::string> &ifpnames);
915
916 //                      No predicates, return an empty clause
917     std::vector<cnf_elem *> get_where_clause(){
918                  std::vector<cnf_elem *> t;
919                 return(t);
920         };
921     std::vector<cnf_elem *> get_filter_clause(){
922                 return get_where_clause();
923         }
924
925         watch_tbl_qpn(){
926         };
927
928         watch_tbl_qpn(query_summary_class *qs,table_list *Schema){
929                 node_name=qs->query_name;
930                 param_tbl = qs->param_tbl;
931                 definitions = qs->definitions;
932                 
933                 
934 //                      Populate the schema
935                 table_layout = new table_def(
936                         node_name.c_str(), NULL, NULL,  qs->fta_tree->fel, WATCHLIST_SCHEMA
937                 );
938
939 //                      Find the keys
940                 std::vector<field_entry *> flds = qs->fta_tree->fel->get_list();
941                 for(int f=0;f<flds.size();++f){
942                         if(flds[f]->get_modifier_list()->contains_key("key") ||
943                                 flds[f]->get_modifier_list()->contains_key("Key") ||
944                                 flds[f]->get_modifier_list()->contains_key("KEY") ){
945                                         key_flds.push_back(flds[f]->get_name());
946                         }
947                 }
948                 if(key_flds.size()==0){
949                         fprintf(stderr,"Error, no key fields defined for table watchlist %s\n",node_name.c_str());
950                         exit(1);
951                 }
952
953                 table_layout->set_keys(key_flds);       // communicate keys to consumers
954
955 //                      Get loading parameters
956                 if(definitions.count("filename")>0){
957                         filename = definitions["filename"];
958                 }else{
959                         fprintf(stderr, "Error, no filename for source data defined for table watchlist %s\n",node_name.c_str());
960                         exit(1);
961                 }
962                 if(definitions.count("refresh_interval")>0){
963                         refresh_interval = atoi(definitions["refresh_interval"].c_str());
964                         if(refresh_interval <= 0){
965                                 fprintf(stderr, "Error, the refresh_interval (%s) of table watchlist %s must be a positive non-zero integer.\n",definitions["refresh_interval"].c_str(), node_name.c_str());
966                                 exit(1);
967                         }
968                 }else{
969                         fprintf(stderr, "Error, no refresh_interval defined for table watchlist %s\n",node_name.c_str());
970                         exit(1);
971                 }
972
973         }
974
975         qp_node *make_copy(std::string suffix){
976                 watch_tbl_qpn *ret = new watch_tbl_qpn();
977                 ret->filename = filename;
978                 ret->refresh_interval = refresh_interval;
979                 ret->key_flds = key_flds;
980
981                 ret->param_tbl = new param_table();
982                 std::vector<std::string> param_names = param_tbl->get_param_names();
983                 int pi;
984                 for(pi=0;pi<param_names.size();pi++){
985                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
986                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
987                                                         param_tbl->handle_access(param_names[pi]));
988                 }
989                 ret->definitions = definitions;
990
991                 ret->node_name = node_name + suffix;
992                 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
993
994                 return ret;
995         };
996
997         // the following method is used for distributed query optimization
998         double get_rate_estimate();
999
1000         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1001         
1002
1003 };
1004
1005
1006
1007
1008
1009
1010 //              forward reference
1011 class filter_join_qpn;
1012 class watch_join_qpn;
1013
1014
//              (temporal) Merge query plan node.
//              Represents the following query fragment
//                      Merge c1:c2
//                      from T1 _t1, T2 _t2
//
//              T1 and T2 must have compatible schemas,
//              that is, the same types in the same slots.
//              c1 and c2 must be colrefs from T1 and T2,
//              both ref'ing the same slot.  Their types
//              must be temporal and the same kind of temporal.
//              In the output, no other field is temporal.
//              The field names of the output are drawn from T1.
1027 class mrg_qpn: public qp_node{
1028 public:
1029         std::vector<tablevar_t *> fm;                                   //      Source table
1030         std::vector<colref_t *> mvars;                                  // the merge-by columns.
1031         scalarexp_t *slack;
1032
1033         table_def *table_layout;                                // the output schema
1034         int merge_fieldpos;                                             // position of merge field,
1035                                                                                         // convenience for manipulation.
1036
1037         int disorder;           // max disorder seen in the input / allowed in the output
1038
1039
1040         // partition definition for merges that combine streams partitioned over multiple interfaces
1041         partn_def_t* partn_def;
1042
1043
1044   void append_to_where(cnf_elem *c){
1045                 fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n",  node_name.c_str());
1046                 exit(1);
1047         }
1048
1049
1050
1051         std::string node_type(){return("mrg_qpn");      };
1052     bool makes_transform(){return false;};
1053         std::vector<std::string> external_libs(){
1054                 std::vector<std::string> ret;
1055                 return ret;
1056         }
1057
1058         void bind_to_schema(table_list *Schema);
1059         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1060                 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
1061                 exit(1);
1062         }
1063
1064         std::string to_query_string();
1065         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1066         std::string generate_functor_name();
1067         std::string generate_operator(int i, std::string params);
1068         std::string get_include_file(){
1069                 if(disorder>1)
1070                         return("#include <merge_operator_oop.h>\n");
1071                 return("#include <merge_operator.h>\n");
1072         };
1073
1074         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1075     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1076
1077         table_def *get_fields();
1078         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1079                 std::vector<string> ret;
1080                 return ret;
1081         }
1082
1083         std::vector<tablevar_t *> get_input_tbls();
1084         std::vector<tablevar_t *> get_output_tbls();
1085
1086         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1087         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1088 //              Ensure that any refs to interface params have been split away.
1089         int count_ifp_refs(std::set<std::string> &ifpnames);
1090
1091 //                      No predicates, return an empty clause
1092     std::vector<cnf_elem *> get_where_clause(){
1093                  std::vector<cnf_elem *> t;
1094                 return(t);
1095         };
1096     std::vector<cnf_elem *> get_filter_clause(){
1097                 return get_where_clause();
1098         }
1099
1100         mrg_qpn(){
1101                 partn_def = NULL;
1102         };
1103
1104         void set_disorder(int d){
1105                 disorder = d;
1106         }
1107
1108         mrg_qpn(query_summary_class *qs,table_list *Schema){
1109                 disorder = 1;
1110
1111 //                              Grab the elements of the query node.
1112                 fm = qs->fta_tree->get_from()->get_table_list();
1113                 mvars = qs->mvars;
1114                 slack = qs->slack;
1115
1116 //                      sanity check
1117                 if(fm.size() != mvars.size()){
1118                         fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn.  fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
1119                         exit(1);
1120                 }
1121
1122                 for(int f=0;f<fm.size();++f){
1123                         int t=fm[f]->get_schema_ref();
1124                         if(! Schema->is_stream(t)){
1125                                 err_str += "ERROR in query "+node_name+", the source "+fm[f]->get_schema_name()+" is not a stream.\n";
1126                                 error_code = 1;
1127                         }
1128                 }
1129
1130 //                              Get the parameters
1131                 param_tbl = qs->param_tbl;
1132
1133 //                              Need to set the node name now, so that the
1134 //                              schema (table_layout) can be properly named.
1135 //                              TODO: Setting the name of the table might best be done
1136 //                              via the set_node_name method, because presumably
1137 //                              thats when the node name is really known.
1138 //                              This should propogate to the table_def table_layout
1139                 node_name=qs->query_name;
1140
1141 /*
1142 int ff;
1143 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
1144 for(ff=0;ff<fm.size();++ff){
1145 printf("%s ",fm[ff]->to_string().c_str());
1146 }
1147 printf("\n");
1148 */
1149
1150
1151 //              Create the output schema.
1152 //              strip temporal properites form all fields except the merge field.
1153                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1154                 field_entry_list *fel = new field_entry_list();
1155                 int f;
1156                 for(f=0;f<flva.size();++f){
1157                         field_entry *fe;
1158                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
1159                         if(flva[f]->get_name() == mvars[0]->get_field()){
1160                                 merge_fieldpos = f;
1161 //                              if(slack != NULL) dt.reset_temporal();
1162                         }else{
1163                                 dt.reset_temporal();
1164                         }
1165
1166                         param_list *plist = new param_list();
1167                         std::vector<std::string> param_strings = dt.get_param_keys();
1168                         int p;
1169                         for(p=0;p<param_strings.size();++p){
1170                                 std::string v = dt.get_param_val(param_strings[p]);
1171                                 if(v != "")
1172                                         plist->append(param_strings[p].c_str(),v.c_str());
1173                                 else
1174                                         plist->append(param_strings[p].c_str());
1175                         }
1176
1177
1178                         fe=new field_entry(
1179                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
1180                         fel->append_field(fe);
1181                 }
1182
1183
1184
1185
1186                 table_layout = new table_def(
1187                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1188                 );
1189
1190                 partn_def = NULL;
1191         };
1192
1193
1194 /////////////////////////////////////////////
1195 ///             Created for de-siloing.  to be removed?  or is it otherwise useful?
1196 //              Merge existing set of sources (de-siloing)
1197         mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
1198                 int i,f;
1199
1200                 disorder = 1;
1201
1202 //                              Construct the fm list
1203                 for(f=0;f<src_names.size();++f){
1204                         int tbl_ref = Schema->get_table_ref(src_names[f]);
1205                         if(tbl_ref < 0){
1206                                 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
1207                                 exit(1);
1208                         }
1209                         table_def *src_tbl = Schema->get_table(tbl_ref);
1210                         tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
1211                         string range_name = "_t" + int_to_string(f);
1212                         fm_t->set_range_var(range_name);
1213                         fm_t->set_schema_ref(tbl_ref);
1214                         fm.push_back(fm_t);
1215                 }
1216
1217 //              Create the output schema.
1218 //              strip temporal properites form all fields except the merge field.
1219                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1220                 field_entry_list *fel = new field_entry_list();
1221                 bool temporal_found = false;
1222                 for(f=0;f<flva.size();++f){
1223                         field_entry *fe;
1224                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
1225                         if(dt.is_temporal() && !temporal_found){
1226                                 merge_fieldpos = f;
1227                                 temporal_found = true;
1228                         }else{
1229                                 dt.reset_temporal();
1230                         }
1231
1232                         param_list *plist = new param_list();
1233                         std::vector<std::string> param_strings = dt.get_param_keys();
1234                         int p;
1235                         for(p=0;p<param_strings.size();++p){
1236                                 std::string v = dt.get_param_val(param_strings[p]);
1237                                 if(v != "")
1238                                         plist->append(param_strings[p].c_str(),v.c_str());
1239                                 else
1240                                         plist->append(param_strings[p].c_str());
1241                         }
1242
1243                         fe=new field_entry(
1244                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
1245                                 flva[f]->get_unpack_fcns()
1246                         );
1247                         fel->append_field(fe);
1248                 }
1249
1250                 if(! temporal_found){
1251                         fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
1252                         exit(1);
1253                 }
1254
1255                 node_name=n_name;
1256                 table_layout = new table_def(
1257                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1258                 );
1259
1260                 partn_def = NULL;
1261                 param_tbl = new param_table();
1262
1263 //                      Construct mvars
1264                 for(f=0;f<fm.size();++f){
1265                         std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
1266                         data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
1267                              flva[merge_fieldpos]->get_modifier_list());
1268
1269                         colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
1270                                 flv_f[merge_fieldpos]->get_name().c_str());
1271                         mvars.push_back(mcr);
1272                 }
1273
1274 //              literal_t *s_lit = new literal_t("5",LITERAL_INT);
1275 //              slack = new scalarexp_t(s_lit);
1276                 slack = NULL;
1277
1278         };
1279 //                      end de-siloing
1280 ////////////////////////////////////////
1281
1282         void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
1283
1284
1285 //                      Merge filter_join LFTAs.
1286
1287         mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1288
1289 //                      Merge watch_join LFTAs.
1290
1291         mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1292
1293 //                      Merge selection LFTAs.
1294
1295         mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
1296
1297                 disorder = 1;
1298
1299                 param_tbl = spx->param_tbl;
1300                 int i;
1301                 node_name = n_name;
1302                 field_entry_list *fel = new field_entry_list();
1303                 merge_fieldpos = -1;
1304
1305
1306
1307
1308                 for(i=0;i<spx->select_list.size();++i){
1309                         data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
1310                         if(dt->is_temporal()){
1311                                 if(merge_fieldpos < 0){
1312                                         merge_fieldpos = i;
1313                                 }else{
1314                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
1315                                         dt->reset_temporal();
1316                                 }
1317                         }
1318
1319                         field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
1320                         fel->append_field(fe);
1321                         delete dt;
1322                 }
1323                 if(merge_fieldpos<0){
1324                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1325                                 exit(1);
1326                 }
1327                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1328
1329 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1330                 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
1331                                 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
1332 //      if(this->slack == NULL)
1333 //              fprintf(stderr,"Zero slack.\n");
1334 //      else
1335 //              fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1336
1337                 for(i=0;i<sources.size();i++){
1338                         std::string rvar = "_m"+int_to_string(i);
1339                         mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
1340                         mvars[i]->set_tablevar_ref(i);
1341                         fm.push_back(new tablevar_t(sources[i].c_str()));
1342                         fm[i]->set_range_var(rvar);
1343                 }
1344
1345                 param_tbl = new param_table();
1346                 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
1347                 int pi;
1348                 for(pi=0;pi<param_names.size();pi++){
1349                         data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
1350                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1351                                                         spx->param_tbl->handle_access(param_names[pi]));
1352                 }
1353                 definitions = spx->definitions;
1354
1355         }
1356
1357 //              Merge aggregation LFTAs
1358
1359         mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
1360
1361                 disorder = 1;
1362
1363                 param_tbl = sgah->param_tbl;
1364                 int i;
1365                 node_name = n_name;
1366                 field_entry_list *fel = new field_entry_list();
1367                 merge_fieldpos = -1;
1368                 for(i=0;i<sgah->select_list.size();++i){
1369                         data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
1370                         if(dt->is_temporal()){
1371                                 if(merge_fieldpos < 0){
1372                                         merge_fieldpos = i;
1373                                 }else{
1374                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
1375                                         dt->reset_temporal();
1376                                 }
1377                         }
1378
1379                         field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
1380                         fel->append_field(fe);
1381                         delete dt;
1382                 }
1383                 if(merge_fieldpos<0){
1384                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1385                         exit(1);
1386                 }
1387                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1388
1389 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1390                 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
1391                                 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
1392                                 &(sgah->gb_tbl));
1393                 if(this->slack == NULL)
1394                         fprintf(stderr,"Zero slack.\n");
1395                 else
1396                         fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1397
1398
1399                 for(i=0;i<sources.size();i++){
1400                         std::string rvar = "_m"+int_to_string(i);
1401                         mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
1402                         mvars[i]->set_tablevar_ref(i);
1403                         fm.push_back(new tablevar_t(sources[i].c_str()));
1404                         fm[i]->set_range_var(rvar);
1405                 }
1406
1407                 param_tbl = new param_table();
1408                 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
1409                 int pi;
1410                 for(pi=0;pi<param_names.size();pi++){
1411                         data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
1412                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1413                                                         sgah->param_tbl->handle_access(param_names[pi]));
1414                 }
1415                 definitions = sgah->definitions;
1416
1417         }
1418
1419         qp_node *make_copy(std::string suffix){
1420                 mrg_qpn *ret = new mrg_qpn();
1421                 ret->slack = slack;
1422                 ret->disorder = disorder;
1423
1424                 ret->param_tbl = new param_table();
1425                 std::vector<std::string> param_names = param_tbl->get_param_names();
1426                 int pi;
1427                 for(pi=0;pi<param_names.size();pi++){
1428                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1429                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1430                                                         param_tbl->handle_access(param_names[pi]));
1431                 }
1432                 ret->definitions = definitions;
1433
1434                 ret->node_name = node_name + suffix;
1435                 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
1436                 ret->merge_fieldpos = merge_fieldpos;
1437
1438                 return ret;
1439         };
1440
1441         std::vector<mrg_qpn *> split_sources();
1442
1443         // the following method is used for distributed query optimization
1444         double get_rate_estimate();
1445
1446
1447         // get partition definition for merges that combine streams partitioned over multiple interfaces
1448         // return NULL for regular merges
1449         partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1450                 if (partn_def)
1451                         return partn_def;
1452
1453                 int err;
1454                 string err_str;
1455                 string partn_name;
1456
1457                 vector<tablevar_t *> input_tables = get_input_tbls();
1458                 for (int i = 0; i <  input_tables.size(); ++i) {
1459                         tablevar_t * table = input_tables[i];
1460
1461                         vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
1462                         if (partn_names.size() != 1)    // can't have more than one value of partition attribute
1463                                 return NULL;
1464                         string new_partn_name = partn_names[0];
1465
1466                         // need to make sure that all ifaces belong to the same partition
1467                         if (!i)
1468                                 partn_name = new_partn_name;
1469                         else if (new_partn_name != partn_name)
1470                                 return NULL;
1471                 }
1472
1473                 // now find partition definition corresponding to partn_name
1474                 partn_def = partn_parse_result->get_partn_def(partn_name);
1475                 return partn_def;
1476         };
1477
1478         void set_partn_definition(partn_def_t* def) {
1479                 partn_def = def;
1480         }
1481
1482         bool is_multihost_merge() {
1483
1484                 bool is_multihost = false;
1485
1486                 // each input table must be have machine attribute be non-empty
1487                 // and there should be at least 2 different values of machine attributes
1488                 vector<tablevar_t *> input_tables = get_input_tbls();
1489                 string host = input_tables[0]->get_machine();
1490                 for  (int i = 1; i < input_tables.size(); ++i) {
1491                         string new_host = input_tables[i]->get_machine();
1492                         if (new_host == "")
1493                                 return false;
1494                         if (new_host != host)
1495                                 is_multihost = true;
1496                 }
1497                 return is_multihost;
1498         }
1499
1500         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1501 };
1502
1503
1504 //              eq_temporal, hash join query plan node.
1505 //              represent the following query fragment
1506 //                      select scalar_expression_1, ..., scalar_expression_k
1507 //                      from T0 t0, T1 t1
1508 //                      where predicate
1509 //
1510 //              the predicates and the scalar expressions can reference
1511 //              attributes of t0 and t1 and also functions.
1512 //              The predicate must contain CNF elements to enable the
1513 //              efficient evaluation of the query.
1514 //              1) at least one predicate of the form
1515 //                      (temporal se in t0) = (temporal se in t1)
1516 //              2) at least one predicate of the form
1517 //                      (non-temporal se in t0) = (non-temporal se in t1)
1518 //
1519 class join_eq_hash_qpn: public qp_node{
1520 public:
1521         std::vector<tablevar_t *> from;                                 //      Source tables
1522         std::vector<select_element *> select_list;      //      Select list
1523         std::vector<cnf_elem *> prefilter[2];           // source prefilters
1524         std::vector<cnf_elem *> temporal_eq;            // define temporal window
1525         std::vector<cnf_elem *> hash_eq;                        // define hash key
1526         std::vector<cnf_elem *> postfilter;                     // final filter on hash matches.
1527
1528         std::vector<cnf_elem *> where;                          // all the filters
1529                                                                                                 // useful for summary analysis
1530
1531         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1532
1533         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1534         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1535
1536         std::string node_type(){return("join_eq_hash_qpn");     };
1537     bool makes_transform(){return true;};
1538         std::vector<std::string> external_libs(){
1539                 std::vector<std::string> ret;
1540                 return ret;
1541         }
1542
1543         void bind_to_schema(table_list *Schema);
1544         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1545                 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
1546                 exit(1);
1547         }
1548
1549   void append_to_where(cnf_elem *c){
1550                 fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str());
1551                 exit(1);
1552         }
1553
1554
1555         std::string to_query_string();
1556         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1557         std::string generate_functor_name();
1558         std::string generate_operator(int i, std::string params);
1559         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1560
1561     std::vector<select_element *> get_select_list(){return select_list;};
1562     std::vector<scalarexp_t *> get_select_se_list(){
1563                 std::vector<scalarexp_t *> ret;
1564                 int i;
1565                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1566                 return ret;
1567         };
1568 //                      Used for LFTA only
1569     std::vector<cnf_elem *> get_where_clause(){
1570                  std::vector<cnf_elem *> t;
1571                 return(t);
1572         };
1573     std::vector<cnf_elem *> get_filter_clause(){
1574                 return get_where_clause();
1575         }
1576
1577         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1578     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1579
1580         table_def *get_fields();
1581
1582 //              It might be feasible to find keys in an equijoin expression.
1583         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1584                 std::vector<string> ret;
1585                 return ret;
1586         }
1587
1588         std::vector<tablevar_t *> get_input_tbls();
1589         std::vector<tablevar_t *> get_output_tbls();
1590
1591         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1592         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1593 //              Ensure that any refs to interface params have been split away.
1594         int count_ifp_refs(std::set<std::string> &ifpnames);
1595
1596         join_eq_hash_qpn(){
1597         };
1598         join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
1599                 int w;
1600 //                              Get the table name.
1601 //                              NOTE the colrefs have the table ref (an int)
1602 //                              embedded in them.  Would it make sense
1603 //                              to grab the whole table list?
1604                 from = qs->fta_tree->get_from()->get_table_list();
1605                 if(from.size() != 2){
1606                         char tmpstr[200];
1607                         sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1608                         err_str = tmpstr;
1609                         error_code = 1;
1610                 }
1611
1612                 for(int f=0;f<from.size();++f){
1613                         int t=from[f]->get_schema_ref();
1614                         if(! Schema->is_stream(t)){
1615                                 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
1616                                 error_code = 1;
1617                         }
1618                 }
1619
1620
1621 //                              Get the select list.
1622                 select_list = qs->fta_tree->get_sl_vec();
1623
1624 //                              Get the selection predicate.
1625                 where = qs->wh_cnf;
1626                 for(w=0;w<where.size();++w){
1627                         analyze_cnf(where[w]);
1628                         std::vector<int> pred_tbls;
1629                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1630 //                              Prefilter if refs only one tablevar
1631                         if(pred_tbls.size()==1){
1632                                 prefilter[pred_tbls[0]].push_back(where[w]);
1633                                 continue;
1634                         }
1635 //                              refs nothing -- might be sampling, do it as postfilter.
1636                         if(pred_tbls.size()==0){
1637                                 postfilter.push_back(where[w]);
1638                                 continue;
1639                         }
1640 //                              See if it can be a hash or temporal predicate.
1641 //                              NOTE: synchronize with the temporality checking
1642 //                              done at join_eq_hash_qpn::get_fields
1643                         if(where[w]->is_atom && where[w]->eq_pred){
1644                                 std::vector<int> sel_tbls, ser_tbls;
1645                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1646                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1647                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1648 //                                              make channel 0 SE on LHS.
1649                                         if(sel_tbls[0] != 0)
1650                                                 where[w]->pr->swap_scalar_operands();
1651
1652                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1653                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1654                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1655                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1656                                                         temporal_eq.push_back(where[w]);
1657                                         else
1658                                                         hash_eq.push_back(where[w]);
1659                                         continue;
1660
1661                                 }
1662                         }
1663 //                              All tests failed, fallback is postfilter.
1664                         postfilter.push_back(where[w]);
1665                 }
1666
1667                 if(temporal_eq.size()==0){
1668                         err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
1669                         error_code = 1;
1670                 }
1671
1672 //                              Get the parameters
1673                 param_tbl = qs->param_tbl;
1674
1675         };
1676
1677         // the following method is used for distributed query optimization
1678         double get_rate_estimate();
1679
1680
1681         qp_node* make_copy(std::string suffix){
1682                 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
1683
1684                 ret->param_tbl = new param_table();
1685                 std::vector<std::string> param_names = param_tbl->get_param_names();
1686                 int pi;
1687                 for(pi=0;pi<param_names.size();pi++){
1688                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1689                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1690                                                         param_tbl->handle_access(param_names[pi]));
1691                 }
1692                 ret->definitions = definitions;
1693
1694                 ret->node_name = node_name + suffix;
1695
1696                 // make shallow copy of all fields
1697                 ret->where = where;
1698                 ret->from = from;
1699                 ret->select_list = select_list;
1700                 ret->prefilter[0] = prefilter[0];
1701                 ret->prefilter[1] = prefilter[1];
1702                 ret->postfilter = postfilter;
1703                 ret->temporal_eq = temporal_eq;
1704                 ret->hash_eq = hash_eq;
1705
1706                 return ret;
1707         };
1708         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1709
1710 };
1711
1712
1713 // ---------------------------------------------
//		filter join query plan node.
1715 //              represent the following query fragment
1716 //                      select scalar_expression_1, ..., scalar_expression_k
1717 //                      FILTER_JOIN(col, range) from T0 t0, T1 t1
1718 //                      where predicate
1719 //
1720 //              t0 is the output range variable, t1 is the filtering range
1721 //              variable.  Both must alias a PROTOCOL.
1722 //              The scalar expressions in the select clause may
1723 //              reference t0 only.
1724 //              The predicates are classified as follows
1725 //              prefilter predicates:
1726 //                a cheap predicate in t0 such that there is an equivalent
1727 //                predicate in t1.  Cost decisions about pushing to
1728 //                lfta prefilter made later.
1729 //              t0 predicates (other than prefilter predicates)
//			-- cheap vs. expensive sorted out at generate time,
//				the constructor isn't called with the function list.
//		t1 predicates (other than prefilter predicates).
1733 //              equi-join predicates of the form:
1734 //                      (se in t0) = (se in t1)
1735 //
1736 //              There must be at least one equi-join predicate.
1737 //              No join predicates other than equi-join predicates
1738 //                are allowed.
1739 //              Warn on temporal equi-join predicates.
1740 //              t1 predicates should not be expensive ... warn?
1741 //
1742 class filter_join_qpn: public qp_node{
1743 public:
1744         std::vector<tablevar_t *> from;                                 //      Source tables
1745                 colref_t *temporal_var;                 // join window in FROM
1746                 unsigned int temporal_range;    // metadata.
1747         std::vector<select_element *> select_list;      //      Select list
1748         std::vector<cnf_elem *> shared_pred;            // prefilter preds
1749         std::vector<cnf_elem *> pred_t0;                        // main (R) preds
1750         std::vector<cnf_elem *> pred_t1;                        // filtering (S) preds
1751         std::vector<cnf_elem *> hash_eq;                        // define hash key
1752         std::vector<cnf_elem *> postfilter;                     // ref's no table.
1753
1754         std::vector<cnf_elem *> where;                          // all the filters
1755                                                                                                 // useful for summary analysis
1756
1757         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1758         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1759         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1760
1761
1762         bool use_bloom;                 // true => bloom filter, false => limited hash
1763
1764         std::string node_type(){return("filter_join");  };
1765     bool makes_transform(){return true;};
1766         std::vector<std::string> external_libs(){
1767                 std::vector<std::string> ret;
1768                 return ret;
1769         }
1770
1771         void bind_to_schema(table_list *Schema);
1772         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
1773
1774         std::string to_query_string();
1775         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
1776                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
1777                 exit(1);
1778         }
1779         std::string generate_functor_name(){
1780                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
1781                 exit(1);
1782         }
1783         std::string generate_operator(int i, std::string params){
1784                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
1785                 exit(1);
1786         }
1787         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1788
1789     std::vector<select_element *> get_select_list(){return select_list;};
1790     std::vector<scalarexp_t *> get_select_se_list(){
1791                 std::vector<scalarexp_t *> ret;
1792                 int i;
1793                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1794                 return ret;
1795         };
1796 //                      Used for LFTA only
1797         void append_to_where(cnf_elem *c){
1798                 where.push_back(c);
1799         }
1800
1801     std::vector<cnf_elem *> get_where_clause(){return where;}
1802     std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
1803
1804         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1805     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1806
1807         table_def *get_fields();
1808 //              It should be feasible to find keys in a filter join
1809         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1810                 std::vector<string> ret;
1811                 return ret;
1812         }
1813
1814         std::vector<tablevar_t *> get_input_tbls();
1815         std::vector<tablevar_t *> get_output_tbls();
1816
1817         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1818         int resolve_if_params(ifq_t *ifdb, std::string &err);
1819
1820         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1821 //              Ensure that any refs to interface params have been split away.
1822         int count_ifp_refs(std::set<std::string> &ifpnames);
1823
1824 //              CONSTRUCTOR
1825         filter_join_qpn(){
1826         };
1827         filter_join_qpn(query_summary_class *qs,table_list *Schema){
1828                 int i,w;
1829 //                              Get the table name.
1830 //                              NOTE the colrefs have the table ref (an int)
1831 //                              embedded in them.  Would it make sense
1832 //                              to grab the whole table list?
1833                 from = qs->fta_tree->get_from()->get_table_list();
1834                 temporal_var = qs->fta_tree->get_from()->get_colref();
1835                 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
1836                 if(from.size() != 2){
1837                         char tmpstr[200];
1838                         sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1839                         err_str += tmpstr;
1840                         error_code = 1;
1841                 }
1842                 if(from[0]->get_interface() != from[1]->get_interface()){
1843                         err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n";
1844                         error_code = 1;
1845                 }
1846
1847                 for(int f=0;f<from.size();++f){
1848                         int t=from[f]->get_schema_ref();
1849                         if(! Schema->is_stream(t)){
1850                                 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
1851                                 error_code = 1;
1852                         }
1853                 }
1854
1855
1856 //                              Get the select list.
1857                 select_list = qs->fta_tree->get_sl_vec();
1858 //                              Verify that only t0 is referenced.
1859                 bool bad_ref = false;
1860                 for(i=0;i<select_list.size();i++){
1861                         vector<int> sel_tbls;
1862                         get_tablevar_ref_se(select_list[i]->se,sel_tbls);
1863                         if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
1864                                 bad_ref = true;
1865                 }
1866                 if(bad_ref){
1867                         err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
1868                         error_code = 1;
1869                 }
1870
1871
1872 //                              Get the selection predicate.
1873                 where = qs->wh_cnf;
1874                 std::vector<cnf_elem *> t0_only, t1_only;
1875                 for(w=0;w<where.size();++w){
1876                         analyze_cnf(where[w]);
1877                         std::vector<int> pred_tbls;
1878                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1879 //                              Collect the list of preds by src var,
1880 //                              extract the shared preds later.
1881                         if(pred_tbls.size()==1){
1882                                 if(pred_tbls[0] == 0){
1883                                         t0_only.push_back(where[w]);
1884                                 }else{
1885                                         t1_only.push_back(where[w]);
1886                                 }
1887                                 continue;
1888                         }
1889 //                              refs nothing -- might be sampling, do it as postfilter.
1890                         if(pred_tbls.size()==0){
1891                                 postfilter.push_back(where[w]);
1892                                 continue;
1893                         }
1894 //                              See if it can be a hash or temporal predicate.
1895 //                              NOTE: synchronize with the temporality checking
1896 //                              done at join_eq_hash_qpn::get_fields
1897                         if(where[w]->is_atom && where[w]->eq_pred){
1898                                 std::vector<int> sel_tbls, ser_tbls;
1899                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1900                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1901                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1902 //                                              make channel 0 SE on LHS.
1903                                         if(sel_tbls[0] != 0)
1904                                                 where[w]->pr->swap_scalar_operands();
1905
1906                                         hash_eq.push_back(where[w]);
1907
1908                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1909                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1910                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1911                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1912                                                 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
1913                                         continue;
1914
1915                                 }
1916                         }
1917 //                              All tests failed, fallback is postfilter.
1918                         err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
1919                         error_code = 3;
1920                 }
1921 //              Classify the t0_only and t1_only preds.
1922                 set<int> matched_pred;
1923                 int v;
1924                 for(w=0;w<t0_only.size();w++){
1925                         for(v=0;v<t1_only.size();++v)
1926                                 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
1927                                         break;
1928                         if(v<t1_only.size()){
1929                                 shared_pred.push_back(t0_only[w]);
1930                                 matched_pred.insert(v);
1931                         }else{
1932                                 pred_t0.push_back(t0_only[w]);
1933                         }
1934                 }
1935                 for(v=0;v<t1_only.size();++v){
1936                         if(matched_pred.count(v) == 0)
1937                                 pred_t1.push_back(t1_only[v]);
1938                 }
1939
1940
1941 //                              Get the parameters
1942                 param_tbl = qs->param_tbl;
1943                 definitions = qs->definitions;
1944
1945 //                              Determine the algorithm
1946                 if(this->get_val_of_def("algorithm") == "hash"){
1947                         use_bloom = false;
1948                 }else{
1949                         use_bloom = true;
1950                 }
1951         };
1952
1953         // the following method is used for distributed query optimization
1954         double get_rate_estimate();
1955
1956
1957         qp_node* make_copy(std::string suffix){
1958                 filter_join_qpn *ret = new filter_join_qpn();
1959
1960                 ret->param_tbl = new param_table();
1961                 std::vector<std::string> param_names = param_tbl->get_param_names();
1962                 int pi;
1963                 for(pi=0;pi<param_names.size();pi++){
1964                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1965                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1966                                                         param_tbl->handle_access(param_names[pi]));
1967                 }
1968                 ret->definitions = definitions;
1969
1970                 ret->node_name = node_name + suffix;
1971
1972                 // make shallow copy of all fields
1973                 ret->where = where;
1974                 ret->from = from;
1975                 ret->temporal_range = temporal_range;
1976                 ret->temporal_var = temporal_var;
1977                 ret->select_list = select_list;
1978                 ret->shared_pred = shared_pred;
1979                 ret->pred_t0 = pred_t0;
1980                 ret->pred_t1 = pred_t1;
1981                 ret->postfilter = postfilter;
1982                 ret->hash_eq = hash_eq;
1983
1984                 return ret;
1985         };
1986         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1987
1988 };
1989
1990
1991
//	TODO : put tests on other operators to ensure they don't read from a watchlist
1993 //      TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ?
1994 class watch_join_qpn: public qp_node{
1995 public:
1996         std::vector<tablevar_t *> from;                                 //      Source tables
1997         std::vector<select_element *> select_list;      //      Select list
1998         std::vector<cnf_elem *> pred_t0;                        // main (R) preds
1999         std::vector<cnf_elem *> pred_t1;                        // watchlist-only (S) preds (?)
2000         std::map<std::string, cnf_elem *> hash_eq;      // predicates on S hash keys
2001         std::vector<cnf_elem *> join_filter;            // ref's R, S, but not a hash
2002         std::vector<cnf_elem *> postfilter;                     // ref's no table.
2003
2004         std::vector<std::string> key_flds;
2005
2006         std::vector<cnf_elem *> where;                          // all the filters
2007                                                                                                 // useful for summary analysis
2008
2009         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
2010         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
2011         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
2012
2013
2014
2015         std::string node_type(){return("watch_join");   };
2016     bool makes_transform(){return true;};
2017         std::vector<std::string> external_libs(){
2018                 std::vector<std::string> ret;
2019                 return ret;
2020         }
2021
2022         void bind_to_schema(table_list *Schema);
2023         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
2024
2025         std::string to_query_string();
2026         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
2027                 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n");
2028                 exit(1);
2029         }
2030         std::string generate_functor_name(){
2031                 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n");
2032                 exit(1);
2033         }
2034         std::string generate_operator(int i, std::string params){
2035                 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n");
2036                 exit(1);
2037         }
2038         std::string get_include_file(){return("#include <watchlist_operator.h>\n");};
2039
2040     std::vector<select_element *> get_select_list(){return select_list;};
2041     std::vector<scalarexp_t *> get_select_se_list(){
2042                 std::vector<scalarexp_t *> ret;
2043                 int i;
2044                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
2045                 return ret;
2046         };
2047 //                      Used for LFTA only
2048         void append_to_where(cnf_elem *c){
2049                 where.push_back(c);
2050         }
2051
2052     std::vector<cnf_elem *> get_where_clause(){return where;}
2053
2054     std::vector<cnf_elem *> get_filter_clause(){return pred_t0;}
2055
2056         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2057     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2058
2059         table_def *get_fields();
2060 //              It should be feasible to find keys in a watchlist join
2061         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2062                 std::vector<string> ret;
2063                 return ret;
2064         }
2065
2066         std::vector<tablevar_t *> get_input_tbls();
2067         std::vector<tablevar_t *> get_output_tbls();
2068
2069         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2070         int resolve_if_params(ifq_t *ifdb, std::string &err);
2071
2072         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
2073 //              Ensure that any refs to interface params have been split away.
2074         int count_ifp_refs(std::set<std::string> &ifpnames);
2075
2076 //              CONSTRUCTOR
2077         watch_join_qpn(){
2078         };
2079         watch_join_qpn(query_summary_class *qs,table_list *Schema){
2080                 int i,w;
2081 //                              Get the table name.
2082 //                              NOTE the colrefs have the table ref (an int)
2083 //                              embedded in them.  Would it make sense
2084 //                              to grab the whole table list?
2085                 from = qs->fta_tree->get_from()->get_table_list();
2086                 if(from.size() != 2){
2087                         char tmpstr[200];
2088                         sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
2089                         err_str += tmpstr;
2090                         error_code = 1;
2091                 }
2092
2093                 int t = from[0]->get_schema_ref();
2094                 if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){
2095                         err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n";
2096                         error_code = 1;
2097                 }
2098                 t = from[1]->get_schema_ref();
2099                 if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){
2100                         err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n";
2101                         error_code = 1;
2102                 }
2103                 key_flds = Schema->get_table(t)->get_keys();
2104
2105
2106 //                              Get the select list.
2107                 select_list = qs->fta_tree->get_sl_vec();
2108
2109 //                              Get the selection predicate.
2110                 where = qs->wh_cnf;
2111                 std::vector<cnf_elem *> t0_only, t1_only;
2112                 for(w=0;w<where.size();++w){
2113                         analyze_cnf(where[w]);
2114                         std::vector<int> pred_tbls;
2115                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
2116 //                              Collect the list of preds by src var,
2117 //                              extract the shared preds later.
2118                         if(pred_tbls.size()==1){
2119                                 if(pred_tbls[0] == 0){
2120                                         pred_t0.push_back(where[w]);
2121                                 }else{
2122                                         pred_t1.push_back(where[w]);
2123                                 }
2124                                 continue;
2125                         }
2126 //                              refs nothing -- might be sampling, do it as postfilter.
2127                         if(pred_tbls.size()==0){
2128                                 postfilter.push_back(where[w]);
2129                                 continue;
2130                         }
2131
2132 //              Must reference both
2133 //                              See if it can be a hash predicate.
2134                         if(where[w]->is_atom && where[w]->eq_pred){
2135                                 std::vector<int> sel_tbls, ser_tbls;
2136                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
2137                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
2138                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
2139 //                                              make channel 0 SE on LHS.
2140                                         if(sel_tbls[0] != 0)
2141                                                 where[w]->swap_scalar_operands();
2142
2143 //              Must be simple (a colref) on the RHS
2144                                                 if(where[w]->r_simple){
2145                                                         string rcol = where[w]->pr->get_right_se()->get_colref()->get_field();
2146                                                         if(std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end()){
2147                                                                 hash_eq[rcol] = where[w];
2148
2149                                                                 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
2150                                                                 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
2151                                                                 if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) )
2152                                                                         err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n";
2153                                                         continue;
2154                                                 }
2155                                         }
2156                                 }
2157                         }
2158 //                              All tests failed, fallback is join_filter.
2159                         join_filter.push_back(where[w]);
2160                 }
2161
2162                 if(key_flds.size() > hash_eq.size()){
2163                         err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate.  Missing fields are";
2164                         for(int k=0;k<key_flds.size();++k){
2165                                 if(hash_eq.count(key_flds[k]) < 1){
2166                                         err_str += " "+key_flds[k];
2167                                 }
2168                         }
2169                         err_str += ".\n";
2170                         error_code = 5;
2171                 }
2172                                         
2173
2174 //                              Get the parameters
2175                 param_tbl = qs->param_tbl;
2176                 definitions = qs->definitions;
2177
2178         };
2179
2180         // the following method is used for distributed query optimization
2181         double get_rate_estimate();
2182
2183
2184         qp_node* make_copy(std::string suffix){
2185                 watch_join_qpn *ret = new watch_join_qpn();
2186
2187                 ret->param_tbl = new param_table();
2188                 std::vector<std::string> param_names = param_tbl->get_param_names();
2189                 int pi;
2190                 for(pi=0;pi<param_names.size();pi++){
2191                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
2192                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2193                                                         param_tbl->handle_access(param_names[pi]));
2194                 }
2195                 ret->definitions = definitions;
2196
2197                 ret->node_name = node_name + suffix;
2198
2199                 // make shallow copy of all fields
2200                 ret->where = where;
2201                 ret->from = from;
2202                 ret->select_list = select_list;
2203                 ret->key_flds = key_flds;
2204                 ret->pred_t0 = pred_t0;
2205                 ret->pred_t1 = pred_t1;
2206                 ret->join_filter = join_filter;
2207                 ret->postfilter = postfilter;
2208                 ret->hash_eq = hash_eq;
2209                 ret->hash_src_r = hash_src_r;
2210                 ret->hash_src_l = hash_src_l;
2211
2212                 return ret;
2213         };
2214
2215         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2216
2217 };
2218
2219
2220
2221
//	Kind of compression used by a file output operator.
enum output_file_type_enum {
	regular,	// no compression library is linked
	gzip,		// output_file_qpn::external_libs() adds -lz
	bzip		// output_file_qpn::external_libs() adds -lbz2
};
2223
2224 class output_file_qpn: public qp_node{
2225 public:
2226         std::string source_op_name;                                     //      Source table
2227         std::vector<field_entry *> fields;
2228         ospec_str *output_spec;
2229         vector<tablevar_t *> fm;
2230         std::string hfta_query_name;
2231         std::string filestream_id;
2232         bool eat_input;
2233         std::vector<std::string> params;
2234         bool do_gzip;
2235         output_file_type_enum compression_type;
2236
2237         int n_streams;          // Number of output streams
2238         int n_hfta_clones;      // number of hfta clones
2239         int parallel_idx;       // which close this produces output for.
2240         std::vector<int> hash_flds;     // fields used to hash the output.
2241
2242         std::string node_type(){return("output_file_qpn");      };
2243     bool makes_transform(){return false;};
2244         std::vector<std::string> external_libs(){
2245                 std::vector<std::string> ret;
2246                 switch(compression_type){
2247                 case gzip:
2248                         ret.push_back("-lz");
2249                 break;
2250                 case bzip:
2251                         ret.push_back("-lbz2");
2252                 break;
2253                 default:
2254                 break;
2255                 }
2256                 return ret;
2257         }
2258
2259   void append_to_where(cnf_elem *c){
2260                 fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n",  node_name.c_str());
2261                 exit(1);
2262         }
2263
2264
2265
2266         void bind_to_schema(table_list *Schema){}
2267         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2268                 col_id_set ret;
2269                 return ret;
2270         }
2271
2272         std::string to_query_string(){return "// output_file_operator \n";}
2273         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2274         std::string generate_functor_name();
2275         std::string generate_operator(int i, std::string params);
2276         std::string get_include_file(){
2277                 switch(compression_type){
2278                 case gzip:
2279                         return("#include <zfile_output_operator.h>\n");
2280                 default:
2281                         return("#include <file_output_operator.h>\n");
2282                 }
2283                 return("#include <file_output_operator.h>\n");
2284         };
2285
2286     std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
2287     std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
2288         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
2289     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
2290
2291         table_def *get_fields(){
2292                 field_entry_list *fel = new field_entry_list();
2293                 int i;
2294                 for(i=0;i<fields.size();++i)
2295                         fel->append_field(fields[i]);
2296                 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
2297         }
2298
2299 //              TODO! either bypass the output operator in stream_query,
2300 //                      or propagate key information when the output operator is constructed.
2301         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2302                 std::vector<string> ret;
2303                 return ret;
2304         }
2305
2306         std::vector<tablevar_t *> get_input_tbls();
2307         std::vector<tablevar_t *> get_output_tbls();
2308
2309         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
2310                 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
2311         }
2312         std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
2313                 std::vector<table_exp_t *> ret; return ret;
2314         }
2315 //              Ensure that any refs to interface params have been split away.
2316         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2317         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
2318
2319
2320         output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
2321                 source_op_name = src_op;
2322                 node_name = source_op_name + "_output";
2323                 filestream_id = fs_id;
2324                 fields = src_tbl_def->get_fields();
2325                 output_spec = ospec;
2326                 fm.push_back(new tablevar_t(source_op_name.c_str()));
2327                 hfta_query_name = qn;
2328                 eat_input = ei;
2329
2330 //                      TODO stream checking, but it requires passing Schema to output_file_qpn
2331 /*
2332                 for(int f=0;f<fm.size();++f){
2333                         int t=fm[f]->get_schema_ref();
2334                         if(! Schema->is_stream(t)){
2335                                 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
2336                                 error_code = 1;
2337                         }
2338                 }
2339 */
2340
2341
2342                 do_gzip = false;
2343                 compression_type = regular;
2344                 if(ospec->operator_type == "zfile")
2345                         compression_type = gzip;
2346
2347                 n_streams = 1;
2348                 parallel_idx = 0;
2349                 n_hfta_clones = 1;
2350
2351                 char buf[1000];
2352                 strncpy(buf, output_spec->operator_param.c_str(),1000);
2353                 buf[999] = '\0';
2354                 char *words[100];
2355                 int nwords = split_string(buf, ':', words,100);
2356                 int i;
2357                 for(i=0;i<nwords;i++){
2358                         params.push_back(words[i]);
2359                 }
2360                 for(i=0;i<params.size();i++){
2361                         if(params[i] == "gzip")
2362                                 do_gzip = true;
2363                 }
2364         }
2365
2366 //              Set output splitting parameters
2367         bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
2368                 n_streams = ns;
2369                 n_hfta_clones = np;
2370                 parallel_idx = ix;
2371
2372                 if(split_flds != ""){
2373                         string err_flds = "";
2374                         char *tmpstr = strdup(split_flds.c_str());
2375                         char *words[100];
2376                         int nwords = split_string(tmpstr,':',words,100);
2377                         int i,j;
2378                         for(i=0;i<nwords;++i){
2379                                 string target = words[i];
2380                                 for(j=0;j<fields.size();++j){
2381                                         if(fields[j]->get_name() == target){
2382                                                 hash_flds.push_back(j);
2383                                                 break;
2384                                         }
2385                                 }
2386                                 if(j==fields.size()){
2387                                         err_flds += " "+target;
2388                                 }
2389                         }
2390                         if(err_flds != ""){
2391                                 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
2392                                 return true;
2393                         }
2394                 }
2395                 return false;
2396         }
2397
2398         // the following method is used for distributed query optimization
2399         double get_rate_estimate(){return 1.0;}
2400
2401
2402         qp_node* make_copy(std::string suffix){
2403 //              output_file_qpn *ret = new output_file_qpn();
2404                 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
2405                 return ret;
2406         }
2407
2408         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
2409
2410 };
2411
2412
2413
2414 //
2415
2416 // ---------------------------------------------
2417
2418
2419 //              Select, group-by, aggregate, sampling.
2420 //              Representing
2421 //                      Select SE_1, ..., SE_k
2422 //                      From T
2423 //                      Where predicate
2424 //                      Group By gb1, ..., gb_n
2425 //                      [Subgroup gb_i1, .., gb_ik]
2426 //                      Cleaning_when  predicate
2427 //                      Cleaning_by predicate
2428 //                      Having predicate
2429 //
//		For now, must have group-by variables and aggregates.
//		The scalar expressions which are output must be a function
//		of the group-by variables and the aggregates.
//		The group-by variables can be references to columns of T,
//		or they can be scalar expressions.
2435 class sgahcwcb_qpn: public qp_node{
2436 public:
2437         tablevar_t *table_name;                         // source table
2438         std::vector<cnf_elem *> where;          // selection predicate
2439         std::vector<cnf_elem *> having;         // post-aggregation predicate
2440         std::vector<select_element *> select_list;      // se's of output
2441         gb_table gb_tbl;                        // Table of all group-by attributes.
2442         std::set<int> sg_tbl;           // Names of the superGB attributes
2443         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
2444         std::set<std::string> states_refd;      // states ref'd by stateful fcns.
2445         std::vector<cnf_elem *> cleanby;
2446         std::vector<cnf_elem *> cleanwhen;
2447
2448         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
2449
2450         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
2451
2452         std::string node_type(){return("sgahcwcb_qpn"); };
2453     bool makes_transform(){return true;};
2454         std::vector<std::string> external_libs(){
2455                 std::vector<std::string> ret;
2456                 return ret;
2457         }
2458
2459         void bind_to_schema(table_list *Schema);
2460         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2461                 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
2462                 exit(1);
2463         }
2464
2465         std::string to_query_string();
2466         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2467         std::string generate_functor_name();
2468
2469         std::string generate_operator(int i, std::string params);
2470         std::string get_include_file(){return("#include <clean_operator.h>\n");};
2471
2472     std::vector<select_element *> get_select_list(){return select_list;};
2473     std::vector<scalarexp_t *> get_select_se_list(){
2474                 std::vector<scalarexp_t *> ret;
2475                 int i;
2476                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
2477                 return ret;
2478         };
2479     std::vector<cnf_elem *> get_where_clause(){return where;};
2480     std::vector<cnf_elem *> get_filter_clause(){return where;};
2481     std::vector<cnf_elem *> get_having_clause(){return having;};
2482     gb_table *get_gb_tbl(){return &gb_tbl;};
2483     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
2484         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2485     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2486
2487 //                              table which represents output tuple.
2488         table_def *get_fields();
2489 //                      TODO Key extraction should be feasible but I'll defer the issue.
2490         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2491                 std::vector<string> ret;
2492                 return ret;
2493         }
2494
2495         std::vector<tablevar_t *> get_input_tbls();
2496         std::vector<tablevar_t *> get_output_tbls();
2497
2498   void append_to_where(cnf_elem *c){
2499                 where.push_back(c);
2500         }
2501
2502
2503         sgahcwcb_qpn(){
2504         };
2505         sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
2506 //                              Get the table name.
2507 //                              NOTE the colrefs have the tablevar ref (an int)
2508 //                              embedded in them.  Would it make sense
2509 //                              to grab the whole table list?
2510                 tablevar_list_t *fm = qs->fta_tree->get_from();
2511                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
2512                 if(tbl_vec.size() != 1){
2513                         char tmpstr[200];
2514                         sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
2515                         err_str=tmpstr;
2516                         error_code = 1;
2517                 }
2518                 table_name = (tbl_vec[0]);
2519
2520                 int t = tbl_vec[0]->get_schema_ref();
2521                 if(! Schema->is_stream(t)){
2522                         err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
2523                         error_code = 1;
2524                 }
2525
2526
2527 //                              Get the select list.
2528                 select_list = qs->fta_tree->get_sl_vec();
2529
2530 //                              Get the selection and having predicates.
2531                 where = qs->wh_cnf;
2532                 having = qs->hav_cnf;
2533                 cleanby = qs->cb_cnf;
2534                 cleanwhen = qs->cw_cnf;
2535
2536 //                              Build a new GB var table (don't share, might need to modify)
2537                 int g;
2538                 for(g=0;g<qs->gb_tbl->size();g++){
2539                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
2540                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
2541                                 qs->gb_tbl->get_reftype(g)
2542                         );
2543                 }
2544
2545 //                              Build a new aggregate table. (don't share, might need
2546 //                              to modify).
2547                 int a;
2548                 for(a=0;a<qs->aggr_tbl->size();a++){
2549                         aggr_tbl.add_aggr(
2550 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
2551                                 qs->aggr_tbl->duplicate(a)
2552                         );
2553                 }
2554
2555                 sg_tbl = qs->sg_tbl;
2556                 states_refd = qs->states_refd;
2557
2558
2559 //                              Get the parameters
2560                 param_tbl = qs->param_tbl;
2561
2562         };
2563
2564
2565
2566         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2567         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2568 //              Ensure that any refs to interface params have been split away.
2569 //                      CURRENTLY not allowed by split_node_for_fta
2570         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2571         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
2572
2573         // the following method is used for distributed query optimization
2574         double get_rate_estimate();
2575
2576         qp_node* make_copy(std::string suffix){
2577                 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
2578
2579                 ret->param_tbl = new param_table();
2580                 std::vector<std::string> param_names = param_tbl->get_param_names();
2581                 int pi;
2582                 for(pi=0;pi<param_names.size();pi++){
2583                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
2584                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2585                                                         param_tbl->handle_access(param_names[pi]));
2586                 }
2587                 ret->definitions = definitions;
2588
2589                 ret->node_name = node_name + suffix;
2590
2591                 // make shallow copy of all fields
2592                 ret->where = where;
2593                 ret->having = having;
2594                 ret->select_list = select_list;
2595                 ret->gb_tbl = gb_tbl;
2596                 ret->aggr_tbl = aggr_tbl;
2597                 ret->sg_tbl = sg_tbl;
2598                 ret->states_refd = states_refd;
2599                 ret->cleanby = cleanby;
2600                 ret->cleanwhen = cleanwhen;
2601
2602                 return ret;
2603         };
2604
2605         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2606 };
2607
2608
2609 std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);
2610
2611
2612
2613 void untaboo(string &s);
2614
2615 table_def *create_attributes(string tname, vector<select_element *> &select_list);
2616
2617
2618 #endif