ec782e0318cdcecd80372efba8cf080700d26e39
[com/gs-lite.git] / src / ftacmp / query_plan.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15 #ifndef __QUERY_PLAN_H__
16 #define __QUERY_PLAN_H__
17
#include<cstdio>
#include<cstdlib>
#include<map>
#include<set>
#include<string>
#include<vector>
using namespace std;

#include"analyze_fta.h"
#include"iface_q.h"
#include"parse_partn.h"
#include"generate_utils.h"
27
//		Identify the format of the input, output streams.
//		Integer codes; UNKNOWNFORMAT (0) means the format has not been determined.
//		NOTE(review): NETFORMAT / HOSTFORMAT presumably mean network vs. host
//		byte order -- confirm against the code that consumes these values.
#define UNKNOWNFORMAT 0
#define NETFORMAT 1
#define HOSTFORMAT 2
32
33 ///////////////////////////////////////////////////
34 //      representation of an output operator specification
35
//	Plain aggregate describing one output operator specification.
//	No constructor: the int members are left uninitialized unless the
//	creator assigns them (or uses aggregate / value initialization).
struct ospec_str{
	std::string query;				// query the specification applies to
	std::string operator_type;
	std::string operator_param;
	std::string output_directory;
	int bucketwidth;
	std::string partitioning_flds;	// partitioning field list, kept as text
	int n_partitions;
};
45
46
47 ////////////////////////////////////////////////////
48 //      Input representation of a query
49
50 struct query_node{
51         int idx;
52         std::set<int> reads_from;
53         std::set<int> sources_to;
54         std::vector<std::string> refd_tbls;
55         std::vector<var_pair_t *> params;
56         std::string name;
57         std::string file;
58         std::string mangler;            // for UDOPs
59         bool touched;
60         table_exp_t *parse_tree;
61         int n_consumers;
62         bool is_udop;
63         bool is_externally_visible;
64         bool inferred_visible_node;
65
66         set<int> subtree_roots;
67
68         query_node(){
69                 idx = -1;
70                 touched = false;
71                 parse_tree = NULL;
72                 n_consumers = 0;
73                 is_externally_visible = false;
74                 inferred_visible_node = false;
75                 mangler="";
76         };
77         query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){
78                 idx = i;
79                 touched = false;
80                 name = qnm;
81                 file = flnm;
82                 parse_tree = pt;
83                 n_consumers = 0;
84                 is_udop = false;
85                 is_externally_visible = pt->get_visible();
86                 inferred_visible_node = false;
87                 mangler="";
88
89                 tablevar_list_t *fm = parse_tree->get_from();
90                 if(fm!=NULL){
91                         refd_tbls =  fm->get_table_names();
92                 }
93
94                 params  = pt->query_params;
95         };
96         query_node(int ix, std::string udop_name,table_list *Schema){
97                 idx = ix;
98                 touched = false;
99                 name = udop_name;
100                 file = udop_name;
101                 parse_tree = NULL;
102                 n_consumers = 0;
103                 is_udop = true;
104                 is_externally_visible = true;
105                 inferred_visible_node = false;
106                 mangler="";
107
108                 int sid = Schema->find_tbl(udop_name);
109                 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);
110                 int i;
111                 for(i=0;i<subq.size();++i){
112                         refd_tbls.push_back(subq[i]->name);
113                 }
114         };
115 };
116
//	Node of the HFTA-level plan graph.  A fresh node is: not a UDOP, not
//	inferred-visible, n_parallel == 1 with parallel_idx == 0, and enabled
//	for code generation.
struct hfta_node{
	std::string name;
	std::string source_name;
	std::vector<int> query_node_indices;	// presumably indices of the member query_nodes
	std::set<int> reads_from;
	std::set<int> sources_to;
	bool is_udop;
	bool inferred_visible_node;
	int n_parallel;			// defaults to 1
	int parallel_idx;		// defaults to 0
	bool do_generation;		// false means, ignore it.

	hfta_node()
		: is_udop(false),
		  inferred_visible_node(false),
		  n_parallel(1),
		  parallel_idx(0),
		  do_generation(true)
	{
	}
};
137
138
139
140
141
142
// Query kind codes.
#define SPX_QUERY 1
#define SGAH_QUERY 2

// the following selectivity estimates are used by our primitive rate estimators
// (one constant per operator kind; NOTE(review): presumably the assumed ratio of
//  output to input tuples -- confirm in the get_rate_estimate implementations)
#define SPX_SELECTIVITY 1.0
#define SGAH_SELECTIVITY 0.1
#define RSGAH_SELECTIVITY 0.1
#define SGAHCWCB_SELECTIVITY 0.1
#define MRG_SELECTIVITY 1.0
#define JOIN_EQ_HASH_SELECTIVITY 1.0

// If the output rate of the interface is not given, we use
// this default value.
#define DEFAULT_INTERFACE_RATE 100
157
158
159 //                      Define query plan nodes
160 //                      These nodes are intended for query modeling
161 //                      and transformation rather than for code generation.
162
163
164 //                      Query plan node base class.
165 //                      It has an ID, can return its type,
166 //                      and can be linked into lists with the predecessors
167 //                      and successors.
168 //                      To add : serialize, unserialize?
169
170 class qp_node{
171 public:
172   int id;
173   std::vector<int> predecessors;
174   std::vector<int> successors;
175   std::string node_name;
176
177 //              For error reporting without exiting the program.
178   int error_code;
179   std::string err_str;
180
181 //                      These should be moved to the containing stream_query object.
182   std::map<std::string, std::string> definitions;
183   param_table *param_tbl;
184
185 //              The value of a field in terms of protocol fields (if any).
186   std::map<std::string, scalarexp_t *> protocol_map;
187
188   qp_node(){
189         error_code = 0;
190         id = -1;
191         param_tbl = new param_table();
192   };
193   qp_node(int i){
194         error_code = 0;
195         id = i;
196         param_tbl = new param_table();
197   };
198
199   int get_id(){return(id);};
200   void set_id(int i){id = i;    };
201
202   int get_error_code(){return error_code;};
203   std::string get_error_str(){return err_str;};
204
205   virtual std::string node_type() = 0;
206
207 //              For code generation, does the operator xform its input.
208   virtual bool makes_transform() = 0;
209
210 //              For linking, what external libraries does the operator depend on?
211   virtual std::vector<std::string> external_libs() = 0;
212
213   void set_node_name(std::string n){node_name = n;};
214   std::string get_node_name(){return node_name;};
215
216   void set_definitions(std::map<std::string, std::string> &def){
217           definitions = def;
218   };
219   std::map<std::string, std::string> get_definitions(){return definitions;};
220
221
222 //              call to create the mapping from field name to se in protocol fields.
223 //              Pass in qp_node of data sources, in order.
224   virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;
225 //              get the protocol map.  the parameter is the return value.
226   std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}
227
228 //              Each qp node must be able to return a description
229 //              of the tuples it creates.
230 //              TODO: the get_output_tbls method should subsume the get_fields
231 //                      method, but in fact it really just returns the
232 //                      operator name.
233   virtual table_def *get_fields() = 0;  // Should be vector?
234 //              get keys from the operator.  Currently, only works on group-by queries.
235 //              partial_keys set to true if there is a suspicion that the list is partial.
236         virtual std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys) = 0;
237 //              Get the from clause
238   virtual std::vector<tablevar_t *> get_input_tbls() = 0;
239 //              this is a confused function, it acutally return the output
240 //              table name.
241   virtual std::vector<tablevar_t *> get_output_tbls() = 0;
242
243   std::string get_val_of_def(std::string def){
244         if(definitions.count(def) > 0) return definitions[def];
245         return("");
246   };
247   void set_definition(std::string def, std::string val){
248         definitions[def]=val;
249   }
250
251 //              Associate colrefs in SEs with tables
252 //              at code generation time.
253   virtual void bind_to_schema(table_list *Schema) = 0;
254
255 //              Get colrefs of the operator, currently only meaningful for lfta
256 //              operators, and only interested in colrefs with extraction fcns
257   virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;
258
259   virtual std::string to_query_string() = 0;
260   virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;
261   virtual std::string generate_functor_name() = 0;
262
263   virtual std::string generate_operator(int i, std::string params) = 0;
264   virtual std::string get_include_file() = 0;
265
266   virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;
267   virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;
268
269 //              Split this node into LFTA and HFTA nodes.
270 //              Four possible outcomes:
271 //              1) the qp_node reads from a protocol, but does not need to
272 //                      split (can be evaluated as an LFTA).
273 //                      The lfta node is the only element in the return vector,
274 //                      and hfta_returned is false.
275 //              2) the qp_node reads from no protocol, and therefore cannot be split.
276 //                      THe hfta node is the only element in the return vector,
277 //                      and hfta_returned is true.
278 //              3) reads from at least one protocol, but cannot be split : failure.
279 //                      return vector is empty, the error conditions are written
280 //                      in the qp_node.
281 //              4) The qp_node splits into an hfta node and one or more LFTA nodes.
282 //                      the return vector has two or more elements, and hfta_returned
283 //                      is true.  The last element is the HFTA.
284         virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;
285
286
287 //              Ensure that any refs to interface params have been split away.
288         virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;
289
290
291
292 //              Tag the data sources which are views,
293 //              return the (optimized) source queries and
294 //              record the view access in opview_set
295         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;
296
297   param_table *get_param_tbl(){return param_tbl;};
298
299 //                      The "where" clause is a pre-filter
300   virtual  std::vector<cnf_elem *> get_where_clause() = 0;
301 //                      To be more explicit, use get_filter_preds, this is used to compute the prefilter
302   virtual  std::vector<cnf_elem *> get_filter_clause() = 0;
303
304 //              Add an extra predicate.  Currently only used for LFTAs.
305   virtual void append_to_where(cnf_elem *c) = 0;
306
307   void add_predecessor(int i){predecessors.push_back(i);};
308   void remove_predecessor(int i){
309         std::vector<int>::iterator vi;
310         for(vi=predecessors.begin(); vi!=predecessors.end();++vi){
311                 if((*vi) == i){
312                         predecessors.erase(vi);
313                         return;
314                 }
315         }
316   };
317   void add_successor(int i){successors.push_back(i);};
318   std::vector<int> get_predecessors(){return predecessors;};
319   int n_predecessors(){return predecessors.size();};
320   std::vector<int> get_successors(){return successors;};
321   int n_successors(){return successors.size();};
322   void clear_predecessors(){predecessors.clear();};
323   void clear_successors(){successors.clear();};
324
325   // the following method is used for distributed query optimization
326   double get_rate_estimate();
327
328
329   // used for cloning query nodes
330   virtual qp_node* make_copy(std::string suffix) = 0;
331 };
332
333
334
335 //              Select, project, transform (xform) query plan node.
336 //              represent the following query fragment
337 //                      select scalar_expression_1, ..., scalar_expression_k
338 //                      from S
339 //                      where predicate
340 //
341 //              the predicates and the scalar expressions can reference
342 //              attributes of S and also functions.
343 class spx_qpn: public qp_node{
344 public:
345         tablevar_t *table_name;                                 //      Source table
346         std::vector<cnf_elem *> where;                  // selection predicate
347         std::vector<select_element *> select_list;      //      Select list
348
349
350
351         std::string node_type(){return("spx_qpn");      };
352     bool makes_transform(){return true;};
353         std::vector<std::string> external_libs(){
354                 std::vector<std::string> ret;
355                 return ret;
356         }
357
358   void append_to_where(cnf_elem *c){
359                 where.push_back(c);
360         }
361
362
363         void bind_to_schema(table_list *Schema);
364         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
365
366         std::string to_query_string();
367         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
368         std::string generate_functor_name();
369         std::string generate_operator(int i, std::string params);
370         std::string get_include_file(){return("#include <selection_operator.h>\n");};
371
372     std::vector<select_element *> get_select_list(){return select_list;};
373     std::vector<scalarexp_t *> get_select_se_list(){
374                 std::vector<scalarexp_t *> ret;
375                 int i;
376                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
377                 return ret;
378         };
379     std::vector<cnf_elem *> get_where_clause(){return where;};
380     std::vector<cnf_elem *> get_filter_clause(){return where;};
381         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
382     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
383
384         table_def *get_fields();
385         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
386                 std::vector<string> ret;
387                 return ret;
388         }
389
390         std::vector<tablevar_t *> get_input_tbls();
391         std::vector<tablevar_t *> get_output_tbls();
392
393         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
394         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
395 //              Ensure that any refs to interface params have been split away.
396         int count_ifp_refs(std::set<std::string> &ifpnames);
397         int resolve_if_params(ifq_t *ifdb, std::string &err);
398
399         spx_qpn(){
400         };
401         spx_qpn(query_summary_class *qs,table_list *Schema){
402 //                              Get the table name.
403 //                              NOTE the colrefs have the table ref (an int)
404 //                              embedded in them.  Would it make sense
405 //                              to grab the whole table list?
406                 tablevar_list_t *fm = qs->fta_tree->get_from();
407                 
408                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
409                 if(tbl_vec.size() != 1){
410                         char tmpstr[200];
411                         sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );
412                         err_str = tmpstr;
413                         error_code = 1;
414                 }
415                 table_name = (tbl_vec[0]);
416
417                 int t = tbl_vec[0]->get_schema_ref();
418                 if(! Schema->is_stream(t)){
419                         err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
420                         error_code = 1;
421                 }
422
423 //                              Get the select list.
424                 select_list = qs->fta_tree->get_sl_vec();
425
426 //                              Get the selection predicate.
427                 where = qs->wh_cnf;
428
429
430 //                              Get the parameters
431                 param_tbl = qs->param_tbl;
432
433
434
435         };
436
437         // the following method is used for distributed query optimization
438         double get_rate_estimate();
439
440
441         qp_node* make_copy(std::string suffix){
442                 spx_qpn *ret = new spx_qpn();
443
444                 ret->param_tbl = new param_table();
445                 std::vector<std::string> param_names = param_tbl->get_param_names();
446                 int pi;
447                 for(pi=0;pi<param_names.size();pi++){
448                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
449                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
450                                                         param_tbl->handle_access(param_names[pi]));
451                 }
452                 ret->definitions = definitions;
453                 ret->node_name = node_name + suffix;
454
455                 // make shallow copy of all fields
456                 ret->where = where;
457                 ret->select_list = select_list;
458
459                 return ret;
460         };
461         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
462
463 };
464
465
466
467 //              Select, group-by, aggregate.
468 //              Representing
469 //                      Select SE_1, ..., SE_k
470 //                      From T
471 //                      Where predicate
472 //                      Group By gb1, ..., gb_n
473 //                      Having predicate
474 //
//		NOTE : the sampling operator is sgahcwcb_qpn.
476 //
477 //              For now, must have group-by variables and aggregates.
478 //              The scalar expressions which are output must be a function
//		of the group-by variables and the aggregates.
//		The group-by variables can be references to columns of T,
481 //              or they can be scalar expressions.
482 class sgah_qpn: public qp_node{
483 public:
484         tablevar_t *table_name;                         // source table
485         std::vector<cnf_elem *> where;          // selection predicate
486         std::vector<cnf_elem *> having;         // post-aggregation predicate
487         std::vector<select_element *> select_list;      // se's of output
488         gb_table gb_tbl;                        // Table of all group-by attributes.
489         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
490
491         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
492
493         int lfta_disorder;              // maximum disorder in the steam between lfta, hfta
494         int hfta_disorder;              // maximum disorder in the  hfta
495
496 //              rollup, cube, and grouping_sets cannot be readily reconstructed by
497 //              analyzing the patterns, so explicitly record them here.
498 //              used only so that to_query_string produces something meaningful.
499         std::vector<std::string> gb_entry_type;
500         std::vector<int> gb_entry_count;
501
502         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
503
504         std::string node_type(){return("sgah_qpn");     };
505     bool makes_transform(){return true;};
506         std::vector<std::string> external_libs(){
507                 std::vector<std::string> ret;
508                 return ret;
509         }
510
511         void bind_to_schema(table_list *Schema);
512         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
513
514         std::string to_query_string();
515         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
516         std::string generate_functor_name();
517
518         std::string generate_operator(int i, std::string params);
519         std::string get_include_file(){
520                         if(hfta_disorder <= 1){
521                                 return("#include <groupby_operator.h>\n");
522                         }else{
523                                 return("#include <groupby_operator_oop.h>\n");
524                         }
525         };
526
527     std::vector<select_element *> get_select_list(){return select_list;};
528     std::vector<scalarexp_t *> get_select_se_list(){
529                 std::vector<scalarexp_t *> ret;
530                 int i;
531                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
532                 return ret;
533         };
534     std::vector<cnf_elem *> get_where_clause(){return where;};
535
536   void append_to_where(cnf_elem *c){
537                 where.push_back(c);
538         }
539
540     std::vector<cnf_elem *> get_filter_clause(){return where;};
541     std::vector<cnf_elem *> get_having_clause(){return having;};
542     gb_table *get_gb_tbl(){return &gb_tbl;};
543     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
544         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
545     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
546
547 //                              table which represents output tuple.
548         table_def *get_fields();
549         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
550         std::vector<tablevar_t *> get_input_tbls();
551         std::vector<tablevar_t *> get_output_tbls();
552
553
554         sgah_qpn(){
555                 lfta_disorder = 1;
556                 hfta_disorder = 1;
557         };
558         sgah_qpn(query_summary_class *qs,table_list *Schema){
559                 lfta_disorder = 1;
560                 hfta_disorder = 1;
561
562 //                              Get the table name.
563 //                              NOTE the colrefs have the tablevar ref (an int)
564 //                              embedded in them.  Would it make sense
565 //                              to grab the whole table list?
566                 tablevar_list_t *fm = qs->fta_tree->get_from();
567                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
568                 if(tbl_vec.size() != 1){
569                         char tmpstr[200];
570                         sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
571                         err_str=tmpstr;
572                         error_code = 1;
573                 }
574                 table_name = (tbl_vec[0]);
575
576                 int t = tbl_vec[0]->get_schema_ref();
577                 if(! Schema->is_stream(t)){
578                         err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
579                         error_code = 1;
580                 }
581
582
583 //                              Get the select list.
584                 select_list = qs->fta_tree->get_sl_vec();
585
586 //                              Get the selection and having predicates.
587                 where = qs->wh_cnf;
588                 having = qs->hav_cnf;
589
590 //                              Build a new GB var table (don't share, might need to modify)
591                 int g;
592                 for(g=0;g<qs->gb_tbl->size();g++){
593                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
594                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
595                                 qs->gb_tbl->get_reftype(g)
596                         );
597                 }
598                 gb_tbl.set_pattern_info(qs->gb_tbl);
599 //              gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;
600 //              gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;
601 //              gb_tbl.pattern_components = qs->gb_tbl->pattern_components;
602
603 //                              Build a new aggregate table. (don't share, might need
604 //                              to modify).
605                 int a;
606                 for(a=0;a<qs->aggr_tbl->size();a++){
607                         aggr_tbl.add_aggr(
608 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
609                                 qs->aggr_tbl->duplicate(a)
610                         );
611                 }
612
613
614 //                              Get the parameters
615                 param_tbl = qs->param_tbl;
616
617         };
618
619
620
621         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
622         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
623 //              Ensure that any refs to interface params have been split away.
624         int count_ifp_refs(std::set<std::string> &ifpnames);
625         int resolve_if_params(ifq_t *ifdb, std::string &err);
626
627         // the following method is used for distributed query optimization
628         double get_rate_estimate();
629
630
631         qp_node* make_copy(std::string suffix){
632                 sgah_qpn *ret = new sgah_qpn();
633
634                 ret->param_tbl = new param_table();
635                 std::vector<std::string> param_names = param_tbl->get_param_names();
636                 int pi;
637                 for(pi=0;pi<param_names.size();pi++){
638                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
639                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
640                                                         param_tbl->handle_access(param_names[pi]));
641                 }
642                 ret->definitions = definitions;
643
644                 ret->node_name = node_name + suffix;
645
646                 // make shallow copy of all fields
647                 ret->where = where;
648                 ret->having = having;
649                 ret->select_list = select_list;
650                 ret->gb_tbl = gb_tbl;
651                 ret->aggr_tbl = aggr_tbl;
652
653                 return ret;
654         };
655
656 //              Split aggregation into two HFTA components - sub and superaggregation
657 //              If unable to split the aggreagates, split into selection and aggregation
658 //              If resulting low-level query is empty (e.g. when aggregates cannot be split and
659 //              where clause is empty) empty vector willb e returned
660         virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
661
662         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
663
664 };
665
666
667
668
//		Select, group-by, aggregate, with running aggregates.
670 //              Representing
671 //                      Select SE_1, ..., SE_k
672 //                      From T
673 //                      Where predicate
674 //                      Group By gb1, ..., gb_n
675 //                      Closing When predicate
676 //                      Having predicate
677 //
678 //              NOTE : the sampling operator is sgahcwcb_qpn.
679 //
680 //              For now, must have group-by variables and aggregates.
681 //              The scalar expressions which are output must be a function
//		of the group-by variables and the aggregates.
//		The group-by variables can be references to columns of T,
684 //              or they can be scalar expressions.
685 class rsgah_qpn: public qp_node{
686 public:
687         tablevar_t *table_name;                         // source table
688         std::vector<cnf_elem *> where;          // selection predicate
689         std::vector<cnf_elem *> having;         // post-aggregation predicate
690         std::vector<cnf_elem *> closing_when;           // group closing predicate
691         std::vector<select_element *> select_list;      // se's of output
692         gb_table gb_tbl;                        // Table of all group-by attributes.
693         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
694
695         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
696
697         int lfta_disorder;              // maximum disorder allowed in stream between lfta, hfta
698         int hfta_disorder;              // maximum disorder allowed in hfta
699
700         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
701
702
703         std::string node_type(){return("rsgah_qpn");    };
704     bool makes_transform(){return true;};
705         std::vector<std::string> external_libs(){
706                 std::vector<std::string> ret;
707                 return ret;
708         }
709
710         void bind_to_schema(table_list *Schema);
711         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
712                 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");
713                 exit(1);
714         }
715
716         std::string to_query_string();
717         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
718         std::string generate_functor_name();
719
720         std::string generate_operator(int i, std::string params);
721         std::string get_include_file(){return("#include <running_gb_operator.h>\n");};
722
723     std::vector<select_element *> get_select_list(){return select_list;};
724     std::vector<scalarexp_t *> get_select_se_list(){
725                 std::vector<scalarexp_t *> ret;
726                 int i;
727                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
728                 return ret;
729         };
730     std::vector<cnf_elem *> get_where_clause(){return where;};
731   void append_to_where(cnf_elem *c){
732                 where.push_back(c);
733         }
734
735     std::vector<cnf_elem *> get_filter_clause(){return where;};
736     std::vector<cnf_elem *> get_having_clause(){return having;};
737     std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};
738     gb_table *get_gb_tbl(){return &gb_tbl;};
739     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
740         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
741     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
742
743 //                              table which represents output tuple.
744         table_def *get_fields();
745         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys);
746
747         std::vector<tablevar_t *> get_input_tbls();
748         std::vector<tablevar_t *> get_output_tbls();
749
750
751         rsgah_qpn(){
752                 lfta_disorder = 1;
753                 hfta_disorder = 1;
754         };
755         rsgah_qpn(query_summary_class *qs,table_list *Schema){
756                 lfta_disorder = 1;
757                 hfta_disorder = 1;
758
759 //                              Get the table name.
760 //                              NOTE the colrefs have the tablevar ref (an int)
761 //                              embedded in them.  Would it make sense
762 //                              to grab the whole table list?
763                 tablevar_list_t *fm = qs->fta_tree->get_from();
764                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
765                 if(tbl_vec.size() != 1){
766                         char tmpstr[200];
767                         sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );
768                         err_str=tmpstr;
769                         error_code = 1;
770                 }
771                 table_name = (tbl_vec[0]);
772
773                 int t = tbl_vec[0]->get_schema_ref();
774                 if(! Schema->is_stream(t)){
775                         err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
776                         error_code = 1;
777                 }
778
779 //                              Get the select list.
780                 select_list = qs->fta_tree->get_sl_vec();
781
782 //                              Get the selection and having predicates.
783                 where = qs->wh_cnf;
784                 having = qs->hav_cnf;
785                 closing_when = qs->closew_cnf;
786
787 //                              Build a new GB var table (don't share, might need to modify)
788                 int g;
789                 for(g=0;g<qs->gb_tbl->size();g++){
790                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
791                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
792                                 qs->gb_tbl->get_reftype(g)
793                         );
794                 }
795
796 //                              Build a new aggregate table. (don't share, might need
797 //                              to modify).
798                 int a;
799                 for(a=0;a<qs->aggr_tbl->size();a++){
800                         aggr_tbl.add_aggr(
801 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
802                                 qs->aggr_tbl->duplicate(a)
803                         );
804                 }
805
806
807 //                              Get the parameters
808                 param_tbl = qs->param_tbl;
809
810         };
811
812
813
814         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
815         std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);
816         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
817 //              Ensure that any refs to interface params have been split away.
818         int count_ifp_refs(std::set<std::string> &ifpnames);
819         int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;}
820
821         // the following method is used for distributed query optimization
822         double get_rate_estimate();
823
824         qp_node* make_copy(std::string suffix){
825                 rsgah_qpn *ret = new rsgah_qpn();
826
827                 ret->param_tbl = new param_table();
828                 std::vector<std::string> param_names = param_tbl->get_param_names();
829                 int pi;
830                 for(pi=0;pi<param_names.size();pi++){
831                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
832                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
833                                                         param_tbl->handle_access(param_names[pi]));
834                 }
835                 ret->definitions = definitions;
836
837                 ret->node_name = node_name + suffix;
838
839                 // make shallow copy of all fields
840                 ret->where = where;
841                 ret->having = having;
842                 ret->closing_when = closing_when;
843                 ret->select_list = select_list;
844                 ret->gb_tbl = gb_tbl;
845                 ret->aggr_tbl = aggr_tbl;
846
847                 return ret;
848         };
849         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
850 };
851
852
853
854 //              Watchlist - from a table read from an external source.
855
856 class watch_tbl_qpn: public qp_node{
857 public:
858         table_def *table_layout;                                // the output schema
859         std::vector<std::string> key_flds;
860
861 //              Parameters related to loading the table
862         std::string filename;
863         int refresh_interval;
864
865         
866         void append_to_where(cnf_elem *c){
867                 fprintf(stderr, "ERROR, append_to_where called on watch_tbl_qpn, not supported, query %s.\n",  node_name.c_str());
868                 exit(1);
869         }
870
871         std::string node_type(){return("watch_tbl_qpn");        };
872     bool makes_transform(){return false;};
873         std::vector<std::string> external_libs(){
874                 std::vector<std::string> ret;
875                 return ret;
876         }
877
878         void bind_to_schema(table_list *Schema){}
879         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
880                 col_id_set ret;
881                 return ret;
882         }
883
884         std::string to_query_string();
885         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
886         std::string generate_functor_name();
887         std::string generate_operator(int i, std::string params);
888         std::string get_include_file(){
889                 return("#include <watchlist_tbl.h>\n");
890         };
891
892         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
893     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
894
895         table_def *get_fields();
896         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
897                 return key_flds;
898         }
899
900         std::vector<tablevar_t *> get_input_tbls();
901         std::vector<tablevar_t *> get_output_tbls();
902
903         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
904         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
905 //              Ensure that any refs to interface params have been split away.
906         int count_ifp_refs(std::set<std::string> &ifpnames);
907
908 //                      No predicates, return an empty clause
909     std::vector<cnf_elem *> get_where_clause(){
910                  std::vector<cnf_elem *> t;
911                 return(t);
912         };
913     std::vector<cnf_elem *> get_filter_clause(){
914                 return get_where_clause();
915         }
916
917         watch_tbl_qpn(){
918         };
919
920         watch_tbl_qpn(query_summary_class *qs,table_list *Schema){
921                 node_name=qs->query_name;
922                 param_tbl = qs->param_tbl;
923                 definitions = qs->definitions;
924                 
925                 
926 //                      Populate the schema
927                 table_layout = new table_def(
928                         node_name.c_str(), NULL, NULL,  qs->fta_tree->fel, WATCHLIST_SCHEMA
929                 );
930
931 //                      Find the keys
932                 std::vector<field_entry *> flds = qs->fta_tree->fel->get_list();
933                 for(int f=0;f<flds.size();++f){
934                         if(flds[f]->get_modifier_list()->contains_key("key") ||
935                                 flds[f]->get_modifier_list()->contains_key("Key") ||
936                                 flds[f]->get_modifier_list()->contains_key("KEY") ){
937                                         key_flds.push_back(flds[f]->get_name());
938                         }
939                 }
940                 if(key_flds.size()==0){
941                         fprintf(stderr,"Error, no key fields defined for table watchlist %s\n",node_name.c_str());
942                         exit(1);
943                 }
944
945                 table_layout->set_keys(key_flds);       // communicate keys to consumers
946
947 //                      Get loading parameters
948                 if(definitions.count("filename")>0){
949                         filename = definitions["filename"];
950                 }else{
951                         fprintf(stderr, "Error, no filename for source data defined for table watchlist %s\n",node_name.c_str());
952                         exit(1);
953                 }
954                 if(definitions.count("refresh_interval")>0){
955                         refresh_interval = atoi(definitions["refresh_interval"].c_str());
956                         if(refresh_interval <= 0){
957                                 fprintf(stderr, "Error, the refresh_interval (%s) of table watchlist %s must be a positive non-zero integer.\n",definitions["refresh_interval"].c_str(), node_name.c_str());
958                                 exit(1);
959                         }
960                 }else{
961                         fprintf(stderr, "Error, no refresh_interval defined for table watchlist %s\n",node_name.c_str());
962                         exit(1);
963                 }
964
965         }
966
967         qp_node *make_copy(std::string suffix){
968                 watch_tbl_qpn *ret = new watch_tbl_qpn();
969                 ret->filename = filename;
970                 ret->refresh_interval = refresh_interval;
971                 ret->key_flds = key_flds;
972
973                 ret->param_tbl = new param_table();
974                 std::vector<std::string> param_names = param_tbl->get_param_names();
975                 int pi;
976                 for(pi=0;pi<param_names.size();pi++){
977                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
978                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
979                                                         param_tbl->handle_access(param_names[pi]));
980                 }
981                 ret->definitions = definitions;
982
983                 ret->node_name = node_name + suffix;
984                 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
985
986                 return ret;
987         };
988
989         // the following method is used for distributed query optimization
990         double get_rate_estimate();
991
992         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
993         
994
995 };
996
997
998
999
1000
1001
1002 //              forward reference
1003 class filter_join_qpn;
1004 class watch_join_qpn;
1005
1006
1007 //              (temporal) Merge query plan node.
1008 //              represent the following query fragment
1009 //                      Merge c1:c2
1010 //                      from T1 _t1, T2 _t2
1011 //
1012 //              T1 and T2 must have compatible schemas,
1013 //              that is the same types in the same slots.
1014 //              c1 and c2 must be colrefs from T1 and T2,
1015 //              both ref'ing the same slot.  Their types
1016 //              must be temporal and the same kind of temporal.
1017 //              in the output, no other field is temporal.
//		The field names of the output are drawn from T1.
1019 class mrg_qpn: public qp_node{
1020 public:
1021         std::vector<tablevar_t *> fm;                                   //      Source table
1022         std::vector<colref_t *> mvars;                                  // the merge-by columns.
1023         scalarexp_t *slack;
1024
1025         table_def *table_layout;                                // the output schema
1026         int merge_fieldpos;                                             // position of merge field,
1027                                                                                         // convenience for manipulation.
1028
1029         int disorder;           // max disorder seen in the input / allowed in the output
1030
1031
1032         // partition definition for merges that combine streams partitioned over multiple interfaces
1033         partn_def_t* partn_def;
1034
1035
1036   void append_to_where(cnf_elem *c){
1037                 fprintf(stderr, "ERROR, append_to_where called on mrg_qpn, not supported, query %s.\n",  node_name.c_str());
1038                 exit(1);
1039         }
1040
1041
1042
1043         std::string node_type(){return("mrg_qpn");      };
1044     bool makes_transform(){return false;};
1045         std::vector<std::string> external_libs(){
1046                 std::vector<std::string> ret;
1047                 return ret;
1048         }
1049
1050         void bind_to_schema(table_list *Schema);
1051         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1052                 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");
1053                 exit(1);
1054         }
1055
1056         std::string to_query_string();
1057         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1058         std::string generate_functor_name();
1059         std::string generate_operator(int i, std::string params);
1060         std::string get_include_file(){
1061                 if(disorder>1)
1062                         return("#include <merge_operator_oop.h>\n");
1063                 return("#include <merge_operator.h>\n");
1064         };
1065
1066         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1067     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1068
1069         table_def *get_fields();
1070         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1071                 std::vector<string> ret;
1072                 return ret;
1073         }
1074
1075         std::vector<tablevar_t *> get_input_tbls();
1076         std::vector<tablevar_t *> get_output_tbls();
1077
1078         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1079         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1080 //              Ensure that any refs to interface params have been split away.
1081         int count_ifp_refs(std::set<std::string> &ifpnames);
1082
1083 //                      No predicates, return an empty clause
1084     std::vector<cnf_elem *> get_where_clause(){
1085                  std::vector<cnf_elem *> t;
1086                 return(t);
1087         };
1088     std::vector<cnf_elem *> get_filter_clause(){
1089                 return get_where_clause();
1090         }
1091
1092         mrg_qpn(){
1093                 partn_def = NULL;
1094         };
1095
1096         void set_disorder(int d){
1097                 disorder = d;
1098         }
1099
1100         mrg_qpn(query_summary_class *qs,table_list *Schema){
1101                 disorder = 1;
1102
1103 //                              Grab the elements of the query node.
1104                 fm = qs->fta_tree->get_from()->get_table_list();
1105                 mvars = qs->mvars;
1106                 slack = qs->slack;
1107
1108 //                      sanity check
1109                 if(fm.size() != mvars.size()){
1110                         fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn.  fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());
1111                         exit(1);
1112                 }
1113
1114                 for(int f=0;f<fm.size();++f){
1115                         int t=fm[f]->get_schema_ref();
1116                         if(! Schema->is_stream(t)){
1117                                 err_str += "ERROR in query "+node_name+", the source "+fm[f]->get_schema_name()+" is not a stream.\n";
1118                                 error_code = 1;
1119                         }
1120                 }
1121
1122 //                              Get the parameters
1123                 param_tbl = qs->param_tbl;
1124
1125 //                              Need to set the node name now, so that the
1126 //                              schema (table_layout) can be properly named.
1127 //                              TODO: Setting the name of the table might best be done
1128 //                              via the set_node_name method, because presumably
1129 //                              thats when the node name is really known.
1130 //                              This should propogate to the table_def table_layout
1131                 node_name=qs->query_name;
1132
1133 /*
1134 int ff;
1135 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());
1136 for(ff=0;ff<fm.size();++ff){
1137 printf("%s ",fm[ff]->to_string().c_str());
1138 }
1139 printf("\n");
1140 */
1141
1142
1143 //              Create the output schema.
1144 //              strip temporal properites form all fields except the merge field.
1145                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1146                 field_entry_list *fel = new field_entry_list();
1147                 int f;
1148                 for(f=0;f<flva.size();++f){
1149                         field_entry *fe;
1150                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
1151                         if(flva[f]->get_name() == mvars[0]->get_field()){
1152                                 merge_fieldpos = f;
1153 //                              if(slack != NULL) dt.reset_temporal();
1154                         }else{
1155                                 dt.reset_temporal();
1156                         }
1157
1158                         param_list *plist = new param_list();
1159                         std::vector<std::string> param_strings = dt.get_param_keys();
1160                         int p;
1161                         for(p=0;p<param_strings.size();++p){
1162                                 std::string v = dt.get_param_val(param_strings[p]);
1163                                 if(v != "")
1164                                         plist->append(param_strings[p].c_str(),v.c_str());
1165                                 else
1166                                         plist->append(param_strings[p].c_str());
1167                         }
1168
1169
1170                         fe=new field_entry(
1171                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());
1172                         fel->append_field(fe);
1173                 }
1174
1175
1176
1177
1178                 table_layout = new table_def(
1179                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1180                 );
1181
1182                 partn_def = NULL;
1183         };
1184
1185
1186 /////////////////////////////////////////////
1187 ///             Created for de-siloing.  to be removed?  or is it otherwise useful?
1188 //              Merge existing set of sources (de-siloing)
1189         mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){
1190                 int i,f;
1191
1192                 disorder = 1;
1193
1194 //                              Construct the fm list
1195                 for(f=0;f<src_names.size();++f){
1196                         int tbl_ref = Schema->get_table_ref(src_names[f]);
1197                         if(tbl_ref < 0){
1198                                 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());
1199                                 exit(1);
1200                         }
1201                         table_def *src_tbl = Schema->get_table(tbl_ref);
1202                         tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());
1203                         string range_name = "_t" + int_to_string(f);
1204                         fm_t->set_range_var(range_name);
1205                         fm_t->set_schema_ref(tbl_ref);
1206                         fm.push_back(fm_t);
1207                 }
1208
1209 //              Create the output schema.
1210 //              strip temporal properites form all fields except the merge field.
1211                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());
1212                 field_entry_list *fel = new field_entry_list();
1213                 bool temporal_found = false;
1214                 for(f=0;f<flva.size();++f){
1215                         field_entry *fe;
1216                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());
1217                         if(dt.is_temporal() && !temporal_found){
1218                                 merge_fieldpos = f;
1219                                 temporal_found = true;
1220                         }else{
1221                                 dt.reset_temporal();
1222                         }
1223
1224                         param_list *plist = new param_list();
1225                         std::vector<std::string> param_strings = dt.get_param_keys();
1226                         int p;
1227                         for(p=0;p<param_strings.size();++p){
1228                                 std::string v = dt.get_param_val(param_strings[p]);
1229                                 if(v != "")
1230                                         plist->append(param_strings[p].c_str(),v.c_str());
1231                                 else
1232                                         plist->append(param_strings[p].c_str());
1233                         }
1234
1235                         fe=new field_entry(
1236                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,
1237                                 flva[f]->get_unpack_fcns()
1238                         );
1239                         fel->append_field(fe);
1240                 }
1241
1242                 if(! temporal_found){
1243                         fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());
1244                         exit(1);
1245                 }
1246
1247                 node_name=n_name;
1248                 table_layout = new table_def(
1249                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA
1250                 );
1251
1252                 partn_def = NULL;
1253                 param_tbl = new param_table();
1254
1255 //                      Construct mvars
1256                 for(f=0;f<fm.size();++f){
1257                         std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());
1258                         data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),
1259                              flva[merge_fieldpos]->get_modifier_list());
1260
1261                         colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),
1262                                 flv_f[merge_fieldpos]->get_name().c_str());
1263                         mvars.push_back(mcr);
1264                 }
1265
1266 //              literal_t *s_lit = new literal_t("5",LITERAL_INT);
1267 //              slack = new scalarexp_t(s_lit);
1268                 slack = NULL;
1269
1270         };
1271 //                      end de-siloing
1272 ////////////////////////////////////////
1273
1274         void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);
1275
1276
1277 //                      Merge filter_join LFTAs.
1278
1279         mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1280
1281 //                      Merge watch_join LFTAs.
1282
1283         mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
1284
1285 //                      Merge selection LFTAs.
1286
1287         mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
1288
1289                 disorder = 1;
1290
1291                 param_tbl = spx->param_tbl;
1292                 int i;
1293                 node_name = n_name;
1294                 field_entry_list *fel = new field_entry_list();
1295                 merge_fieldpos = -1;
1296
1297
1298
1299
1300                 for(i=0;i<spx->select_list.size();++i){
1301                         data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
1302                         if(dt->is_temporal()){
1303                                 if(merge_fieldpos < 0){
1304                                         merge_fieldpos = i;
1305                                 }else{
1306                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
1307                                         dt->reset_temporal();
1308                                 }
1309                         }
1310
1311                         field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
1312                         fel->append_field(fe);
1313                         delete dt;
1314                 }
1315                 if(merge_fieldpos<0){
1316                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1317                                 exit(1);
1318                 }
1319                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1320
1321 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1322                 this->resolve_slack(spx->select_list[merge_fieldpos]->se,
1323                                 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
1324 //      if(this->slack == NULL)
1325 //              fprintf(stderr,"Zero slack.\n");
1326 //      else
1327 //              fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1328
1329                 for(i=0;i<sources.size();i++){
1330                         std::string rvar = "_m"+int_to_string(i);
1331                         mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
1332                         mvars[i]->set_tablevar_ref(i);
1333                         fm.push_back(new tablevar_t(sources[i].c_str()));
1334                         fm[i]->set_range_var(rvar);
1335                 }
1336
1337                 param_tbl = new param_table();
1338                 std::vector<std::string> param_names = spx->param_tbl->get_param_names();
1339                 int pi;
1340                 for(pi=0;pi<param_names.size();pi++){
1341                         data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
1342                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1343                                                         spx->param_tbl->handle_access(param_names[pi]));
1344                 }
1345                 definitions = spx->definitions;
1346
1347         }
1348
1349 //              Merge aggregation LFTAs
1350
1351         mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){
1352
1353                 disorder = 1;
1354
1355                 param_tbl = sgah->param_tbl;
1356                 int i;
1357                 node_name = n_name;
1358                 field_entry_list *fel = new field_entry_list();
1359                 merge_fieldpos = -1;
1360                 for(i=0;i<sgah->select_list.size();++i){
1361                         data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();
1362                         if(dt->is_temporal()){
1363                                 if(merge_fieldpos < 0){
1364                                         merge_fieldpos = i;
1365                                 }else{
1366                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );
1367                                         dt->reset_temporal();
1368                                 }
1369                         }
1370
1371                         field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);
1372                         fel->append_field(fe);
1373                         delete dt;
1374                 }
1375                 if(merge_fieldpos<0){
1376                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
1377                         exit(1);
1378                 }
1379                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
1380
1381 //                              NEED TO HANDLE USER_SPECIFIED SLACK
1382                 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,
1383                                 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,
1384                                 &(sgah->gb_tbl));
1385                 if(this->slack == NULL)
1386                         fprintf(stderr,"Zero slack.\n");
1387                 else
1388                         fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
1389
1390
1391                 for(i=0;i<sources.size();i++){
1392                         std::string rvar = "_m"+int_to_string(i);
1393                         mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));
1394                         mvars[i]->set_tablevar_ref(i);
1395                         fm.push_back(new tablevar_t(sources[i].c_str()));
1396                         fm[i]->set_range_var(rvar);
1397                 }
1398
1399                 param_tbl = new param_table();
1400                 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();
1401                 int pi;
1402                 for(pi=0;pi<param_names.size();pi++){
1403                         data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);
1404                         param_tbl->add_param(param_names[pi],dt->duplicate(),
1405                                                         sgah->param_tbl->handle_access(param_names[pi]));
1406                 }
1407                 definitions = sgah->definitions;
1408
1409         }
1410
1411         qp_node *make_copy(std::string suffix){
1412                 mrg_qpn *ret = new mrg_qpn();
1413                 ret->slack = slack;
1414                 ret->disorder = disorder;
1415
1416                 ret->param_tbl = new param_table();
1417                 std::vector<std::string> param_names = param_tbl->get_param_names();
1418                 int pi;
1419                 for(pi=0;pi<param_names.size();pi++){
1420                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1421                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1422                                                         param_tbl->handle_access(param_names[pi]));
1423                 }
1424                 ret->definitions = definitions;
1425
1426                 ret->node_name = node_name + suffix;
1427                 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
1428                 ret->merge_fieldpos = merge_fieldpos;
1429
1430                 return ret;
1431         };
1432
1433         std::vector<mrg_qpn *> split_sources();
1434
1435         // the following method is used for distributed query optimization
1436         double get_rate_estimate();
1437
1438
1439         // get partition definition for merges that combine streams partitioned over multiple interfaces
1440         // return NULL for regular merges
1441         partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1442                 if (partn_def)
1443                         return partn_def;
1444
1445                 int err;
1446                 string err_str;
1447                 string partn_name;
1448
1449                 vector<tablevar_t *> input_tables = get_input_tbls();
1450                 for (int i = 0; i <  input_tables.size(); ++i) {
1451                         tablevar_t * table = input_tables[i];
1452
1453                         vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);
1454                         if (partn_names.size() != 1)    // can't have more than one value of partition attribute
1455                                 return NULL;
1456                         string new_partn_name = partn_names[0];
1457
1458                         // need to make sure that all ifaces belong to the same partition
1459                         if (!i)
1460                                 partn_name = new_partn_name;
1461                         else if (new_partn_name != partn_name)
1462                                 return NULL;
1463                 }
1464
1465                 // now find partition definition corresponding to partn_name
1466                 partn_def = partn_parse_result->get_partn_def(partn_name);
1467                 return partn_def;
1468         };
1469
1470         void set_partn_definition(partn_def_t* def) {
1471                 partn_def = def;
1472         }
1473
1474         bool is_multihost_merge() {
1475
1476                 bool is_multihost = false;
1477
1478                 // each input table must be have machine attribute be non-empty
1479                 // and there should be at least 2 different values of machine attributes
1480                 vector<tablevar_t *> input_tables = get_input_tbls();
1481                 string host = input_tables[0]->get_machine();
1482                 for  (int i = 1; i < input_tables.size(); ++i) {
1483                         string new_host = input_tables[i]->get_machine();
1484                         if (new_host == "")
1485                                 return false;
1486                         if (new_host != host)
1487                                 is_multihost = true;
1488                 }
1489                 return is_multihost;
1490         }
1491
1492         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1493 };
1494
1495
1496 //              eq_temporal, hash join query plan node.
1497 //              represent the following query fragment
1498 //                      select scalar_expression_1, ..., scalar_expression_k
1499 //                      from T0 t0, T1 t1
1500 //                      where predicate
1501 //
1502 //              the predicates and the scalar expressions can reference
1503 //              attributes of t0 and t1 and also functions.
1504 //              The predicate must contain CNF elements to enable the
1505 //              efficient evaluation of the query.
1506 //              1) at least one predicate of the form
1507 //                      (temporal se in t0) = (temporal se in t1)
1508 //              2) at least one predicate of the form
1509 //                      (non-temporal se in t0) = (non-temporal se in t1)
1510 //
1511 class join_eq_hash_qpn: public qp_node{
1512 public:
1513         std::vector<tablevar_t *> from;                                 //      Source tables
1514         std::vector<select_element *> select_list;      //      Select list
1515         std::vector<cnf_elem *> prefilter[2];           // source prefilters
1516         std::vector<cnf_elem *> temporal_eq;            // define temporal window
1517         std::vector<cnf_elem *> hash_eq;                        // define hash key
1518         std::vector<cnf_elem *> postfilter;                     // final filter on hash matches.
1519
1520         std::vector<cnf_elem *> where;                          // all the filters
1521                                                                                                 // useful for summary analysis
1522
1523         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1524
1525         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1526         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1527
1528         std::string node_type(){return("join_eq_hash_qpn");     };
1529     bool makes_transform(){return true;};
1530         std::vector<std::string> external_libs(){
1531                 std::vector<std::string> ret;
1532                 return ret;
1533         }
1534
1535         void bind_to_schema(table_list *Schema);
1536         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
1537                 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");
1538                 exit(1);
1539         }
1540
1541   void append_to_where(cnf_elem *c){
1542                 fprintf(stderr, "Error, append_to_where called on join_hash_qpn, not supported, query is %s\n",node_name.c_str());
1543                 exit(1);
1544         }
1545
1546
1547         std::string to_query_string();
1548         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
1549         std::string generate_functor_name();
1550         std::string generate_operator(int i, std::string params);
1551         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1552
1553     std::vector<select_element *> get_select_list(){return select_list;};
1554     std::vector<scalarexp_t *> get_select_se_list(){
1555                 std::vector<scalarexp_t *> ret;
1556                 int i;
1557                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1558                 return ret;
1559         };
1560 //                      Used for LFTA only
1561     std::vector<cnf_elem *> get_where_clause(){
1562                  std::vector<cnf_elem *> t;
1563                 return(t);
1564         };
1565     std::vector<cnf_elem *> get_filter_clause(){
1566                 return get_where_clause();
1567         }
1568
1569         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1570     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1571
1572         table_def *get_fields();
1573
1574 //              It might be feasible to find keys in an equijoin expression.
1575         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1576                 std::vector<string> ret;
1577                 return ret;
1578         }
1579
1580         std::vector<tablevar_t *> get_input_tbls();
1581         std::vector<tablevar_t *> get_output_tbls();
1582
1583         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1584         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1585 //              Ensure that any refs to interface params have been split away.
1586         int count_ifp_refs(std::set<std::string> &ifpnames);
1587
1588         join_eq_hash_qpn(){
1589         };
1590         join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){
1591                 int w;
1592 //                              Get the table name.
1593 //                              NOTE the colrefs have the table ref (an int)
1594 //                              embedded in them.  Would it make sense
1595 //                              to grab the whole table list?
1596                 from = qs->fta_tree->get_from()->get_table_list();
1597                 if(from.size() != 2){
1598                         char tmpstr[200];
1599                         sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1600                         err_str = tmpstr;
1601                         error_code = 1;
1602                 }
1603
1604                 for(int f=0;f<from.size();++f){
1605                         int t=from[f]->get_schema_ref();
1606                         if(! Schema->is_stream(t)){
1607                                 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
1608                                 error_code = 1;
1609                         }
1610                 }
1611
1612
1613 //                              Get the select list.
1614                 select_list = qs->fta_tree->get_sl_vec();
1615
1616 //                              Get the selection predicate.
1617                 where = qs->wh_cnf;
1618                 for(w=0;w<where.size();++w){
1619                         analyze_cnf(where[w]);
1620                         std::vector<int> pred_tbls;
1621                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1622 //                              Prefilter if refs only one tablevar
1623                         if(pred_tbls.size()==1){
1624                                 prefilter[pred_tbls[0]].push_back(where[w]);
1625                                 continue;
1626                         }
1627 //                              refs nothing -- might be sampling, do it as postfilter.
1628                         if(pred_tbls.size()==0){
1629                                 postfilter.push_back(where[w]);
1630                                 continue;
1631                         }
1632 //                              See if it can be a hash or temporal predicate.
1633 //                              NOTE: synchronize with the temporality checking
1634 //                              done at join_eq_hash_qpn::get_fields
1635                         if(where[w]->is_atom && where[w]->eq_pred){
1636                                 std::vector<int> sel_tbls, ser_tbls;
1637                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1638                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1639                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1640 //                                              make channel 0 SE on LHS.
1641                                         if(sel_tbls[0] != 0)
1642                                                 where[w]->pr->swap_scalar_operands();
1643
1644                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1645                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1646                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1647                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1648                                                         temporal_eq.push_back(where[w]);
1649                                         else
1650                                                         hash_eq.push_back(where[w]);
1651                                         continue;
1652
1653                                 }
1654                         }
1655 //                              All tests failed, fallback is postfilter.
1656                         postfilter.push_back(where[w]);
1657                 }
1658
1659                 if(temporal_eq.size()==0){
1660                         err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";
1661                         error_code = 1;
1662                 }
1663
1664 //                              Get the parameters
1665                 param_tbl = qs->param_tbl;
1666
1667         };
1668
1669         // the following method is used for distributed query optimization
1670         double get_rate_estimate();
1671
1672
1673         qp_node* make_copy(std::string suffix){
1674                 join_eq_hash_qpn *ret = new join_eq_hash_qpn();
1675
1676                 ret->param_tbl = new param_table();
1677                 std::vector<std::string> param_names = param_tbl->get_param_names();
1678                 int pi;
1679                 for(pi=0;pi<param_names.size();pi++){
1680                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1681                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1682                                                         param_tbl->handle_access(param_names[pi]));
1683                 }
1684                 ret->definitions = definitions;
1685
1686                 ret->node_name = node_name + suffix;
1687
1688                 // make shallow copy of all fields
1689                 ret->where = where;
1690                 ret->from = from;
1691                 ret->select_list = select_list;
1692                 ret->prefilter[0] = prefilter[0];
1693                 ret->prefilter[1] = prefilter[1];
1694                 ret->postfilter = postfilter;
1695                 ret->temporal_eq = temporal_eq;
1696                 ret->hash_eq = hash_eq;
1697
1698                 return ret;
1699         };
1700         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1701
1702 };
1703
1704
1705 // ---------------------------------------------
1706 //              filter join query plan node.
1707 //              represent the following query fragment
1708 //                      select scalar_expression_1, ..., scalar_expression_k
1709 //                      FILTER_JOIN(col, range) from T0 t0, T1 t1
1710 //                      where predicate
1711 //
1712 //              t0 is the output range variable, t1 is the filtering range
1713 //              variable.  Both must alias a PROTOCOL.
1714 //              The scalar expressions in the select clause may
1715 //              reference t0 only.
1716 //              The predicates are classified as follows
1717 //              prefilter predicates:
1718 //                a cheap predicate in t0 such that there is an equivalent
1719 //                predicate in t1.  Cost decisions about pushing to
1720 //                lfta prefilter made later.
1721 //              t0 predicates (other than prefilter predicates)
1722 //                      -- cheap vs. expensive sorted out at generate time,
1723 //                              the constructor isn't called with the function list.
1724 //              t1 predicates (other than prefilter predicates).
1725 //              equi-join predicates of the form:
1726 //                      (se in t0) = (se in t1)
1727 //
1728 //              There must be at least one equi-join predicate.
1729 //              No join predicates other than equi-join predicates
1730 //                are allowed.
1731 //              Warn on temporal equi-join predicates.
1732 //              t1 predicates should not be expensive ... warn?
1733 //
1734 class filter_join_qpn: public qp_node{
1735 public:
1736         std::vector<tablevar_t *> from;                                 //      Source tables
1737                 colref_t *temporal_var;                 // join window in FROM
1738                 unsigned int temporal_range;    // metadata.
1739         std::vector<select_element *> select_list;      //      Select list
1740         std::vector<cnf_elem *> shared_pred;            // prefilter preds
1741         std::vector<cnf_elem *> pred_t0;                        // main (R) preds
1742         std::vector<cnf_elem *> pred_t1;                        // filtering (S) preds
1743         std::vector<cnf_elem *> hash_eq;                        // define hash key
1744         std::vector<cnf_elem *> postfilter;                     // ref's no table.
1745
1746         std::vector<cnf_elem *> where;                          // all the filters
1747                                                                                                 // useful for summary analysis
1748
1749         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
1750         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
1751         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
1752
1753
1754         bool use_bloom;                 // true => bloom filter, false => limited hash
1755
1756         std::string node_type(){return("filter_join");  };
1757     bool makes_transform(){return true;};
1758         std::vector<std::string> external_libs(){
1759                 std::vector<std::string> ret;
1760                 return ret;
1761         }
1762
1763         void bind_to_schema(table_list *Schema);
1764         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
1765
1766         std::string to_query_string();
1767         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
1768                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");
1769                 exit(1);
1770         }
1771         std::string generate_functor_name(){
1772                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");
1773                 exit(1);
1774         }
1775         std::string generate_operator(int i, std::string params){
1776                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");
1777                 exit(1);
1778         }
1779         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};
1780
1781     std::vector<select_element *> get_select_list(){return select_list;};
1782     std::vector<scalarexp_t *> get_select_se_list(){
1783                 std::vector<scalarexp_t *> ret;
1784                 int i;
1785                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
1786                 return ret;
1787         };
1788 //                      Used for LFTA only
1789         void append_to_where(cnf_elem *c){
1790                 where.push_back(c);
1791         }
1792
1793     std::vector<cnf_elem *> get_where_clause(){return where;}
1794     std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}
1795
1796         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
1797     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
1798
1799         table_def *get_fields();
1800 //              It should be feasible to find keys in a filter join
1801         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
1802                 std::vector<string> ret;
1803                 return ret;
1804         }
1805
1806         std::vector<tablevar_t *> get_input_tbls();
1807         std::vector<tablevar_t *> get_output_tbls();
1808
1809         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
1810         int resolve_if_params(ifq_t *ifdb, std::string &err);
1811
1812         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
1813 //              Ensure that any refs to interface params have been split away.
1814         int count_ifp_refs(std::set<std::string> &ifpnames);
1815
1816 //              CONSTRUCTOR
1817         filter_join_qpn(){
1818         };
1819         filter_join_qpn(query_summary_class *qs,table_list *Schema){
1820                 int i,w;
1821 //                              Get the table name.
1822 //                              NOTE the colrefs have the table ref (an int)
1823 //                              embedded in them.  Would it make sense
1824 //                              to grab the whole table list?
1825                 from = qs->fta_tree->get_from()->get_table_list();
1826                 temporal_var = qs->fta_tree->get_from()->get_colref();
1827                 temporal_range = qs->fta_tree->get_from()->get_temporal_range();
1828                 if(from.size() != 2){
1829                         char tmpstr[200];
1830                         sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
1831                         err_str += tmpstr;
1832                         error_code = 1;
1833                 }
1834                 if(from[0]->get_interface() != from[1]->get_interface()){
1835                         err_str += "Error building filter_join_qpn node: all range variables must be sourced from the same interface or interface set ("+from[0]->get_interface()+" vs. "+from[1]->get_interface()+")\n";
1836                         error_code = 1;
1837                 }
1838
1839                 for(int f=0;f<from.size();++f){
1840                         int t=from[f]->get_schema_ref();
1841                         if(! Schema->is_stream(t)){
1842                                 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
1843                                 error_code = 1;
1844                         }
1845                 }
1846
1847
1848 //                              Get the select list.
1849                 select_list = qs->fta_tree->get_sl_vec();
1850 //                              Verify that only t0 is referenced.
1851                 bool bad_ref = false;
1852                 for(i=0;i<select_list.size();i++){
1853                         vector<int> sel_tbls;
1854                         get_tablevar_ref_se(select_list[i]->se,sel_tbls);
1855                         if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))
1856                                 bad_ref = true;
1857                 }
1858                 if(bad_ref){
1859                         err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";
1860                         error_code = 1;
1861                 }
1862
1863
1864 //                              Get the selection predicate.
1865                 where = qs->wh_cnf;
1866                 std::vector<cnf_elem *> t0_only, t1_only;
1867                 for(w=0;w<where.size();++w){
1868                         analyze_cnf(where[w]);
1869                         std::vector<int> pred_tbls;
1870                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
1871 //                              Collect the list of preds by src var,
1872 //                              extract the shared preds later.
1873                         if(pred_tbls.size()==1){
1874                                 if(pred_tbls[0] == 0){
1875                                         t0_only.push_back(where[w]);
1876                                 }else{
1877                                         t1_only.push_back(where[w]);
1878                                 }
1879                                 continue;
1880                         }
1881 //                              refs nothing -- might be sampling, do it as postfilter.
1882                         if(pred_tbls.size()==0){
1883                                 postfilter.push_back(where[w]);
1884                                 continue;
1885                         }
1886 //                              See if it can be a hash or temporal predicate.
1887 //                              NOTE: synchronize with the temporality checking
1888 //                              done at join_eq_hash_qpn::get_fields
1889                         if(where[w]->is_atom && where[w]->eq_pred){
1890                                 std::vector<int> sel_tbls, ser_tbls;
1891                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
1892                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
1893                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
1894 //                                              make channel 0 SE on LHS.
1895                                         if(sel_tbls[0] != 0)
1896                                                 where[w]->pr->swap_scalar_operands();
1897
1898                                         hash_eq.push_back(where[w]);
1899
1900                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
1901                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
1902                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||
1903                                             (dtl->is_decreasing() && dtr->is_decreasing()) )
1904                                                 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";
1905                                         continue;
1906
1907                                 }
1908                         }
1909 //                              All tests failed, fallback is postfilter.
1910                         err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";
1911                         error_code = 3;
1912                 }
1913 //              Classify the t0_only and t1_only preds.
1914                 set<int> matched_pred;
1915                 int v;
1916                 for(w=0;w<t0_only.size();w++){
1917                         for(v=0;v<t1_only.size();++v)
1918                                 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))
1919                                         break;
1920                         if(v<t1_only.size()){
1921                                 shared_pred.push_back(t0_only[w]);
1922                                 matched_pred.insert(v);
1923                         }else{
1924                                 pred_t0.push_back(t0_only[w]);
1925                         }
1926                 }
1927                 for(v=0;v<t1_only.size();++v){
1928                         if(matched_pred.count(v) == 0)
1929                                 pred_t1.push_back(t1_only[v]);
1930                 }
1931
1932
1933 //                              Get the parameters
1934                 param_tbl = qs->param_tbl;
1935                 definitions = qs->definitions;
1936
1937 //                              Determine the algorithm
1938                 if(this->get_val_of_def("algorithm") == "hash"){
1939                         use_bloom = false;
1940                 }else{
1941                         use_bloom = true;
1942                 }
1943         };
1944
1945         // the following method is used for distributed query optimization
1946         double get_rate_estimate();
1947
1948
1949         qp_node* make_copy(std::string suffix){
1950                 filter_join_qpn *ret = new filter_join_qpn();
1951
1952                 ret->param_tbl = new param_table();
1953                 std::vector<std::string> param_names = param_tbl->get_param_names();
1954                 int pi;
1955                 for(pi=0;pi<param_names.size();pi++){
1956                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
1957                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
1958                                                         param_tbl->handle_access(param_names[pi]));
1959                 }
1960                 ret->definitions = definitions;
1961
1962                 ret->node_name = node_name + suffix;
1963
1964                 // make shallow copy of all fields
1965                 ret->where = where;
1966                 ret->from = from;
1967                 ret->temporal_range = temporal_range;
1968                 ret->temporal_var = temporal_var;
1969                 ret->select_list = select_list;
1970                 ret->shared_pred = shared_pred;
1971                 ret->pred_t0 = pred_t0;
1972                 ret->pred_t1 = pred_t1;
1973                 ret->postfilter = postfilter;
1974                 ret->hash_eq = hash_eq;
1975
1976                 return ret;
1977         };
1978         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
1979
1980 };
1981
1982
1983
1984 //      TODO : put tests on other operators to ensure they don't read from a watchlist
1985 //      TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ?
1986 class watch_join_qpn: public qp_node{
1987 public:
1988         std::vector<tablevar_t *> from;                                 //      Source tables
1989         std::vector<select_element *> select_list;      //      Select list
1990         std::vector<cnf_elem *> pred_t0;                        // main (R) preds
1991         std::vector<cnf_elem *> pred_t1;                        // watchlist-only (S) preds (?)
1992         std::map<std::string, cnf_elem *> hash_eq;      // predicates on S hash keys
1993         std::vector<cnf_elem *> join_filter;            // ref's R, S, but not a hash
1994         std::vector<cnf_elem *> postfilter;                     // ref's no table.
1995
1996         std::vector<std::string> key_flds;
1997
1998         std::vector<cnf_elem *> where;                          // all the filters
1999                                                                                                 // useful for summary analysis
2000
2001         std::vector<scalarexp_t *> hash_src_r, hash_src_l;
2002         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
2003         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
2004
2005
2006
2007         std::string node_type(){return("watch_join");   };
2008     bool makes_transform(){return true;};
2009         std::vector<std::string> external_libs(){
2010                 std::vector<std::string> ret;
2011                 return ret;
2012         }
2013
2014         void bind_to_schema(table_list *Schema);
2015         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
2016
2017         std::string to_query_string();
2018         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
2019                 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n");
2020                 exit(1);
2021         }
2022         std::string generate_functor_name(){
2023                 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n");
2024                 exit(1);
2025         }
2026         std::string generate_operator(int i, std::string params){
2027                 fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n");
2028                 exit(1);
2029         }
2030         std::string get_include_file(){return("#include <watchlist_operator.h>\n");};
2031
2032     std::vector<select_element *> get_select_list(){return select_list;};
2033     std::vector<scalarexp_t *> get_select_se_list(){
2034                 std::vector<scalarexp_t *> ret;
2035                 int i;
2036                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
2037                 return ret;
2038         };
2039 //                      Used for LFTA only
2040         void append_to_where(cnf_elem *c){
2041                 where.push_back(c);
2042         }
2043
2044     std::vector<cnf_elem *> get_where_clause(){return where;}
2045
2046     std::vector<cnf_elem *> get_filter_clause(){return pred_t0;}
2047
2048         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2049     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2050
2051         table_def *get_fields();
2052 //              It should be feasible to find keys in a watchlist join
2053         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2054                 std::vector<string> ret;
2055                 return ret;
2056         }
2057
2058         std::vector<tablevar_t *> get_input_tbls();
2059         std::vector<tablevar_t *> get_output_tbls();
2060
2061         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2062         int resolve_if_params(ifq_t *ifdb, std::string &err);
2063
2064         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
2065 //              Ensure that any refs to interface params have been split away.
2066         int count_ifp_refs(std::set<std::string> &ifpnames);
2067
2068 //              CONSTRUCTOR
2069         watch_join_qpn(){
2070         };
2071         watch_join_qpn(query_summary_class *qs,table_list *Schema){
2072                 int i,w;
2073 //                              Get the table name.
2074 //                              NOTE the colrefs have the table ref (an int)
2075 //                              embedded in them.  Would it make sense
2076 //                              to grab the whole table list?
2077                 from = qs->fta_tree->get_from()->get_table_list();
2078                 if(from.size() != 2){
2079                         char tmpstr[200];
2080                         sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );
2081                         err_str += tmpstr;
2082                         error_code = 1;
2083                 }
2084
2085                 int t = from[0]->get_schema_ref();
2086                 if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){
2087                         err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n";
2088                         error_code = 1;
2089                 }
2090                 t = from[1]->get_schema_ref();
2091                 if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){
2092                         err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n";
2093                         error_code = 1;
2094                 }
2095                 key_flds = Schema->get_table(t)->get_keys();
2096
2097
2098 //                              Get the select list.
2099                 select_list = qs->fta_tree->get_sl_vec();
2100
2101 //                              Get the selection predicate.
2102                 where = qs->wh_cnf;
2103                 std::vector<cnf_elem *> t0_only, t1_only;
2104                 for(w=0;w<where.size();++w){
2105                         analyze_cnf(where[w]);
2106                         std::vector<int> pred_tbls;
2107                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);
2108 //                              Collect the list of preds by src var,
2109 //                              extract the shared preds later.
2110                         if(pred_tbls.size()==1){
2111                                 if(pred_tbls[0] == 0){
2112                                         pred_t0.push_back(where[w]);
2113                                 }else{
2114                                         pred_t1.push_back(where[w]);
2115                                 }
2116                                 continue;
2117                         }
2118 //                              refs nothing -- might be sampling, do it as postfilter.
2119                         if(pred_tbls.size()==0){
2120                                 postfilter.push_back(where[w]);
2121                                 continue;
2122                         }
2123
2124 //              Must reference both
2125 //                              See if it can be a hash predicate.
2126                         if(where[w]->is_atom && where[w]->eq_pred){
2127                                 std::vector<int> sel_tbls, ser_tbls;
2128                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
2129                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
2130                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
2131 //                                              make channel 0 SE on LHS.
2132                                         if(sel_tbls[0] != 0)
2133                                                 where[w]->swap_scalar_operands();
2134
2135 //              Must be simple (a colref) on the RHS
2136                                                 if(where[w]->r_simple){
2137                                                         string rcol = where[w]->pr->get_right_se()->get_colref()->get_field();
2138                                                         if(std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end()){
2139                                                                 hash_eq[rcol] = where[w];
2140
2141                                                                 data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
2142                                                                 data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
2143                                                                 if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) )
2144                                                                         err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n";
2145                                                         continue;
2146                                                 }
2147                                         }
2148                                 }
2149                         }
2150 //                              All tests failed, fallback is join_filter.
2151                         join_filter.push_back(where[w]);
2152                 }
2153
2154                 if(key_flds.size() > hash_eq.size()){
2155                         err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate.  Missing fields are";
2156                         for(int k=0;k<key_flds.size();++k){
2157                                 if(hash_eq.count(key_flds[k]) < 1){
2158                                         err_str += " "+key_flds[k];
2159                                 }
2160                         }
2161                         err_str += ".\n";
2162                         error_code = 5;
2163                 }
2164                                         
2165
2166 //                              Get the parameters
2167                 param_tbl = qs->param_tbl;
2168                 definitions = qs->definitions;
2169
2170         };
2171
2172         // the following method is used for distributed query optimization
2173         double get_rate_estimate();
2174
2175
2176         qp_node* make_copy(std::string suffix){
2177                 watch_join_qpn *ret = new watch_join_qpn();
2178
2179                 ret->param_tbl = new param_table();
2180                 std::vector<std::string> param_names = param_tbl->get_param_names();
2181                 int pi;
2182                 for(pi=0;pi<param_names.size();pi++){
2183                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
2184                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2185                                                         param_tbl->handle_access(param_names[pi]));
2186                 }
2187                 ret->definitions = definitions;
2188
2189                 ret->node_name = node_name + suffix;
2190
2191                 // make shallow copy of all fields
2192                 ret->where = where;
2193                 ret->from = from;
2194                 ret->select_list = select_list;
2195                 ret->key_flds = key_flds;
2196                 ret->pred_t0 = pred_t0;
2197                 ret->pred_t1 = pred_t1;
2198                 ret->join_filter = join_filter;
2199                 ret->postfilter = postfilter;
2200                 ret->hash_eq = hash_eq;
2201                 ret->hash_src_r = hash_src_r;
2202                 ret->hash_src_l = hash_src_l;
2203
2204                 return ret;
2205         };
2206
2207         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2208
2209 };
2210
2211
2212
2213
//      Compression applied by an output_file_qpn.
enum output_file_type_enum {
        regular = 0,    // plain file output
        gzip    = 1,    // selected for operator_type "zfile"; links -lz
        bzip    = 2     // links -lbz2
};
2215
2216 class output_file_qpn: public qp_node{
2217 public:
2218         std::string source_op_name;                                     //      Source table
2219         std::vector<field_entry *> fields;
2220         ospec_str *output_spec;
2221         vector<tablevar_t *> fm;
2222         std::string hfta_query_name;
2223         std::string filestream_id;
2224         bool eat_input;
2225         std::vector<std::string> params;
2226         bool do_gzip;
2227         output_file_type_enum compression_type;
2228
2229         int n_streams;          // Number of output streams
2230         int n_hfta_clones;      // number of hfta clones
2231         int parallel_idx;       // which close this produces output for.
2232         std::vector<int> hash_flds;     // fields used to hash the output.
2233
2234         std::string node_type(){return("output_file_qpn");      };
2235     bool makes_transform(){return false;};
2236         std::vector<std::string> external_libs(){
2237                 std::vector<std::string> ret;
2238                 switch(compression_type){
2239                 case gzip:
2240                         ret.push_back("-lz");
2241                 break;
2242                 case bzip:
2243                         ret.push_back("-lbz2");
2244                 break;
2245                 default:
2246                 break;
2247                 }
2248                 return ret;
2249         }
2250
2251   void append_to_where(cnf_elem *c){
2252                 fprintf(stderr, "ERROR, append_to_where called on output_file_qpn, not supported, query %s.\n",  node_name.c_str());
2253                 exit(1);
2254         }
2255
2256
2257
2258         void bind_to_schema(table_list *Schema){}
2259         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2260                 col_id_set ret;
2261                 return ret;
2262         }
2263
2264         std::string to_query_string(){return "// output_file_operator \n";}
2265         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2266         std::string generate_functor_name();
2267         std::string generate_operator(int i, std::string params);
2268         std::string get_include_file(){
2269                 switch(compression_type){
2270                 case gzip:
2271                         return("#include <zfile_output_operator.h>\n");
2272                 default:
2273                         return("#include <file_output_operator.h>\n");
2274                 }
2275                 return("#include <file_output_operator.h>\n");
2276         };
2277
2278     std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};
2279     std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};
2280         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}
2281     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}
2282
2283         table_def *get_fields(){
2284                 field_entry_list *fel = new field_entry_list();
2285                 int i;
2286                 for(i=0;i<fields.size();++i)
2287                         fel->append_field(fields[i]);
2288                 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
2289         }
2290
2291 //              TODO! either bypass the output operator in stream_query,
2292 //                      or propagate key information when the output operator is constructed.
2293         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2294                 std::vector<string> ret;
2295                 return ret;
2296         }
2297
2298         std::vector<tablevar_t *> get_input_tbls();
2299         std::vector<tablevar_t *> get_output_tbls();
2300
2301         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
2302                 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;
2303         }
2304         std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){
2305                 std::vector<table_exp_t *> ret; return ret;
2306         }
2307 //              Ensure that any refs to interface params have been split away.
2308         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2309         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};
2310
2311
2312         output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){
2313                 source_op_name = src_op;
2314                 node_name = source_op_name + "_output";
2315                 filestream_id = fs_id;
2316                 fields = src_tbl_def->get_fields();
2317                 output_spec = ospec;
2318                 fm.push_back(new tablevar_t(source_op_name.c_str()));
2319                 hfta_query_name = qn;
2320                 eat_input = ei;
2321
2322 //                      TODO stream checking, but it requires passing Schema to output_file_qpn
2323 /*
2324                 for(int f=0;f<fm.size();++f){
2325                         int t=fm[f]->get_schema_ref();
2326                         if(! Schema->is_stream(t)){
2327                                 err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
2328                                 error_code = 1;
2329                         }
2330                 }
2331 */
2332
2333
2334                 do_gzip = false;
2335                 compression_type = regular;
2336                 if(ospec->operator_type == "zfile")
2337                         compression_type = gzip;
2338
2339                 n_streams = 1;
2340                 parallel_idx = 0;
2341                 n_hfta_clones = 1;
2342
2343                 char buf[1000];
2344                 strncpy(buf, output_spec->operator_param.c_str(),1000);
2345                 buf[999] = '\0';
2346                 char *words[100];
2347                 int nwords = split_string(buf, ':', words,100);
2348                 int i;
2349                 for(i=0;i<nwords;i++){
2350                         params.push_back(words[i]);
2351                 }
2352                 for(i=0;i<params.size();i++){
2353                         if(params[i] == "gzip")
2354                                 do_gzip = true;
2355                 }
2356         }
2357
2358 //              Set output splitting parameters
2359         bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){
2360                 n_streams = ns;
2361                 n_hfta_clones = np;
2362                 parallel_idx = ix;
2363
2364                 if(split_flds != ""){
2365                         string err_flds = "";
2366                         char *tmpstr = strdup(split_flds.c_str());
2367                         char *words[100];
2368                         int nwords = split_string(tmpstr,':',words,100);
2369                         int i,j;
2370                         for(i=0;i<nwords;++i){
2371                                 string target = words[i];
2372                                 for(j=0;j<fields.size();++j){
2373                                         if(fields[j]->get_name() == target){
2374                                                 hash_flds.push_back(j);
2375                                                 break;
2376                                         }
2377                                 }
2378                                 if(j==fields.size()){
2379                                         err_flds += " "+target;
2380                                 }
2381                         }
2382                         if(err_flds != ""){
2383                                 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;
2384                                 return true;
2385                         }
2386                 }
2387                 return false;
2388         }
2389
2390         // the following method is used for distributed query optimization
2391         double get_rate_estimate(){return 1.0;}
2392
2393
2394         qp_node* make_copy(std::string suffix){
2395 //              output_file_qpn *ret = new output_file_qpn();
2396                 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);
2397                 return ret;
2398         }
2399
2400         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}
2401
2402 };
2403
2404
2405
2406 //
2407
2408 // ---------------------------------------------
2409
2410
2411 //              Select, group-by, aggregate, sampling.
2412 //              Representing
2413 //                      Select SE_1, ..., SE_k
2414 //                      From T
2415 //                      Where predicate
2416 //                      Group By gb1, ..., gb_n
2417 //                      [Subgroup gb_i1, .., gb_ik]
2418 //                      Cleaning_when  predicate
2419 //                      Cleaning_by predicate
2420 //                      Having predicate
2421 //
2422 //              For now, must have group-by variables and aggregates.
2423 //              The scalar expressions which are output must be a function
//              of the group-by variables and the aggregates.
//              The group-by variables can be references to columns of T,
2426 //              or they can be scalar expressions.
2427 class sgahcwcb_qpn: public qp_node{
2428 public:
2429         tablevar_t *table_name;                         // source table
2430         std::vector<cnf_elem *> where;          // selection predicate
2431         std::vector<cnf_elem *> having;         // post-aggregation predicate
2432         std::vector<select_element *> select_list;      // se's of output
2433         gb_table gb_tbl;                        // Table of all group-by attributes.
2434         std::set<int> sg_tbl;           // Names of the superGB attributes
2435         aggregate_table aggr_tbl;       // Table of all referenced aggregates.
2436         std::set<std::string> states_refd;      // states ref'd by stateful fcns.
2437         std::vector<cnf_elem *> cleanby;
2438         std::vector<cnf_elem *> cleanwhen;
2439
2440         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.
2441
2442         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}
2443
2444         std::string node_type(){return("sgahcwcb_qpn"); };
2445     bool makes_transform(){return true;};
2446         std::vector<std::string> external_libs(){
2447                 std::vector<std::string> ret;
2448                 return ret;
2449         }
2450
2451         void bind_to_schema(table_list *Schema);
2452         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
2453                 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");
2454                 exit(1);
2455         }
2456
2457         std::string to_query_string();
2458         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
2459         std::string generate_functor_name();
2460
2461         std::string generate_operator(int i, std::string params);
2462         std::string get_include_file(){return("#include <clean_operator.h>\n");};
2463
2464     std::vector<select_element *> get_select_list(){return select_list;};
2465     std::vector<scalarexp_t *> get_select_se_list(){
2466                 std::vector<scalarexp_t *> ret;
2467                 int i;
2468                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
2469                 return ret;
2470         };
2471     std::vector<cnf_elem *> get_where_clause(){return where;};
2472     std::vector<cnf_elem *> get_filter_clause(){return where;};
2473     std::vector<cnf_elem *> get_having_clause(){return having;};
2474     gb_table *get_gb_tbl(){return &gb_tbl;};
2475     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};
2476         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
2477     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
2478
2479 //                              table which represents output tuple.
2480         table_def *get_fields();
2481 //                      TODO Key extraction should be feasible but I'll defer the issue.
2482         std::vector<string> get_tbl_keys(std::vector<std::string> &partial_keys){
2483                 std::vector<string> ret;
2484                 return ret;
2485         }
2486
2487         std::vector<tablevar_t *> get_input_tbls();
2488         std::vector<tablevar_t *> get_output_tbls();
2489
2490   void append_to_where(cnf_elem *c){
2491                 where.push_back(c);
2492         }
2493
2494
2495         sgahcwcb_qpn(){
2496         };
2497         sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){
2498 //                              Get the table name.
2499 //                              NOTE the colrefs have the tablevar ref (an int)
2500 //                              embedded in them.  Would it make sense
2501 //                              to grab the whole table list?
2502                 tablevar_list_t *fm = qs->fta_tree->get_from();
2503                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
2504                 if(tbl_vec.size() != 1){
2505                         char tmpstr[200];
2506                         sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );
2507                         err_str=tmpstr;
2508                         error_code = 1;
2509                 }
2510                 table_name = (tbl_vec[0]);
2511
2512                 int t = tbl_vec[0]->get_schema_ref();
2513                 if(! Schema->is_stream(t)){
2514                         err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
2515                         error_code = 1;
2516                 }
2517
2518
2519 //                              Get the select list.
2520                 select_list = qs->fta_tree->get_sl_vec();
2521
2522 //                              Get the selection and having predicates.
2523                 where = qs->wh_cnf;
2524                 having = qs->hav_cnf;
2525                 cleanby = qs->cb_cnf;
2526                 cleanwhen = qs->cw_cnf;
2527
2528 //                              Build a new GB var table (don't share, might need to modify)
2529                 int g;
2530                 for(g=0;g<qs->gb_tbl->size();g++){
2531                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),
2532                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),
2533                                 qs->gb_tbl->get_reftype(g)
2534                         );
2535                 }
2536
2537 //                              Build a new aggregate table. (don't share, might need
2538 //                              to modify).
2539                 int a;
2540                 for(a=0;a<qs->aggr_tbl->size();a++){
2541                         aggr_tbl.add_aggr(
2542 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)
2543                                 qs->aggr_tbl->duplicate(a)
2544                         );
2545                 }
2546
2547                 sg_tbl = qs->sg_tbl;
2548                 states_refd = qs->states_refd;
2549
2550
2551 //                              Get the parameters
2552                 param_tbl = qs->param_tbl;
2553
2554         };
2555
2556
2557
2558         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
2559         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
2560 //              Ensure that any refs to interface params have been split away.
2561 //                      CURRENTLY not allowed by split_node_for_fta
2562         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}
2563         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}
2564
2565         // the following method is used for distributed query optimization
2566         double get_rate_estimate();
2567
2568         qp_node* make_copy(std::string suffix){
2569                 sgahcwcb_qpn *ret = new sgahcwcb_qpn();
2570
2571                 ret->param_tbl = new param_table();
2572                 std::vector<std::string> param_names = param_tbl->get_param_names();
2573                 int pi;
2574                 for(pi=0;pi<param_names.size();pi++){
2575                         data_type *dt = param_tbl->get_data_type(param_names[pi]);
2576                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
2577                                                         param_tbl->handle_access(param_names[pi]));
2578                 }
2579                 ret->definitions = definitions;
2580
2581                 ret->node_name = node_name + suffix;
2582
2583                 // make shallow copy of all fields
2584                 ret->where = where;
2585                 ret->having = having;
2586                 ret->select_list = select_list;
2587                 ret->gb_tbl = gb_tbl;
2588                 ret->aggr_tbl = aggr_tbl;
2589                 ret->sg_tbl = sg_tbl;
2590                 ret->states_refd = states_refd;
2591                 ret->cleanby = cleanby;
2592                 ret->cleanwhen = cleanwhen;
2593
2594                 return ret;
2595         };
2596
2597         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
2598 };
2599
2600
//      Returns the query-plan node(s) built from the query summary qs.
//      Defined out of line.  NOTE(review): presumably may return more than one
//      node for a single query -- confirm against the definition.
std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);



//      Rewrites s in place (taken by non-const reference).  Defined out of line.
//      NOTE(review): the exact rewriting is not visible here; presumably it
//      replaces reserved ("taboo") words -- confirm.
void untaboo(string &s);

//      Returns a table_def for tname derived from select_list.  Defined out of
//      line; ownership of the returned pointer is not visible here.
table_def *create_attributes(string tname, vector<select_element *> &select_list);
2608
2609
2610 #endif