Added quantiling UDAFs
[com/gs-lite.git] / src / ftacmp / query_plan.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 #ifndef __QUERY_PLAN_H__\r
16 #define __QUERY_PLAN_H__\r
17 \r
18 #include<vector>\r
19 #include<string>\r
20 #include<map>\r
21 using namespace std;\r
22 \r
23 #include"analyze_fta.h"\r
24 #include"iface_q.h"\r
25 #include"parse_partn.h"\r
26 #include"generate_utils.h"\r
27 \r
28 //              Identify the format of the input, output streams.\r
29 #define UNKNOWNFORMAT 0\r
30 #define NETFORMAT 1\r
31 #define HOSTFORMAT 2\r
32 \r
33 ///////////////////////////////////////////////////\r
34 //      representation of an output operator specification\r
35 \r
//	One output operator specification.
//	Plain aggregate with no constructor: the two int members hold
//	indeterminate values until the code that fills the struct assigns them.
//	NOTE(review): the per-field notes below are read off the member names;
//	confirm against the code that populates this struct.
struct ospec_str{
	std::string	query;				// query the specification applies to
	std::string	operator_type;		// kind of output operator
	std::string	operator_param;		// parameter string handed to the operator
	std::string	output_directory;	// where the operator writes
	int			bucketwidth;		// not initialized here
	std::string	partitioning_flds;	// partitioning fields, as a single string
	int			n_partitions;		// not initialized here
};
45 \r
46 \r
47 ////////////////////////////////////////////////////\r
48 //      Input representation of a query\r
49 \r
50 struct query_node{\r
51         int idx;\r
52         std::set<int> reads_from;\r
53         std::set<int> sources_to;\r
54         std::vector<std::string> refd_tbls;\r
55         std::vector<var_pair_t *> params;\r
56         std::string name;\r
57         std::string file;\r
58         std::string mangler;            // for UDOPs\r
59         bool touched;\r
60         table_exp_t *parse_tree;\r
61         int n_consumers;\r
62         bool is_udop;\r
63         bool is_externally_visible;\r
64         bool inferred_visible_node;\r
65 \r
66         set<int> subtree_roots;\r
67 \r
68         query_node(){\r
69                 idx = -1;\r
70                 touched = false;\r
71                 parse_tree = NULL;\r
72                 n_consumers = 0;\r
73                 is_externally_visible = false;\r
74                 inferred_visible_node = false;\r
75                 mangler="";\r
76         };\r
77         query_node(int i, std::string qnm, std::string flnm, table_exp_t *pt){\r
78                 idx = i;\r
79                 touched = false;\r
80                 name = qnm;\r
81                 file = flnm;\r
82                 parse_tree = pt;\r
83                 n_consumers = 0;\r
84                 is_udop = false;\r
85                 is_externally_visible = pt->get_visible();\r
86                 inferred_visible_node = false;\r
87                 mangler="";\r
88 \r
89                 tablevar_list_t *fm = parse_tree->get_from();\r
90                 refd_tbls =  fm->get_table_names();\r
91 \r
92                 params  = pt->query_params;\r
93         };\r
94         query_node(int ix, std::string udop_name,table_list *Schema){\r
95                 idx = ix;\r
96                 touched = false;\r
97                 name = udop_name;\r
98                 file = udop_name;\r
99                 parse_tree = NULL;\r
100                 n_consumers = 0;\r
101                 is_udop = true;\r
102                 is_externally_visible = true;\r
103                 inferred_visible_node = false;\r
104                 mangler="";\r
105 \r
106                 int sid = Schema->find_tbl(udop_name);\r
107                 std::vector<subquery_spec *> subq = Schema->get_subqueryspecs(sid);\r
108                 int i;\r
109                 for(i=0;i<subq.size();++i){\r
110                         refd_tbls.push_back(subq[i]->name);\r
111                 }\r
112         };\r
113 };\r
114 \r
//	One hfta: a named group of query nodes (by index) plus its links to
//	other hftas and its parallelization settings.
struct hfta_node{
	std::string name;
	std::string source_name;
	std::vector<int> query_node_indices;	// indices of the member query nodes
	std::set<int> reads_from;
	std::set<int> sources_to;
	bool is_udop;
	bool inferred_visible_node;
	int n_parallel;
	int parallel_idx;
	bool do_generation;     // false means, ignore it.

//		Defaults: not a UDOP, not an inferred-visible node, a single
//		instance (n_parallel 1, parallel_idx 0), and generation enabled.
	hfta_node()
		: is_udop(false),
		  inferred_visible_node(false),
		  n_parallel(1),
		  parallel_idx(0),
		  do_generation(true)
	{
	}
};
135 \r
136 \r
137 \r
138 \r
139 \r
140 \r
141 #define SPX_QUERY 1\r
142 #define SGAH_QUERY 2\r
143 \r
144 // the following selectivity estimates are used by our primitive rate estimators\r
145 #define SPX_SELECTIVITY 1.0\r
146 #define SGAH_SELECTIVITY 0.1\r
147 #define RSGAH_SELECTIVITY 0.1\r
148 #define SGAHCWCB_SELECTIVITY 0.1\r
149 #define MRG_SELECTIVITY 1.0\r
150 #define JOIN_EQ_HASH_SELECTIVITY 1.0\r
151 \r
// If the output rate of the interface is not given, we use
// this default value.
154 #define DEFAULT_INTERFACE_RATE 100\r
155 \r
156 \r
157 //                      Define query plan nodes\r
158 //                      These nodes are intended for query modeling\r
159 //                      and transformation rather than for code generation.\r
160 \r
161 \r
162 //                      Query plan node base class.\r
163 //                      It has an ID, can return its type,\r
164 //                      and can be linked into lists with the predecessors\r
165 //                      and successors.\r
166 //                      To add : serialize, unserialize?\r
167 \r
168 class qp_node{\r
169 public:\r
170   int id;\r
171   std::vector<int> predecessors;\r
172   std::vector<int> successors;\r
173   std::string node_name;\r
174 \r
175 //              For error reporting without exiting the program.\r
176   int error_code;\r
177   std::string err_str;\r
178 \r
179 //                      These should be moved to the containing stream_query object.\r
180   std::map<std::string, std::string> definitions;\r
181   param_table *param_tbl;\r
182 \r
183 //              The value of a field in terms of protocol fields (if any).\r
184   std::map<std::string, scalarexp_t *> protocol_map;\r
185 \r
186   qp_node(){\r
187         error_code = 0;\r
188         id = -1;\r
189         param_tbl = new param_table();\r
190   };\r
191   qp_node(int i){\r
192         error_code = 0;\r
193         id = i;\r
194         param_tbl = new param_table();\r
195   };\r
196 \r
197   int get_id(){return(id);};\r
198   void set_id(int i){id = i;    };\r
199 \r
200   int get_error_code(){return error_code;};\r
201   std::string get_error_str(){return err_str;};\r
202 \r
203   virtual std::string node_type() = 0;\r
204 \r
205 //              For code generation, does the operator xform its input.\r
206   virtual bool makes_transform() = 0;\r
207 \r
208 //              For linking, what external libraries does the operator depend on?\r
209   virtual std::vector<std::string> external_libs() = 0;\r
210 \r
211   void set_node_name(std::string n){node_name = n;};\r
212   std::string get_node_name(){return node_name;};\r
213 \r
214   void set_definitions(std::map<std::string, std::string> &def){\r
215           definitions = def;\r
216   };\r
217   std::map<std::string, std::string> get_definitions(){return definitions;};\r
218 \r
219 \r
220 //              call to create the mapping from field name to se in protocol fields.\r
221 //              Pass in qp_node of data sources, in order.\r
222   virtual void create_protocol_se(std::vector<qp_node *> q_sources,table_list *Schema)=0;\r
223 //              get the protocol map.  the parameter is the return value.\r
224   std::map<std::string, scalarexp_t *> *get_protocol_se(){return &protocol_map;}\r
225 \r
226 //              Each qp node must be able to return a description\r
227 //              of the tuples it creates.\r
228 //              TODO: the get_output_tls method should subsume the get_fields\r
229 //                      method, but in fact it really just returns the\r
230 //                      operator name.\r
231   virtual table_def *get_fields() = 0;  // Should be vector?\r
232 //              Get the from clause\r
233   virtual std::vector<tablevar_t *> get_input_tbls() = 0;\r
234 //              this is a confused function, it acutally return the output\r
235 //              table name.\r
236   virtual std::vector<tablevar_t *> get_output_tbls() = 0;\r
237 \r
238   std::string get_val_of_def(std::string def){\r
239         if(definitions.count(def) > 0) return definitions[def];\r
240         return("");\r
241   };\r
242   void set_definition(std::string def, std::string val){\r
243         definitions[def]=val;\r
244   }\r
245 \r
246 //              Associate colrefs in SEs with tables\r
247 //              at code generation time.\r
248   virtual void bind_to_schema(table_list *Schema) = 0;\r
249 \r
250 //              Get colrefs of the operator, currently only meaningful for lfta\r
251 //              operators, and only interested in colrefs with extraction fcns\r
252   virtual col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema)=0;\r
253 \r
254   virtual std::string to_query_string() = 0;\r
255   virtual std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform) = 0;\r
256   virtual std::string generate_functor_name() = 0;\r
257 \r
258   virtual std::string generate_operator(int i, std::string params) = 0;\r
259   virtual std::string get_include_file() = 0;\r
260 \r
261   virtual cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns) = 0;\r
262   virtual std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns) = 0;\r
263 \r
264 //              Split this node into LFTA and HFTA nodes.\r
265 //              Four possible outcomes:\r
266 //              1) the qp_node reads from a protocol, but does not need to\r
267 //                      split (can be evaluated as an LFTA).\r
268 //                      The lfta node is the only element in the return vector,\r
269 //                      and hfta_returned is false.\r
270 //              2) the qp_node reads from no protocol, and therefore cannot be split.\r
271 //                      THe hfta node is the only element in the return vector,\r
272 //                      and hfta_returned is true.\r
273 //              3) reads from at least one protocol, but cannot be split : failure.\r
274 //                      return vector is empty, the error conditions are written\r
275 //                      in the qp_node.\r
276 //              4) The qp_node splits into an hfta node and one or more LFTA nodes.\r
277 //                      the return vector has two or more elements, and hfta_returned\r
278 //                      is true.  The last element is the HFTA.\r
279         virtual std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx) = 0;\r
280 \r
281 \r
282 //              Ensure that any refs to interface params have been split away.\r
283         virtual int count_ifp_refs(std::set<std::string> &ifpnames)=0;\r
284 \r
285 \r
286 \r
287 //              Tag the data sources which are views,\r
288 //              return the (optimized) source queries and\r
289 //              record the view access in opview_set\r
290         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm) = 0;\r
291 \r
292   param_table *get_param_tbl(){return param_tbl;};\r
293 \r
294 //                      The "where" clause is a pre-filter\r
295   virtual  std::vector<cnf_elem *> get_where_clause() = 0;\r
296 //                      To be more explicit, use get_filter_preds\r
297   virtual  std::vector<cnf_elem *> get_filter_clause() = 0;\r
298 \r
299   void add_predecessor(int i){predecessors.push_back(i);};\r
300   void remove_predecessor(int i){\r
301         std::vector<int>::iterator vi;\r
302         for(vi=predecessors.begin(); vi!=predecessors.end();++vi){\r
303                 if((*vi) == i){\r
304                         predecessors.erase(vi);\r
305                         return;\r
306                 }\r
307         }\r
308   };\r
309   void add_successor(int i){successors.push_back(i);};\r
310   std::vector<int> get_predecessors(){return predecessors;};\r
311   int n_predecessors(){return predecessors.size();};\r
312   std::vector<int> get_successors(){return successors;};\r
313   int n_successors(){return successors.size();};\r
314   void clear_predecessors(){predecessors.clear();};\r
315   void clear_successors(){successors.clear();};\r
316 \r
317   // the following method is used for distributed query optimization\r
318   double get_rate_estimate();\r
319 \r
320 \r
321   // used for cloning query nodes\r
322   virtual qp_node* make_copy(std::string suffix) = 0;\r
323 };\r
324 \r
325 \r
326 \r
327 //              Select, project, transform (xform) query plan node.\r
328 //              represent the following query fragment\r
329 //                      select scalar_expression_1, ..., scalar_expression_k\r
330 //                      from S\r
331 //                      where predicate\r
332 //\r
333 //              the predicates and the scalar expressions can reference\r
334 //              attributes of S and also functions.\r
335 class spx_qpn: public qp_node{\r
336 public:\r
337         tablevar_t *table_name;                                 //      Source table\r
338         std::vector<cnf_elem *> where;                  // selection predicate\r
339         std::vector<select_element *> select_list;      //      Select list\r
340 \r
341 \r
342 \r
343         std::string node_type(){return("spx_qpn");      };\r
344     bool makes_transform(){return true;};\r
345         std::vector<std::string> external_libs(){\r
346                 std::vector<std::string> ret;\r
347                 return ret;\r
348         }\r
349 \r
350         void bind_to_schema(table_list *Schema);\r
351         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);\r
352 \r
353         std::string to_query_string();\r
354         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);\r
355         std::string generate_functor_name();\r
356         std::string generate_operator(int i, std::string params);\r
357         std::string get_include_file(){return("#include <selection_operator.h>\n");};\r
358 \r
359     std::vector<select_element *> get_select_list(){return select_list;};\r
360     std::vector<scalarexp_t *> get_select_se_list(){\r
361                 std::vector<scalarexp_t *> ret;\r
362                 int i;\r
363                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);\r
364                 return ret;\r
365         };\r
366     std::vector<cnf_elem *> get_where_clause(){return where;};\r
367     std::vector<cnf_elem *> get_filter_clause(){return where;};\r
368         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);\r
369     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);\r
370 \r
371         table_def *get_fields();\r
372         std::vector<tablevar_t *> get_input_tbls();\r
373         std::vector<tablevar_t *> get_output_tbls();\r
374 \r
375         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);\r
376         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);\r
377 //              Ensure that any refs to interface params have been split away.\r
378         int count_ifp_refs(std::set<std::string> &ifpnames);\r
379         int resolve_if_params(ifq_t *ifdb, std::string &err);\r
380 \r
381         spx_qpn(){\r
382         };\r
383         spx_qpn(query_summary_class *qs,table_list *Schema){\r
384 //                              Get the table name.\r
385 //                              NOTE the colrefs have the table ref (an int)\r
386 //                              embedded in them.  Would it make sense\r
387 //                              to grab the whole table list?\r
388                 tablevar_list_t *fm = qs->fta_tree->get_from();\r
389                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();\r
390                 if(tbl_vec.size() != 1){\r
391                         char tmpstr[200];\r
392                         sprintf(tmpstr,"INTERNAL ERROR building SPX node: query defined over %lu tables.\n",tbl_vec.size() );\r
393                         err_str = tmpstr;\r
394                         error_code = 1;\r
395                 }\r
396                 table_name = (tbl_vec[0]);\r
397 \r
398 //                              Get the select list.\r
399                 select_list = qs->fta_tree->get_sl_vec();\r
400 \r
401 //                              Get the selection predicate.\r
402                 where = qs->wh_cnf;\r
403 \r
404 \r
405 //                              Get the parameters\r
406                 param_tbl = qs->param_tbl;\r
407 \r
408 \r
409 \r
410         };\r
411 \r
412         // the following method is used for distributed query optimization\r
413         double get_rate_estimate();\r
414 \r
415 \r
416         qp_node* make_copy(std::string suffix){\r
417                 spx_qpn *ret = new spx_qpn();\r
418 \r
419                 ret->param_tbl = new param_table();\r
420                 std::vector<std::string> param_names = param_tbl->get_param_names();\r
421                 int pi;\r
422                 for(pi=0;pi<param_names.size();pi++){\r
423                         data_type *dt = param_tbl->get_data_type(param_names[pi]);\r
424                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),\r
425                                                         param_tbl->handle_access(param_names[pi]));\r
426                 }\r
427                 ret->definitions = definitions;\r
428                 ret->node_name = node_name + suffix;\r
429 \r
430                 // make shallow copy of all fields\r
431                 ret->where = where;\r
432                 ret->select_list = select_list;\r
433 \r
434                 return ret;\r
435         };\r
436         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);\r
437 \r
438 };\r
439 \r
440 \r
441 \r
442 //              Select, group-by, aggregate.\r
443 //              Representing\r
444 //                      Select SE_1, ..., SE_k\r
445 //                      From T\r
446 //                      Where predicate\r
447 //                      Group By gb1, ..., gb_n\r
448 //                      Having predicate\r
449 //\r
450 //              NOTE : the samlping operator is sgahcwcb_qpn.\r
451 //\r
452 //              For now, must have group-by variables and aggregates.\r
453 //              The scalar expressions which are output must be a function\r
454 //              of the groub-by variables and the aggregates.\r
455 //              The group-by variables can be references to columsn of T,\r
456 //              or they can be scalar expressions.\r
457 class sgah_qpn: public qp_node{\r
458 public:\r
459         tablevar_t *table_name;                         // source table\r
460         std::vector<cnf_elem *> where;          // selection predicate\r
461         std::vector<cnf_elem *> having;         // post-aggregation predicate\r
462         std::vector<select_element *> select_list;      // se's of output\r
463         gb_table gb_tbl;                        // Table of all group-by attributes.\r
464         aggregate_table aggr_tbl;       // Table of all referenced aggregates.\r
465 \r
466         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.\r
467 \r
468         int lfta_disorder;              // maximum disorder in the steam between lfta, hfta\r
469         int hfta_disorder;              // maximum disorder in the  hfta\r
470 \r
471 //              rollup, cube, and grouping_sets cannot be readily reconstructed by\r
472 //              analyzing the patterns, so explicitly record them here.\r
473 //              used only so that to_query_string produces something meaningful.\r
474         std::vector<std::string> gb_entry_type;\r
475         std::vector<int> gb_entry_count;\r
476 \r
477         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}\r
478 \r
479         std::string node_type(){return("sgah_qpn");     };\r
480     bool makes_transform(){return true;};\r
481         std::vector<std::string> external_libs(){\r
482                 std::vector<std::string> ret;\r
483                 return ret;\r
484         }\r
485 \r
486         void bind_to_schema(table_list *Schema);\r
487         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);\r
488 \r
489         std::string to_query_string();\r
490         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);\r
491         std::string generate_functor_name();\r
492 \r
493         std::string generate_operator(int i, std::string params);\r
494         std::string get_include_file(){\r
495                         if(hfta_disorder <= 1){\r
496                                 return("#include <groupby_operator.h>\n");\r
497                         }else{\r
498                                 return("#include <groupby_operator_oop.h>\n");\r
499                         }\r
500         };\r
501 \r
502     std::vector<select_element *> get_select_list(){return select_list;};\r
503     std::vector<scalarexp_t *> get_select_se_list(){\r
504                 std::vector<scalarexp_t *> ret;\r
505                 int i;\r
506                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);\r
507                 return ret;\r
508         };\r
509     std::vector<cnf_elem *> get_where_clause(){return where;};\r
510     std::vector<cnf_elem *> get_filter_clause(){return where;};\r
511     std::vector<cnf_elem *> get_having_clause(){return having;};\r
512     gb_table *get_gb_tbl(){return &gb_tbl;};\r
513     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};\r
514         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);\r
515     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);\r
516 \r
517 //                              table which represents output tuple.\r
518         table_def *get_fields();\r
519         std::vector<tablevar_t *> get_input_tbls();\r
520         std::vector<tablevar_t *> get_output_tbls();\r
521 \r
522 \r
523         sgah_qpn(){\r
524                 lfta_disorder = 1;\r
525                 hfta_disorder = 1;\r
526         };\r
527         sgah_qpn(query_summary_class *qs,table_list *Schema){\r
528                 lfta_disorder = 1;\r
529                 hfta_disorder = 1;\r
530 \r
531 //                              Get the table name.\r
532 //                              NOTE the colrefs have the tablevar ref (an int)\r
533 //                              embedded in them.  Would it make sense\r
534 //                              to grab the whole table list?\r
535                 tablevar_list_t *fm = qs->fta_tree->get_from();\r
536                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();\r
537                 if(tbl_vec.size() != 1){\r
538                         char tmpstr[200];\r
539                         sprintf(tmpstr,"INTERNAL ERROR building SGAH node: query defined over %lu tables.\n",tbl_vec.size() );\r
540                         err_str=tmpstr;\r
541                         error_code = 1;\r
542                 }\r
543                 table_name = (tbl_vec[0]);\r
544 \r
545 //                              Get the select list.\r
546                 select_list = qs->fta_tree->get_sl_vec();\r
547 \r
548 //                              Get the selection and having predicates.\r
549                 where = qs->wh_cnf;\r
550                 having = qs->hav_cnf;\r
551 \r
552 //                              Build a new GB var table (don't share, might need to modify)\r
553                 int g;\r
554                 for(g=0;g<qs->gb_tbl->size();g++){\r
555                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),\r
556                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),\r
557                                 qs->gb_tbl->get_reftype(g)\r
558                         );\r
559                 }\r
560                 gb_tbl.set_pattern_info(qs->gb_tbl);\r
561 //              gb_tbl.gb_entry_type = qs->gb_tbl->gb_entry_type;\r
562 //              gb_tbl.gb_entry_count = qs->gb_tbl->gb_entry_count;\r
563 //              gb_tbl.pattern_components = qs->gb_tbl->pattern_components;\r
564 \r
565 //                              Build a new aggregate table. (don't share, might need\r
566 //                              to modify).\r
567                 int a;\r
568                 for(a=0;a<qs->aggr_tbl->size();a++){\r
569                         aggr_tbl.add_aggr(\r
570 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)\r
571                                 qs->aggr_tbl->duplicate(a)\r
572                         );\r
573                 }\r
574 \r
575 \r
576 //                              Get the parameters\r
577                 param_tbl = qs->param_tbl;\r
578 \r
579         };\r
580 \r
581 \r
582 \r
583         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);\r
584         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);\r
585 //              Ensure that any refs to interface params have been split away.\r
586         int count_ifp_refs(std::set<std::string> &ifpnames);\r
587         int resolve_if_params(ifq_t *ifdb, std::string &err);\r
588 \r
589         // the following method is used for distributed query optimization\r
590         double get_rate_estimate();\r
591 \r
592 \r
593         qp_node* make_copy(std::string suffix){\r
594                 sgah_qpn *ret = new sgah_qpn();\r
595 \r
596                 ret->param_tbl = new param_table();\r
597                 std::vector<std::string> param_names = param_tbl->get_param_names();\r
598                 int pi;\r
599                 for(pi=0;pi<param_names.size();pi++){\r
600                         data_type *dt = param_tbl->get_data_type(param_names[pi]);\r
601                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),\r
602                                                         param_tbl->handle_access(param_names[pi]));\r
603                 }\r
604                 ret->definitions = definitions;\r
605 \r
606                 ret->node_name = node_name + suffix;\r
607 \r
608                 // make shallow copy of all fields\r
609                 ret->where = where;\r
610                 ret->having = having;\r
611                 ret->select_list = select_list;\r
612                 ret->gb_tbl = gb_tbl;\r
613                 ret->aggr_tbl = aggr_tbl;\r
614 \r
615                 return ret;\r
616         };\r
617 \r
618 //              Split aggregation into two HFTA components - sub and superaggregation\r
619 //              If unable to split the aggreagates, split into selection and aggregation\r
620 //              If resulting low-level query is empty (e.g. when aggregates cannot be split and\r
621 //              where clause is empty) empty vector willb e returned\r
622         virtual std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);\r
623 \r
624         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);\r
625 \r
626 };\r
627 \r
628 \r
629 \r
630 \r
631 //              Select, group-by, aggregate. with running aggregates\r
632 //              Representing\r
633 //                      Select SE_1, ..., SE_k\r
634 //                      From T\r
635 //                      Where predicate\r
636 //                      Group By gb1, ..., gb_n\r
637 //                      Closing When predicate\r
638 //                      Having predicate\r
639 //\r
640 //              NOTE : the sampling operator is sgahcwcb_qpn.\r
641 //\r
642 //              For now, must have group-by variables and aggregates.\r
//              The scalar expressions which are output must be a function
//              of the group-by variables and the aggregates.
//              The group-by variables can be references to columns of T,
//              or they can be scalar expressions.
647 class rsgah_qpn: public qp_node{\r
648 public:\r
649         tablevar_t *table_name;                         // source table\r
650         std::vector<cnf_elem *> where;          // selection predicate\r
651         std::vector<cnf_elem *> having;         // post-aggregation predicate\r
652         std::vector<cnf_elem *> closing_when;           // group closing predicate\r
653         std::vector<select_element *> select_list;      // se's of output\r
654         gb_table gb_tbl;                        // Table of all group-by attributes.\r
655         aggregate_table aggr_tbl;       // Table of all referenced aggregates.\r
656 \r
657         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.\r
658 \r
659         int lfta_disorder;              // maximum disorder allowed in stream between lfta, hfta\r
660         int hfta_disorder;              // maximum disorder allowed in hfta\r
661 \r
662         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}\r
663 \r
664 \r
665         std::string node_type(){return("rsgah_qpn");    };\r
666     bool makes_transform(){return true;};\r
667         std::vector<std::string> external_libs(){\r
668                 std::vector<std::string> ret;\r
669                 return ret;\r
670         }\r
671 \r
672         void bind_to_schema(table_list *Schema);\r
673         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){\r
674                 fprintf(stderr,"INTERNAL ERROR, calling rsgah_qpn::get_colrefs\n");\r
675                 exit(1);\r
676         }\r
677 \r
678         std::string to_query_string();\r
679         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);\r
680         std::string generate_functor_name();\r
681 \r
682         std::string generate_operator(int i, std::string params);\r
683         std::string get_include_file(){return("#include <running_gb_operator.h>\n");};\r
684 \r
685     std::vector<select_element *> get_select_list(){return select_list;};\r
686     std::vector<scalarexp_t *> get_select_se_list(){\r
687                 std::vector<scalarexp_t *> ret;\r
688                 int i;\r
689                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);\r
690                 return ret;\r
691         };\r
692     std::vector<cnf_elem *> get_where_clause(){return where;};\r
693     std::vector<cnf_elem *> get_filter_clause(){return where;};\r
694     std::vector<cnf_elem *> get_having_clause(){return having;};\r
695     std::vector<cnf_elem *> get_closing_when_clause(){return closing_when;};\r
696     gb_table *get_gb_tbl(){return &gb_tbl;};\r
697     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};\r
698         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);\r
699     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);\r
700 \r
701 //                              table which represents output tuple.\r
702         table_def *get_fields();\r
703         std::vector<tablevar_t *> get_input_tbls();\r
704         std::vector<tablevar_t *> get_output_tbls();\r
705 \r
706 \r
707         rsgah_qpn(){\r
708                 lfta_disorder = 1;\r
709                 hfta_disorder = 1;\r
710         };\r
711         rsgah_qpn(query_summary_class *qs,table_list *Schema){\r
712                 lfta_disorder = 1;\r
713                 hfta_disorder = 1;\r
714 \r
715 //                              Get the table name.\r
716 //                              NOTE the colrefs have the tablevar ref (an int)\r
717 //                              embedded in them.  Would it make sense\r
718 //                              to grab the whole table list?\r
719                 tablevar_list_t *fm = qs->fta_tree->get_from();\r
720                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();\r
721                 if(tbl_vec.size() != 1){\r
722                         char tmpstr[200];\r
723                         sprintf(tmpstr,"INTERNAL ERROR buildingR SGAH node: query defined over %lu tables.\n",tbl_vec.size() );\r
724                         err_str=tmpstr;\r
725                         error_code = 1;\r
726                 }\r
727                 table_name = (tbl_vec[0]);\r
728 \r
729 //                              Get the select list.\r
730                 select_list = qs->fta_tree->get_sl_vec();\r
731 \r
732 //                              Get the selection and having predicates.\r
733                 where = qs->wh_cnf;\r
734                 having = qs->hav_cnf;\r
735                 closing_when = qs->closew_cnf;\r
736 \r
737 //                              Build a new GB var table (don't share, might need to modify)\r
738                 int g;\r
739                 for(g=0;g<qs->gb_tbl->size();g++){\r
740                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),\r
741                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),\r
742                                 qs->gb_tbl->get_reftype(g)\r
743                         );\r
744                 }\r
745 \r
746 //                              Build a new aggregate table. (don't share, might need\r
747 //                              to modify).\r
748                 int a;\r
749                 for(a=0;a<qs->aggr_tbl->size();a++){\r
750                         aggr_tbl.add_aggr(\r
751 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)\r
752                                 qs->aggr_tbl->duplicate(a)\r
753                         );\r
754                 }\r
755 \r
756 \r
757 //                              Get the parameters\r
758                 param_tbl = qs->param_tbl;\r
759 \r
760         };\r
761 \r
762 \r
763 \r
764         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);\r
765         std::vector<qp_node *> split_node_for_hfta(ext_fcn_list *Ext_fcns, table_list *Schema);\r
766         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);\r
767 //              Ensure that any refs to interface params have been split away.\r
768         int count_ifp_refs(std::set<std::string> &ifpnames);\r
769         int resolve_if_params(ifq_t *ifdb, std::string &err){ return 0;}\r
770 \r
771         // the following method is used for distributed query optimization\r
772         double get_rate_estimate();\r
773 \r
774         qp_node* make_copy(std::string suffix){\r
775                 rsgah_qpn *ret = new rsgah_qpn();\r
776 \r
777                 ret->param_tbl = new param_table();\r
778                 std::vector<std::string> param_names = param_tbl->get_param_names();\r
779                 int pi;\r
780                 for(pi=0;pi<param_names.size();pi++){\r
781                         data_type *dt = param_tbl->get_data_type(param_names[pi]);\r
782                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),\r
783                                                         param_tbl->handle_access(param_names[pi]));\r
784                 }\r
785                 ret->definitions = definitions;\r
786 \r
787                 ret->node_name = node_name + suffix;\r
788 \r
789                 // make shallow copy of all fields\r
790                 ret->where = where;\r
791                 ret->having = having;\r
792                 ret->closing_when = closing_when;\r
793                 ret->select_list = select_list;\r
794                 ret->gb_tbl = gb_tbl;\r
795                 ret->aggr_tbl = aggr_tbl;\r
796 \r
797                 return ret;\r
798         };\r
799         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);\r
800 };\r
801 \r
802 \r
803 //              forward reference\r
804 class filter_join_qpn;\r
805 \r
806 \r
807 //              (temporal) Merge query plan node.\r
808 //              represent the following query fragment\r
809 //                      Merge c1:c2\r
810 //                      from T1 _t1, T2 _t2\r
811 //\r
812 //              T1 and T2 must have compatible schemas,\r
813 //              that is the same types in the same slots.\r
814 //              c1 and c2 must be colrefs from T1 and T2,\r
815 //              both ref'ing the same slot.  Their types\r
816 //              must be temporal and the same kind of temporal.\r
817 //              in the output, no other field is temporal.\r
//              the field names of the output are drawn from T1.
819 class mrg_qpn: public qp_node{\r
820 public:\r
821         std::vector<tablevar_t *> fm;                                   //      Source table\r
822         std::vector<colref_t *> mvars;                                  // the merge-by columns.\r
823         scalarexp_t *slack;\r
824 \r
825         table_def *table_layout;                                // the output schema\r
826         int merge_fieldpos;                                             // position of merge field,\r
827                                                                                         // convenience for manipulation.\r
828 \r
829         int disorder;           // max disorder seen in the input / allowed in the output\r
830 \r
831 \r
832         // partition definition for merges that combine streams partitioned over multiple interfaces\r
833         partn_def_t* partn_def;\r
834 \r
835 \r
836 \r
//      Returns the type tag of this plan node, the string "mrg_qpn".
std::string node_type(){
    const std::string type_tag("mrg_qpn");
    return type_tag;
}
//      Always reports false for this node type.
bool makes_transform(){
    const bool transforms = false;
    return transforms;
}
//      This node type lists no external libraries: the result is always empty.
std::vector<std::string> external_libs(){
    return std::vector<std::string>();
}
843 \r
844         void bind_to_schema(table_list *Schema);\r
845         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){\r
846                 fprintf(stderr,"INTERNAL ERROR, calling mrg_qpn::get_colrefs\n");\r
847                 exit(1);\r
848         }\r
849 \r
850         std::string to_query_string();\r
851         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);\r
852         std::string generate_functor_name();\r
853         std::string generate_operator(int i, std::string params);\r
854         std::string get_include_file(){\r
855                 if(disorder>1)\r
856                         return("#include <merge_operator_oop.h>\n");\r
857                 return("#include <merge_operator.h>\n");\r
858         };\r
859 \r
860         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);\r
861     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);\r
862 \r
863         table_def *get_fields();\r
864         std::vector<tablevar_t *> get_input_tbls();\r
865         std::vector<tablevar_t *> get_output_tbls();\r
866 \r
867         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);\r
868         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);\r
869 //              Ensure that any refs to interface params have been split away.\r
870         int count_ifp_refs(std::set<std::string> &ifpnames);\r
871 \r
872 //                      No predicates, return an empty clause\r
873     std::vector<cnf_elem *> get_where_clause(){\r
874                  std::vector<cnf_elem *> t;\r
875                 return(t);\r
876         };\r
877     std::vector<cnf_elem *> get_filter_clause(){\r
878                 return get_where_clause();\r
879         }\r
880 \r
881         mrg_qpn(){\r
882                 partn_def = NULL;\r
883         };\r
884 \r
885         void set_disorder(int d){\r
886                 disorder = d;\r
887         }\r
888 \r
889         mrg_qpn(query_summary_class *qs,table_list *Schema){\r
890                 disorder = 1;\r
891 \r
892 //                              Grab the elements of the query node.\r
893                 fm = qs->fta_tree->get_from()->get_table_list();\r
894                 mvars = qs->mvars;\r
895                 slack = qs->slack;\r
896 \r
897 //                      sanity check\r
898                 if(fm.size() != mvars.size()){\r
899                         fprintf(stderr,"INTERNAL ERROR in mrg_qpn::mrg_qpn.  fm.size() = %lu, mvars.size() = %lu\n",fm.size(),mvars.size());\r
900                         exit(1);\r
901                 }\r
902 \r
903 //                              Get the parameters\r
904                 param_tbl = qs->param_tbl;\r
905 \r
906 //                              Need to set the node name now, so that the\r
907 //                              schema (table_layout) can be properly named.\r
908 //                              TODO: Setting the name of the table might best be done\r
909 //                              via the set_node_name method, because presumably\r
910 //                              thats when the node name is really known.\r
911 //                              This should propogate to the table_def table_layout\r
912                 node_name=qs->query_name;\r
913 \r
914 /*\r
915 int ff;\r
916 printf("instantiating merge node, name = %s, %d sources.\n\t",node_name.c_str(), fm.size());\r
917 for(ff=0;ff<fm.size();++ff){\r
918 printf("%s ",fm[ff]->to_string().c_str());\r
919 }\r
920 printf("\n");\r
921 */\r
922 \r
923 \r
924 //              Create the output schema.\r
925 //              strip temporal properites form all fields except the merge field.\r
926                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());\r
927                 field_entry_list *fel = new field_entry_list();\r
928                 int f;\r
929                 for(f=0;f<flva.size();++f){\r
930                         field_entry *fe;\r
931                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());\r
932                         if(flva[f]->get_name() == mvars[0]->get_field()){\r
933                                 merge_fieldpos = f;\r
934 //                              if(slack != NULL) dt.reset_temporal();\r
935                         }else{\r
936                                 dt.reset_temporal();\r
937                         }\r
938 \r
939                         param_list *plist = new param_list();\r
940                         std::vector<std::string> param_strings = dt.get_param_keys();\r
941                         int p;\r
942                         for(p=0;p<param_strings.size();++p){\r
943                                 std::string v = dt.get_param_val(param_strings[p]);\r
944                                 if(v != "")\r
945                                         plist->append(param_strings[p].c_str(),v.c_str());\r
946                                 else\r
947                                         plist->append(param_strings[p].c_str());\r
948                         }\r
949 \r
950 \r
951                         fe=new field_entry(\r
952                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist, flva[f]->get_unpack_fcns());\r
953                         fel->append_field(fe);\r
954                 }\r
955 \r
956 \r
957 \r
958 \r
959                 table_layout = new table_def(\r
960                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA\r
961                 );\r
962 \r
963                 partn_def = NULL;\r
964         };\r
965 \r
966 \r
967 /////////////////////////////////////////////\r
968 ///             Created for de-siloing.  to be removed?  or is it otherwise useful?\r
969 //              Merge existing set of sources (de-siloing)\r
970         mrg_qpn(std::string n_name, std::vector<std::string> &src_names,table_list *Schema){\r
971                 int i,f;\r
972 \r
973                 disorder = 1;\r
974 \r
975 //                              Construct the fm list\r
976                 for(f=0;f<src_names.size();++f){\r
977                         int tbl_ref = Schema->get_table_ref(src_names[f]);\r
978                         if(tbl_ref < 0){\r
979                                 fprintf(stderr,"INTERNAL ERROR, can't find %s in the schema when constructing no-silo merge node %s\n",src_names[f].c_str(), n_name.c_str());\r
980                                 exit(1);\r
981                         }\r
982                         table_def *src_tbl = Schema->get_table(tbl_ref);\r
983                         tablevar_t *fm_t = new tablevar_t(src_names[f].c_str());\r
984                         string range_name = "_t" + int_to_string(f);\r
985                         fm_t->set_range_var(range_name);\r
986                         fm_t->set_schema_ref(tbl_ref);\r
987                         fm.push_back(fm_t);\r
988                 }\r
989 \r
990 //              Create the output schema.\r
991 //              strip temporal properites form all fields except the merge field.\r
992                 std::vector<field_entry *> flva = Schema->get_fields(fm[0]->get_schema_name());\r
993                 field_entry_list *fel = new field_entry_list();\r
994                 bool temporal_found = false;\r
995                 for(f=0;f<flva.size();++f){\r
996                         field_entry *fe;\r
997                         data_type dt(flva[f]->get_type().c_str(), flva[f]->get_modifier_list());\r
998                         if(dt.is_temporal() && !temporal_found){\r
999                                 merge_fieldpos = f;\r
1000                                 temporal_found = true;\r
1001                         }else{\r
1002                                 dt.reset_temporal();\r
1003                         }\r
1004 \r
1005                         param_list *plist = new param_list();\r
1006                         std::vector<std::string> param_strings = dt.get_param_keys();\r
1007                         int p;\r
1008                         for(p=0;p<param_strings.size();++p){\r
1009                                 std::string v = dt.get_param_val(param_strings[p]);\r
1010                                 if(v != "")\r
1011                                         plist->append(param_strings[p].c_str(),v.c_str());\r
1012                                 else\r
1013                                         plist->append(param_strings[p].c_str());\r
1014                         }\r
1015 \r
1016                         fe=new field_entry(\r
1017                                 dt.get_type_str().c_str(), flva[f]->get_name().c_str(),"",plist,\r
1018                                 flva[f]->get_unpack_fcns()\r
1019                         );\r
1020                         fel->append_field(fe);\r
1021                 }\r
1022 \r
1023                 if(! temporal_found){\r
1024                         fprintf(stderr,"ERROR, can't find temporal field of the sources when constructing no-silo merge node %s\n",n_name.c_str());\r
1025                         exit(1);\r
1026                 }\r
1027 \r
1028                 node_name=n_name;\r
1029                 table_layout = new table_def(\r
1030                         node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA\r
1031                 );\r
1032 \r
1033                 partn_def = NULL;\r
1034                 param_tbl = new param_table();\r
1035 \r
1036 //                      Construct mvars\r
1037                 for(f=0;f<fm.size();++f){\r
1038                         std::vector<field_entry *> flv_f = Schema->get_fields(fm[f]->get_schema_name());\r
1039                         data_type dt_f(flv_f[merge_fieldpos]->get_type().c_str(),\r
1040                              flva[merge_fieldpos]->get_modifier_list());\r
1041 \r
1042                         colref_t *mcr = new colref_t(fm[f]->get_var_name().c_str(),\r
1043                                 flv_f[merge_fieldpos]->get_name().c_str());\r
1044                         mvars.push_back(mcr);\r
1045                 }\r
1046 \r
1047 //              literal_t *s_lit = new literal_t("5",LITERAL_INT);\r
1048 //              slack = new scalarexp_t(s_lit);\r
1049                 slack = NULL;\r
1050 \r
1051         };\r
1052 //                      end de-siloing\r
1053 ////////////////////////////////////////\r
1054 \r
1055         void resolve_slack(scalarexp_t *t_se, std::string fname, std::vector<std::pair<std::string, std::string> > &sources,ifq_t *ifdb, gb_table *gbt);\r
1056 \r
1057 \r
1058 //                      Merge filter_join LFTAs.\r
1059 \r
1060         mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);\r
1061 \r
1062 //                      Merge selection LFTAs.\r
1063 \r
1064         mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){\r
1065 \r
1066                 disorder = 1;\r
1067 \r
1068                 param_tbl = spx->param_tbl;\r
1069                 int i;\r
1070                 node_name = n_name;\r
1071                 field_entry_list *fel = new field_entry_list();\r
1072                 merge_fieldpos = -1;\r
1073 \r
1074 \r
1075 \r
1076 \r
1077                 for(i=0;i<spx->select_list.size();++i){\r
1078                         data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();\r
1079                         if(dt->is_temporal()){\r
1080                                 if(merge_fieldpos < 0){\r
1081                                         merge_fieldpos = i;\r
1082                                 }else{\r
1083                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );\r
1084                                         dt->reset_temporal();\r
1085                                 }\r
1086                         }\r
1087 \r
1088                         field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);\r
1089                         fel->append_field(fe);\r
1090                         delete dt;\r
1091                 }\r
1092                 if(merge_fieldpos<0){\r
1093                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());\r
1094                                 exit(1);\r
1095                 }\r
1096                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);\r
1097 \r
1098 //                              NEED TO HANDLE USER_SPECIFIED SLACK\r
1099                 this->resolve_slack(spx->select_list[merge_fieldpos]->se,\r
1100                                 spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);\r
1101 //      if(this->slack == NULL)\r
1102 //              fprintf(stderr,"Zero slack.\n");\r
1103 //      else\r
1104 //              fprintf(stderr,"slack is %s\n",slack->to_string().c_str());\r
1105 \r
1106                 for(i=0;i<sources.size();i++){\r
1107                         std::string rvar = "_m"+int_to_string(i);\r
1108                         mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));\r
1109                         mvars[i]->set_tablevar_ref(i);\r
1110                         fm.push_back(new tablevar_t(sources[i].c_str()));\r
1111                         fm[i]->set_range_var(rvar);\r
1112                 }\r
1113 \r
1114                 param_tbl = new param_table();\r
1115                 std::vector<std::string> param_names = spx->param_tbl->get_param_names();\r
1116                 int pi;\r
1117                 for(pi=0;pi<param_names.size();pi++){\r
1118                         data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);\r
1119                         param_tbl->add_param(param_names[pi],dt->duplicate(),\r
1120                                                         spx->param_tbl->handle_access(param_names[pi]));\r
1121                 }\r
1122                 definitions = spx->definitions;\r
1123 \r
1124         }\r
1125 \r
1126 //              Merge aggregation LFTAs\r
1127 \r
1128         mrg_qpn(sgah_qpn *sgah, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair< std::string, std::string> > &ifaces, ifq_t *ifdb){\r
1129 \r
1130                 disorder = 1;\r
1131 \r
1132                 param_tbl = sgah->param_tbl;\r
1133                 int i;\r
1134                 node_name = n_name;\r
1135                 field_entry_list *fel = new field_entry_list();\r
1136                 merge_fieldpos = -1;\r
1137                 for(i=0;i<sgah->select_list.size();++i){\r
1138                         data_type *dt = sgah->select_list[i]->se->get_data_type()->duplicate();\r
1139                         if(dt->is_temporal()){\r
1140                                 if(merge_fieldpos < 0){\r
1141                                         merge_fieldpos = i;\r
1142                                 }else{\r
1143                                         fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str(), sgah->select_list[i]->name.c_str(), sgah->select_list[merge_fieldpos]->name.c_str() );\r
1144                                         dt->reset_temporal();\r
1145                                 }\r
1146                         }\r
1147 \r
1148                         field_entry *fe = dt->make_field_entry(sgah->select_list[i]->name);\r
1149                         fel->append_field(fe);\r
1150                         delete dt;\r
1151                 }\r
1152                 if(merge_fieldpos<0){\r
1153                         fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());\r
1154                         exit(1);\r
1155                 }\r
1156                 table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);\r
1157 \r
1158 //                              NEED TO HANDLE USER_SPECIFIED SLACK\r
1159                 this->resolve_slack(sgah->select_list[merge_fieldpos]->se,\r
1160                                 sgah->select_list[merge_fieldpos]->name, ifaces, ifdb,\r
1161                                 &(sgah->gb_tbl));\r
1162                 if(this->slack == NULL)\r
1163                         fprintf(stderr,"Zero slack.\n");\r
1164                 else\r
1165                         fprintf(stderr,"slack is %s\n",slack->to_string().c_str());\r
1166 \r
1167 \r
1168                 for(i=0;i<sources.size();i++){\r
1169                         std::string rvar = "_m"+int_to_string(i);\r
1170                         mvars.push_back(new colref_t(rvar.c_str(), sgah->select_list[merge_fieldpos]->name.c_str()));\r
1171                         mvars[i]->set_tablevar_ref(i);\r
1172                         fm.push_back(new tablevar_t(sources[i].c_str()));\r
1173                         fm[i]->set_range_var(rvar);\r
1174                 }\r
1175 \r
1176                 param_tbl = new param_table();\r
1177                 std::vector<std::string> param_names = sgah->param_tbl->get_param_names();\r
1178                 int pi;\r
1179                 for(pi=0;pi<param_names.size();pi++){\r
1180                         data_type *dt = sgah->param_tbl->get_data_type(param_names[pi]);\r
1181                         param_tbl->add_param(param_names[pi],dt->duplicate(),\r
1182                                                         sgah->param_tbl->handle_access(param_names[pi]));\r
1183                 }\r
1184                 definitions = sgah->definitions;\r
1185 \r
1186         }\r
1187 \r
1188         qp_node *make_copy(std::string suffix){\r
1189                 mrg_qpn *ret = new mrg_qpn();\r
1190                 ret->slack = slack;\r
1191                 ret->disorder = disorder;\r
1192 \r
1193                 ret->param_tbl = new param_table();\r
1194                 std::vector<std::string> param_names = param_tbl->get_param_names();\r
1195                 int pi;\r
1196                 for(pi=0;pi<param_names.size();pi++){\r
1197                         data_type *dt = param_tbl->get_data_type(param_names[pi]);\r
1198                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),\r
1199                                                         param_tbl->handle_access(param_names[pi]));\r
1200                 }\r
1201                 ret->definitions = definitions;\r
1202 \r
1203                 ret->node_name = node_name + suffix;\r
1204                 ret->table_layout = table_layout->make_shallow_copy(ret->node_name);\r
1205                 ret->merge_fieldpos = merge_fieldpos;\r
1206 \r
1207                 return ret;\r
1208         };\r
1209 \r
1210         std::vector<mrg_qpn *> split_sources();\r
1211 \r
1212         // the following method is used for distributed query optimization\r
1213         double get_rate_estimate();\r
1214 \r
1215 \r
1216         // get partition definition for merges that combine streams partitioned over multiple interfaces\r
1217         // return NULL for regular merges\r
1218         partn_def_t* get_partn_definition(map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {\r
1219                 if (partn_def)\r
1220                         return partn_def;\r
1221 \r
1222                 int err;\r
1223                 string err_str;\r
1224                 string partn_name;\r
1225 \r
1226                 vector<tablevar_t *> input_tables = get_input_tbls();\r
1227                 for (int i = 0; i <  input_tables.size(); ++i) {\r
1228                         tablevar_t * table = input_tables[i];\r
1229 \r
1230                         vector<string> partn_names = ifaces_db->get_iface_vals(table->get_machine(), table->get_interface(),"iface_partition",err,err_str);\r
1231                         if (partn_names.size() != 1)    // can't have more than one value of partition attribute\r
1232                                 return NULL;\r
1233                         string new_partn_name = partn_names[0];\r
1234 \r
1235                         // need to make sure that all ifaces belong to the same partition\r
1236                         if (!i)\r
1237                                 partn_name = new_partn_name;\r
1238                         else if (new_partn_name != partn_name)\r
1239                                 return NULL;\r
1240                 }\r
1241 \r
1242                 // now find partition definition corresponding to partn_name\r
1243                 partn_def = partn_parse_result->get_partn_def(partn_name);\r
1244                 return partn_def;\r
1245         };\r
1246 \r
1247         void set_partn_definition(partn_def_t* def) {\r
1248                 partn_def = def;\r
1249         }\r
1250 \r
1251         bool is_multihost_merge() {\r
1252 \r
1253                 bool is_multihost = false;\r
1254 \r
1255                 // each input table must be have machine attribute be non-empty\r
1256                 // and there should be at least 2 different values of machine attributes\r
1257                 vector<tablevar_t *> input_tables = get_input_tbls();\r
1258                 string host = input_tables[0]->get_machine();\r
1259                 for  (int i = 1; i < input_tables.size(); ++i) {\r
1260                         string new_host = input_tables[i]->get_machine();\r
1261                         if (new_host == "")\r
1262                                 return false;\r
1263                         if (new_host != host)\r
1264                                 is_multihost = true;\r
1265                 }\r
1266                 return is_multihost;\r
1267         }\r
1268 \r
1269         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);\r
1270 };\r
1271 \r
1272 \r
1273 //              eq_temporal, hash join query plan node.\r
1274 //              represent the following query fragment\r
1275 //                      select scalar_expression_1, ..., scalar_expression_k\r
1276 //                      from T0 t0, T1 t1\r
1277 //                      where predicate\r
1278 //\r
1279 //              the predicates and the scalar expressions can reference\r
1280 //              attributes of t0 and t1 and also functions.\r
1281 //              The predicate must contain CNF elements to enable the\r
1282 //              efficient evaluation of the query.\r
1283 //              1) at least one predicate of the form\r
1284 //                      (temporal se in t0) = (temporal se in t1)\r
1285 //              2) at least one predicate of the form\r
1286 //                      (non-temporal se in t0) = (non-temporal se in t1)\r
1287 //\r
1288 class join_eq_hash_qpn: public qp_node{\r
1289 public:\r
1290         std::vector<tablevar_t *> from;                                 //      Source tables\r
1291         std::vector<select_element *> select_list;      //      Select list\r
1292         std::vector<cnf_elem *> prefilter[2];           // source prefilters\r
1293         std::vector<cnf_elem *> temporal_eq;            // define temporal window\r
1294         std::vector<cnf_elem *> hash_eq;                        // define hash key\r
1295         std::vector<cnf_elem *> postfilter;                     // final filter on hash matches.\r
1296 \r
1297         std::vector<cnf_elem *> where;                          // all the filters\r
1298                                                                                                 // useful for summary analysis\r
1299 \r
1300         std::vector<scalarexp_t *> hash_src_r, hash_src_l;\r
1301 \r
1302         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}\r
1303         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}\r
1304 \r
//      Returns the type tag of this plan node, the string "join_eq_hash_qpn".
std::string node_type(){
    const std::string type_tag("join_eq_hash_qpn");
    return type_tag;
}
//      Always reports true for this node type.
bool makes_transform(){
    const bool transforms = true;
    return transforms;
}
//      This node type lists no external libraries: the result is always empty.
std::vector<std::string> external_libs(){
    return std::vector<std::string>();
}
1311 \r
1312         void bind_to_schema(table_list *Schema);\r
1313         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){\r
1314                 fprintf(stderr,"INTERNAL ERROR, calling join_eq_hash_qpn::get_colrefs\n");\r
1315                 exit(1);\r
1316         }\r
1317 \r
1318         std::string to_query_string();\r
1319         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);\r
1320         std::string generate_functor_name();\r
1321         std::string generate_operator(int i, std::string params);\r
1322         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};\r
1323 \r
1324     std::vector<select_element *> get_select_list(){return select_list;};\r
1325     std::vector<scalarexp_t *> get_select_se_list(){\r
1326                 std::vector<scalarexp_t *> ret;\r
1327                 int i;\r
1328                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);\r
1329                 return ret;\r
1330         };\r
1331 //                      Used for LFTA only\r
1332     std::vector<cnf_elem *> get_where_clause(){\r
1333                  std::vector<cnf_elem *> t;\r
1334                 return(t);\r
1335         };\r
1336     std::vector<cnf_elem *> get_filter_clause(){\r
1337                 return get_where_clause();\r
1338         }\r
1339 \r
1340         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);\r
1341     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);\r
1342 \r
1343         table_def *get_fields();\r
1344         std::vector<tablevar_t *> get_input_tbls();\r
1345         std::vector<tablevar_t *> get_output_tbls();\r
1346 \r
1347         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);\r
1348         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);\r
1349 //              Ensure that any refs to interface params have been split away.\r
1350         int count_ifp_refs(std::set<std::string> &ifpnames);\r
1351 \r
1352         join_eq_hash_qpn(){\r
1353         };\r
1354         join_eq_hash_qpn(query_summary_class *qs,table_list *Schema){\r
1355                 int w;\r
1356 //                              Get the table name.\r
1357 //                              NOTE the colrefs have the table ref (an int)\r
1358 //                              embedded in them.  Would it make sense\r
1359 //                              to grab the whole table list?\r
1360                 from = qs->fta_tree->get_from()->get_table_list();\r
1361                 if(from.size() != 2){\r
1362                         char tmpstr[200];\r
1363                         sprintf(tmpstr,"ERROR building join_eq_hash node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );\r
1364                         err_str = tmpstr;\r
1365                         error_code = 1;\r
1366                 }\r
1367 \r
1368 //                              Get the select list.\r
1369                 select_list = qs->fta_tree->get_sl_vec();\r
1370 \r
1371 //                              Get the selection predicate.\r
1372                 where = qs->wh_cnf;\r
1373                 for(w=0;w<where.size();++w){\r
1374                         analyze_cnf(where[w]);\r
1375                         std::vector<int> pred_tbls;\r
1376                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);\r
1377 //                              Prefilter if refs only one tablevar\r
1378                         if(pred_tbls.size()==1){\r
1379                                 prefilter[pred_tbls[0]].push_back(where[w]);\r
1380                                 continue;\r
1381                         }\r
1382 //                              refs nothing -- might be sampling, do it as postfilter.\r
1383                         if(pred_tbls.size()==0){\r
1384                                 postfilter.push_back(where[w]);\r
1385                                 continue;\r
1386                         }\r
1387 //                              See if it can be a hash or temporal predicate.\r
1388 //                              NOTE: synchronize with the temporality checking\r
1389 //                              done at join_eq_hash_qpn::get_fields\r
1390                         if(where[w]->is_atom && where[w]->eq_pred){\r
1391                                 std::vector<int> sel_tbls, ser_tbls;\r
1392                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);\r
1393                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);\r
1394                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){\r
1395 //                                              make channel 0 SE on LHS.\r
1396                                         if(sel_tbls[0] != 0)\r
1397                                                 where[w]->pr->swap_scalar_operands();\r
1398 \r
1399                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();\r
1400                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();\r
1401                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||\r
1402                                             (dtl->is_decreasing() && dtr->is_decreasing()) )\r
1403                                                         temporal_eq.push_back(where[w]);\r
1404                                         else\r
1405                                                         hash_eq.push_back(where[w]);\r
1406                                         continue;\r
1407 \r
1408                                 }\r
1409                         }\r
1410 //                              All tests failed, fallback is postfilter.\r
1411                         postfilter.push_back(where[w]);\r
1412                 }\r
1413 \r
1414                 if(temporal_eq.size()==0){\r
1415                         err_str = "ERROR in join query: can't find temporal equality predicate to define a join window.\n";\r
1416                         error_code = 1;\r
1417                 }\r
1418 \r
1419 //                              Get the parameters\r
1420                 param_tbl = qs->param_tbl;\r
1421 \r
1422         };\r
1423 \r
1424         // the following method is used for distributed query optimization\r
1425         double get_rate_estimate();\r
1426 \r
1427 \r
1428         qp_node* make_copy(std::string suffix){\r
1429                 join_eq_hash_qpn *ret = new join_eq_hash_qpn();\r
1430 \r
1431                 ret->param_tbl = new param_table();\r
1432                 std::vector<std::string> param_names = param_tbl->get_param_names();\r
1433                 int pi;\r
1434                 for(pi=0;pi<param_names.size();pi++){\r
1435                         data_type *dt = param_tbl->get_data_type(param_names[pi]);\r
1436                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),\r
1437                                                         param_tbl->handle_access(param_names[pi]));\r
1438                 }\r
1439                 ret->definitions = definitions;\r
1440 \r
1441                 ret->node_name = node_name + suffix;\r
1442 \r
1443                 // make shallow copy of all fields\r
1444                 ret->where = where;\r
1445                 ret->from = from;\r
1446                 ret->select_list = select_list;\r
1447                 ret->prefilter[0] = prefilter[0];\r
1448                 ret->prefilter[1] = prefilter[1];\r
1449                 ret->postfilter = postfilter;\r
1450                 ret->temporal_eq = temporal_eq;\r
1451                 ret->hash_eq = hash_eq;\r
1452 \r
1453                 return ret;\r
1454         };\r
1455         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);\r
1456 \r
1457 };\r
1458 \r
1459 \r
1460 // ---------------------------------------------\r
1461 //              eq_temporal, hash join query plan node.\r
1462 //              represent the following query fragment\r
1463 //                      select scalar_expression_1, ..., scalar_expression_k\r
1464 //                      FILTER_JOIN(col, range) from T0 t0, T1 t1\r
1465 //                      where predicate\r
1466 //\r
1467 //              t0 is the output range variable, t1 is the filtering range\r
1468 //              variable.  Both must alias a PROTOCOL.\r
1469 //              The scalar expressions in the select clause may\r
1470 //              reference t0 only.\r
1471 //              The predicates are classified as follows\r
1472 //              prefilter predicates:\r
1473 //                a cheap predicate in t0 such that there is an equivalent\r
1474 //                predicate in t1.  Cost decisions about pushing to\r
1475 //                lfta prefilter made later.\r
1476 //              t0 predicates (other than prefilter predicates)\r
1477 //                      -- cheap vs. expensive sorted out at generate time,\r
1478 //                              the constructor isn't called with the function list.\r
1479 //              t1 predicates (other than prefilter predicates).\r
1480 //              equi-join predicates of the form:\r
1481 //                      (se in t0) = (se in t1)\r
1482 //\r
1483 //              There must be at least one equi-join predicate.\r
1484 //              No join predicates other than equi-join predicates\r
1485 //                are allowed.\r
1486 //              Warn on temporal equi-join predicates.\r
1487 //              t1 predicates should not be expensive ... warn?\r
1488 //\r
1489 class filter_join_qpn: public qp_node{\r
1490 public:\r
1491         std::vector<tablevar_t *> from;                                 //      Source tables\r
1492                 colref_t *temporal_var;                 // join window in FROM\r
1493                 unsigned int temporal_range;    // metadata.\r
1494         std::vector<select_element *> select_list;      //      Select list\r
1495         std::vector<cnf_elem *> shared_pred;            // prefilter preds\r
1496         std::vector<cnf_elem *> pred_t0;                        // main (R) preds\r
1497         std::vector<cnf_elem *> pred_t1;                        // filtering (S) preds\r
1498         std::vector<cnf_elem *> hash_eq;                        // define hash key\r
1499         std::vector<cnf_elem *> postfilter;                     // ref's no table.\r
1500 \r
1501         std::vector<cnf_elem *> where;                          // all the filters\r
1502                                                                                                 // useful for summary analysis\r
1503 \r
1504         std::vector<scalarexp_t *> hash_src_r, hash_src_l;\r
1505         std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}\r
1506         std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}\r
1507 \r
1508 \r
1509         bool use_bloom;                 // true => bloom filter, false => limited hash\r
1510 \r
1511         std::string node_type(){return("filter_join");  };\r
1512     bool makes_transform(){return true;};\r
1513         std::vector<std::string> external_libs(){\r
1514                 std::vector<std::string> ret;\r
1515                 return ret;\r
1516         }\r
1517 \r
1518         void bind_to_schema(table_list *Schema);\r
1519         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);\r
1520 \r
1521         std::string to_query_string();\r
1522         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){\r
1523                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor called\n");\r
1524                 exit(1);\r
1525         }\r
1526         std::string generate_functor_name(){\r
1527                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_functor_name called\n");\r
1528                 exit(1);\r
1529         }\r
1530         std::string generate_operator(int i, std::string params){\r
1531                 fprintf(stderr,"INTERNAL ERROR, filter_join_qpn::generate_operator called\n");\r
1532                 exit(1);\r
1533         }\r
1534         std::string get_include_file(){return("#include <join_eq_hash_operator.h>\n");};\r
1535 \r
1536     std::vector<select_element *> get_select_list(){return select_list;};\r
1537     std::vector<scalarexp_t *> get_select_se_list(){\r
1538                 std::vector<scalarexp_t *> ret;\r
1539                 int i;\r
1540                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);\r
1541                 return ret;\r
1542         };\r
1543 //                      Used for LFTA only\r
1544     std::vector<cnf_elem *> get_where_clause(){return where;}\r
1545     std::vector<cnf_elem *> get_filter_clause(){return shared_pred;}\r
1546 \r
1547         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);\r
1548     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);\r
1549 \r
1550         table_def *get_fields();\r
1551         std::vector<tablevar_t *> get_input_tbls();\r
1552         std::vector<tablevar_t *> get_output_tbls();\r
1553 \r
1554         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);\r
1555         int resolve_if_params(ifq_t *ifdb, std::string &err);\r
1556 \r
1557         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);\r
1558 //              Ensure that any refs to interface params have been split away.\r
1559         int count_ifp_refs(std::set<std::string> &ifpnames);\r
1560 \r
1561 \r
1562         filter_join_qpn(){\r
1563         };\r
1564         filter_join_qpn(query_summary_class *qs,table_list *Schema){\r
1565                 int i,w;\r
1566 //                              Get the table name.\r
1567 //                              NOTE the colrefs have the table ref (an int)\r
1568 //                              embedded in them.  Would it make sense\r
1569 //                              to grab the whole table list?\r
1570                 from = qs->fta_tree->get_from()->get_table_list();\r
1571                 temporal_var = qs->fta_tree->get_from()->get_colref();\r
1572                 temporal_range = qs->fta_tree->get_from()->get_temporal_range();\r
1573                 if(from.size() != 2){\r
1574                         char tmpstr[200];\r
1575                         sprintf(tmpstr,"ERROR building filter_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",from.size() );\r
1576                         err_str += tmpstr;\r
1577                         error_code = 1;\r
1578                 }\r
1579 \r
1580 //                              Get the select list.\r
1581                 select_list = qs->fta_tree->get_sl_vec();\r
1582 //                              Verify that only t0 is referenced.\r
1583                 bool bad_ref = false;\r
1584                 for(i=0;i<select_list.size();i++){\r
1585                         vector<int> sel_tbls;\r
1586                         get_tablevar_ref_se(select_list[i]->se,sel_tbls);\r
1587                         if((sel_tbls.size() == 2) || (sel_tbls.size()==1 && sel_tbls[0]==1))\r
1588                                 bad_ref = true;\r
1589                 }\r
1590                 if(bad_ref){\r
1591                         err_str += "ERROR building filter_join_qpn node: query references range variable "+from[1]->variable_name+", but only the first range variable ("+from[0]->variable_name+" can be referenced.\n";\r
1592                         error_code = 1;\r
1593                 }\r
1594 \r
1595 \r
1596 //                              Get the selection predicate.\r
1597                 where = qs->wh_cnf;\r
1598                 std::vector<cnf_elem *> t0_only, t1_only;\r
1599                 for(w=0;w<where.size();++w){\r
1600                         analyze_cnf(where[w]);\r
1601                         std::vector<int> pred_tbls;\r
1602                         get_tablevar_ref_pr(where[w]->pr,pred_tbls);\r
1603 //                              Collect the list of preds by src var,\r
1604 //                              extract the shared preds later.\r
1605                         if(pred_tbls.size()==1){\r
1606                                 if(pred_tbls[0] == 0){\r
1607                                         t0_only.push_back(where[w]);\r
1608                                 }else{\r
1609                                         t1_only.push_back(where[w]);\r
1610                                 }\r
1611                                 continue;\r
1612                         }\r
1613 //                              refs nothing -- might be sampling, do it as postfilter.\r
1614                         if(pred_tbls.size()==0){\r
1615                                 postfilter.push_back(where[w]);\r
1616                                 continue;\r
1617                         }\r
1618 //                              See if it can be a hash or temporal predicate.\r
1619 //                              NOTE: synchronize with the temporality checking\r
1620 //                              done at join_eq_hash_qpn::get_fields\r
1621                         if(where[w]->is_atom && where[w]->eq_pred){\r
1622                                 std::vector<int> sel_tbls, ser_tbls;\r
1623                                 get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);\r
1624                                 get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);\r
1625                                 if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){\r
1626 //                                              make channel 0 SE on LHS.\r
1627                                         if(sel_tbls[0] != 0)\r
1628                                                 where[w]->pr->swap_scalar_operands();\r
1629 \r
1630                                         hash_eq.push_back(where[w]);\r
1631 \r
1632                                         data_type *dtl=where[w]->pr->get_left_se()->get_data_type();\r
1633                                         data_type *dtr=where[w]->pr->get_right_se()->get_data_type();\r
1634                                         if( (dtl->is_increasing() && dtr->is_increasing()) ||\r
1635                                             (dtl->is_decreasing() && dtr->is_decreasing()) )\r
1636                                                 err_str += "Warning, a filter join should not have join predicates on temporal fields.\n";\r
1637                                         continue;\r
1638 \r
1639                                 }\r
1640                         }\r
1641 //                              All tests failed, fallback is postfilter.\r
1642                         err_str += "ERROR, join predicates in a filter join should have the form (scalar expression in "+from[0]->variable_name+") = (scalar expression in "+from[1]->variable_name+").\n";\r
1643                         error_code = 3;\r
1644                 }\r
1645 //              Classify the t0_only and t1_only preds.\r
1646                 set<int> matched_pred;\r
1647                 int v;\r
1648                 for(w=0;w<t0_only.size();w++){\r
1649                         for(v=0;v<t1_only.size();++v)\r
1650                                 if(is_equivalent_pred_base(t0_only[w]->pr,t1_only[v]->pr,Schema))\r
1651                                         break;\r
1652                         if(v<t1_only.size()){\r
1653                                 shared_pred.push_back(t0_only[w]);\r
1654                                 matched_pred.insert(v);\r
1655                         }else{\r
1656                                 pred_t0.push_back(t0_only[w]);\r
1657                         }\r
1658                 }\r
1659                 for(v=0;v<t1_only.size();++v){\r
1660                         if(matched_pred.count(v) == 0)\r
1661                                 pred_t1.push_back(t1_only[v]);\r
1662                 }\r
1663 \r
1664 \r
1665 //                              Get the parameters\r
1666                 param_tbl = qs->param_tbl;\r
1667                 definitions = qs->definitions;\r
1668 \r
1669 //                              Determine the algorithm\r
1670                 if(this->get_val_of_def("algorithm") == "hash"){\r
1671                         use_bloom = false;\r
1672                 }else{\r
1673                         use_bloom = true;\r
1674                 }\r
1675         };\r
1676 \r
1677         // the following method is used for distributed query optimization\r
1678         double get_rate_estimate();\r
1679 \r
1680 \r
1681         qp_node* make_copy(std::string suffix){\r
1682                 filter_join_qpn *ret = new filter_join_qpn();\r
1683 \r
1684                 ret->param_tbl = new param_table();\r
1685                 std::vector<std::string> param_names = param_tbl->get_param_names();\r
1686                 int pi;\r
1687                 for(pi=0;pi<param_names.size();pi++){\r
1688                         data_type *dt = param_tbl->get_data_type(param_names[pi]);\r
1689                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),\r
1690                                                         param_tbl->handle_access(param_names[pi]));\r
1691                 }\r
1692                 ret->definitions = definitions;\r
1693 \r
1694                 ret->node_name = node_name + suffix;\r
1695 \r
1696                 // make shallow copy of all fields\r
1697                 ret->where = where;\r
1698                 ret->from = from;\r
1699                 ret->temporal_range = temporal_range;\r
1700                 ret->temporal_var = temporal_var;\r
1701                 ret->select_list = select_list;\r
1702                 ret->shared_pred = shared_pred;\r
1703                 ret->pred_t0 = pred_t0;\r
1704                 ret->pred_t1 = pred_t1;\r
1705                 ret->postfilter = postfilter;\r
1706                 ret->hash_eq = hash_eq;\r
1707 \r
1708                 return ret;\r
1709         };\r
1710         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);\r
1711 \r
1712 };\r
1713 \r
1714 \r
1715 enum output_file_type_enum {regular, gzip, bzip};\r
1716 \r
1717 class output_file_qpn: public qp_node{\r
1718 public:\r
1719         std::string source_op_name;                                     //      Source table\r
1720         std::vector<field_entry *> fields;\r
1721         ospec_str *output_spec;\r
1722         vector<tablevar_t *> fm;\r
1723         std::string hfta_query_name;\r
1724         std::string filestream_id;\r
1725         bool eat_input;\r
1726         std::vector<std::string> params;\r
1727         bool do_gzip;\r
1728         output_file_type_enum compression_type;\r
1729 \r
1730         int n_streams;          // Number of output streams\r
1731         int n_hfta_clones;      // number of hfta clones\r
1732         int parallel_idx;       // which close this produces output for.\r
1733         std::vector<int> hash_flds;     // fields used to hash the output.\r
1734 \r
1735         std::string node_type(){return("output_file_qpn");      };\r
1736     bool makes_transform(){return false;};\r
1737         std::vector<std::string> external_libs(){\r
1738                 std::vector<std::string> ret;\r
1739                 switch(compression_type){\r
1740                 case gzip:\r
1741                         ret.push_back("-lz");\r
1742                 break;\r
1743                 case bzip:\r
1744                         ret.push_back("-lbz2");\r
1745                 break;\r
1746                 default:\r
1747                 break;\r
1748                 }\r
1749                 return ret;\r
1750         }\r
1751 \r
1752         void bind_to_schema(table_list *Schema){}\r
1753         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){\r
1754                 col_id_set ret;\r
1755                 return ret;\r
1756         }\r
1757 \r
1758         std::string to_query_string(){return "// output_file_operator \n";}\r
1759         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);\r
1760         std::string generate_functor_name();\r
1761         std::string generate_operator(int i, std::string params);\r
1762         std::string get_include_file(){\r
1763                 switch(compression_type){\r
1764                 case gzip:\r
1765                         return("#include <zfile_output_operator.h>\n");\r
1766                 default:\r
1767                         return("#include <file_output_operator.h>\n");\r
1768                 }\r
1769                 return("#include <file_output_operator.h>\n");\r
1770         };\r
1771 \r
1772     std::vector<cnf_elem *> get_where_clause(){std::vector<cnf_elem *> ret; return ret;};\r
1773     std::vector<cnf_elem *> get_filter_clause(){std::vector<cnf_elem *> ret; return ret;};\r
1774         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){cplx_lit_table *ret = new cplx_lit_table(); return ret;}\r
1775     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns){std::vector<handle_param_tbl_entry *> ret; return ret;}\r
1776 \r
1777         table_def *get_fields(){\r
1778                 field_entry_list *fel = new field_entry_list();\r
1779                 int i;\r
1780                 for(i=0;i<fields.size();++i)\r
1781                         fel->append_field(fields[i]);\r
1782                 return new table_def(node_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);\r
1783         }\r
1784         std::vector<tablevar_t *> get_input_tbls();\r
1785         std::vector<tablevar_t *> get_output_tbls();\r
1786 \r
1787         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){\r
1788                 std::vector<qp_node *> ret; ret.push_back(this); hfta_returned = true; return ret;\r
1789         }\r
1790         std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm){\r
1791                 std::vector<table_exp_t *> ret; return ret;\r
1792         }\r
1793 //              Ensure that any refs to interface params have been split away.\r
1794         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}\r
1795         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;};\r
1796 \r
1797 \r
1798         output_file_qpn(std::string src_op, std::string qn, std::string fs_id, table_def *src_tbl_def, ospec_str *ospec, bool ei){\r
1799                 source_op_name = src_op;\r
1800                 node_name = source_op_name + "_output";\r
1801                 filestream_id = fs_id;\r
1802                 fields = src_tbl_def->get_fields();\r
1803                 output_spec = ospec;\r
1804                 fm.push_back(new tablevar_t(source_op_name.c_str()));\r
1805                 hfta_query_name = qn;\r
1806                 eat_input = ei;\r
1807 \r
1808                 do_gzip = false;\r
1809                 compression_type = regular;\r
1810                 if(ospec->operator_type == "zfile")\r
1811                         compression_type = gzip;\r
1812 \r
1813                 n_streams = 1;\r
1814                 parallel_idx = 0;\r
1815                 n_hfta_clones = 1;\r
1816 \r
1817                 char buf[1000];\r
1818                 strncpy(buf, output_spec->operator_param.c_str(),1000);\r
1819                 buf[999] = '\0';\r
1820                 char *words[100];\r
1821                 int nwords = split_string(buf, ':', words,100);\r
1822                 int i;\r
1823                 for(i=0;i<nwords;i++){\r
1824                         params.push_back(words[i]);\r
1825                 }\r
1826                 for(i=0;i<params.size();i++){\r
1827                         if(params[i] == "gzip")\r
1828                                 do_gzip = true;\r
1829                 }\r
1830         }\r
1831 \r
1832 //              Set output splitting parameters\r
1833         bool set_splitting_params(int np, int ix, int ns, std::string split_flds, std::string &err_report){\r
1834                 n_streams = ns;\r
1835                 n_hfta_clones = np;\r
1836                 parallel_idx = ix;\r
1837 \r
1838                 if(split_flds != ""){\r
1839                         string err_flds = "";\r
1840                         char *tmpstr = strdup(split_flds.c_str());\r
1841                         char *words[100];\r
1842                         int nwords = split_string(tmpstr,':',words,100);\r
1843                         int i,j;\r
1844                         for(i=0;i<nwords;++i){\r
1845                                 string target = words[i];\r
1846                                 for(j=0;j<fields.size();++j){\r
1847                                         if(fields[j]->get_name() == target){\r
1848                                                 hash_flds.push_back(j);\r
1849                                                 break;\r
1850                                         }\r
1851                                 }\r
1852                                 if(j==fields.size()){\r
1853                                         err_flds += " "+target;\r
1854                                 }\r
1855                         }\r
1856                         if(err_flds != ""){\r
1857                                 err_report += "ERROR in "+hfta_query_name+", a file output operator needs to split the output but these splitting fileds are not part of the output:"+err_flds;\r
1858                                 return true;\r
1859                         }\r
1860                 }\r
1861                 return false;\r
1862         }\r
1863 \r
1864         // the following method is used for distributed query optimization\r
1865         double get_rate_estimate(){return 1.0;}\r
1866 \r
1867 \r
1868         qp_node* make_copy(std::string suffix){\r
1869 //              output_file_qpn *ret = new output_file_qpn();\r
1870                 output_file_qpn *ret = new output_file_qpn(source_op_name, hfta_query_name, filestream_id, this->get_fields(), output_spec, eat_input);\r
1871                 return ret;\r
1872         }\r
1873 \r
1874         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){}\r
1875 \r
1876 };\r
1877 \r
1878 \r
1879 \r
1880 //\r
1881 \r
1882 // ---------------------------------------------\r
1883 \r
1884 \r
1885 //              Select, group-by, aggregate, sampling.\r
1886 //              Representing\r
1887 //                      Select SE_1, ..., SE_k\r
1888 //                      From T\r
1889 //                      Where predicate\r
1890 //                      Group By gb1, ..., gb_n\r
1891 //                      [Subgroup gb_i1, .., gb_ik]\r
1892 //                      Cleaning_when  predicate\r
1893 //                      Cleaning_by predicate\r
1894 //                      Having predicate\r
1895 //\r
1896 //              For now, must have group-by variables and aggregates.\r
1897 //              The scalar expressions which are output must be a function\r
1898 //              of the groub-by variables and the aggregates.\r
1899 //              The group-by variables can be references to columsn of T,\r
1900 //              or they can be scalar expressions.\r
1901 class sgahcwcb_qpn: public qp_node{\r
1902 public:\r
1903         tablevar_t *table_name;                         // source table\r
1904         std::vector<cnf_elem *> where;          // selection predicate\r
1905         std::vector<cnf_elem *> having;         // post-aggregation predicate\r
1906         std::vector<select_element *> select_list;      // se's of output\r
1907         gb_table gb_tbl;                        // Table of all group-by attributes.\r
1908         std::set<int> sg_tbl;           // Names of the superGB attributes\r
1909         aggregate_table aggr_tbl;       // Table of all referenced aggregates.\r
1910         std::set<std::string> states_refd;      // states ref'd by stateful fcns.\r
1911         std::vector<cnf_elem *> cleanby;\r
1912         std::vector<cnf_elem *> cleanwhen;\r
1913 \r
1914         std::vector<scalarexp_t *> gb_sources;  // pre-compute for partitioning.\r
1915 \r
1916         std::vector<scalarexp_t *> get_gb_sources(){return gb_sources;}\r
1917 \r
1918         std::string node_type(){return("sgahcwcb_qpn"); };\r
1919     bool makes_transform(){return true;};\r
1920         std::vector<std::string> external_libs(){\r
1921                 std::vector<std::string> ret;\r
1922                 return ret;\r
1923         }\r
1924 \r
1925         void bind_to_schema(table_list *Schema);\r
1926         col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){\r
1927                 fprintf(stderr,"INTERNAL ERROR, calling sgahcwcb_qpn::get_colrefs\n");\r
1928                 exit(1);\r
1929         }\r
1930 \r
1931         std::string to_query_string();\r
1932         std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);\r
1933         std::string generate_functor_name();\r
1934 \r
1935         std::string generate_operator(int i, std::string params);\r
1936         std::string get_include_file(){return("#include <clean_operator.h>\n");};\r
1937 \r
1938     std::vector<select_element *> get_select_list(){return select_list;};\r
1939     std::vector<scalarexp_t *> get_select_se_list(){\r
1940                 std::vector<scalarexp_t *> ret;\r
1941                 int i;\r
1942                 for(i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);\r
1943                 return ret;\r
1944         };\r
1945     std::vector<cnf_elem *> get_where_clause(){return where;};\r
1946     std::vector<cnf_elem *> get_filter_clause(){return where;};\r
1947     std::vector<cnf_elem *> get_having_clause(){return having;};\r
1948     gb_table *get_gb_tbl(){return &gb_tbl;};\r
1949     aggregate_table *get_aggr_tbl(){return &aggr_tbl;};\r
1950         cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);\r
1951     std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);\r
1952 \r
1953 //                              table which represents output tuple.\r
1954         table_def *get_fields();\r
1955         std::vector<tablevar_t *> get_input_tbls();\r
1956         std::vector<tablevar_t *> get_output_tbls();\r
1957 \r
1958 \r
1959         sgahcwcb_qpn(){\r
1960         };\r
1961         sgahcwcb_qpn(query_summary_class *qs,table_list *Schema){\r
1962 //                              Get the table name.\r
1963 //                              NOTE the colrefs have the tablevar ref (an int)\r
1964 //                              embedded in them.  Would it make sense\r
1965 //                              to grab the whole table list?\r
1966                 tablevar_list_t *fm = qs->fta_tree->get_from();\r
1967                 std::vector<tablevar_t *> tbl_vec = fm->get_table_list();\r
1968                 if(tbl_vec.size() != 1){\r
1969                         char tmpstr[200];\r
1970                         sprintf(tmpstr,"INTERNAL ERROR building SGAHCWCB node: query defined over %lu tables.\n",tbl_vec.size() );\r
1971                         err_str=tmpstr;\r
1972                         error_code = 1;\r
1973                 }\r
1974                 table_name = (tbl_vec[0]);\r
1975 \r
1976 //                              Get the select list.\r
1977                 select_list = qs->fta_tree->get_sl_vec();\r
1978 \r
1979 //                              Get the selection and having predicates.\r
1980                 where = qs->wh_cnf;\r
1981                 having = qs->hav_cnf;\r
1982                 cleanby = qs->cb_cnf;\r
1983                 cleanwhen = qs->cw_cnf;\r
1984 \r
1985 //                              Build a new GB var table (don't share, might need to modify)\r
1986                 int g;\r
1987                 for(g=0;g<qs->gb_tbl->size();g++){\r
1988                         gb_tbl.add_gb_var(qs->gb_tbl->get_name(g),\r
1989                                 qs->gb_tbl->get_tblvar_ref(g), qs->gb_tbl->get_def(g),\r
1990                                 qs->gb_tbl->get_reftype(g)\r
1991                         );\r
1992                 }\r
1993 \r
1994 //                              Build a new aggregate table. (don't share, might need\r
1995 //                              to modify).\r
1996                 int a;\r
1997                 for(a=0;a<qs->aggr_tbl->size();a++){\r
1998                         aggr_tbl.add_aggr(\r
1999 //                              qs->aggr_tbl->get_op(a), qs->aggr_tbl->get_aggr_se(a)\r
2000                                 qs->aggr_tbl->duplicate(a)\r
2001                         );\r
2002                 }\r
2003 \r
2004                 sg_tbl = qs->sg_tbl;\r
2005                 states_refd = qs->states_refd;\r
2006 \r
2007 \r
2008 //                              Get the parameters\r
2009                 param_tbl = qs->param_tbl;\r
2010 \r
2011         };\r
2012 \r
2013 \r
2014 \r
2015         std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);\r
2016         virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);\r
2017 //              Ensure that any refs to interface params have been split away.\r
2018 //                      CURRENTLY not allowed by split_node_for_fta\r
2019         int count_ifp_refs(std::set<std::string> &ifpnames){return 0;}\r
2020         int resolve_if_params(ifq_t *ifdb, std::string &err){return 0;}\r
2021 \r
2022         // the following method is used for distributed query optimization\r
2023         double get_rate_estimate();\r
2024 \r
2025         qp_node* make_copy(std::string suffix){\r
2026                 sgahcwcb_qpn *ret = new sgahcwcb_qpn();\r
2027 \r
2028                 ret->param_tbl = new param_table();\r
2029                 std::vector<std::string> param_names = param_tbl->get_param_names();\r
2030                 int pi;\r
2031                 for(pi=0;pi<param_names.size();pi++){\r
2032                         data_type *dt = param_tbl->get_data_type(param_names[pi]);\r
2033                         ret->param_tbl->add_param(param_names[pi],dt->duplicate(),\r
2034                                                         param_tbl->handle_access(param_names[pi]));\r
2035                 }\r
2036                 ret->definitions = definitions;\r
2037 \r
2038                 ret->node_name = node_name + suffix;\r
2039 \r
2040                 // make shallow copy of all fields\r
2041                 ret->where = where;\r
2042                 ret->having = having;\r
2043                 ret->select_list = select_list;\r
2044                 ret->gb_tbl = gb_tbl;\r
2045                 ret->aggr_tbl = aggr_tbl;\r
2046                 ret->sg_tbl = sg_tbl;\r
2047                 ret->states_refd = states_refd;\r
2048                 ret->cleanby = cleanby;\r
2049                 ret->cleanwhen = cleanwhen;\r
2050 \r
2051                 return ret;\r
2052         };\r
2053 \r
2054         void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);\r
2055 };\r
2056 \r
2057 \r
2058 std::vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema);\r
2059 \r
2060 \r
2061 \r
2062 void untaboo(string &s);\r
2063 \r
2064 table_def *create_attributes(string tname, vector<select_element *> &select_list);\r
2065 \r
2066 \r
2067 \r
2068 #endif\r