Added quantiling UDAFs
[com/gs-lite.git] / src / ftacmp / stream_query.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 #ifndef __STREAM_QUERY_H_\r
16 #define __STREAM_QUERY_H_\r
17 \r
18 #include<vector>\r
19 #include<string>\r
20 #include<map>\r
21 \r
22 #include"parse_schema.h"\r
23 #include"parse_fta.h"\r
24 #include"analyze_fta.h"\r
25 #include"query_plan.h"\r
26 #include"parse_partn.h"\r
27 \r
28 \r
29 class stream_query{\r
30 public:\r
31 //              query_plan contains the query nodes, which form a query tree (dag?)\r
32 //              by linking to each other via indices into this vector.\r
33 //              NOTE: there might be holes (NULLs) in this list due to shifting\r
34 //              around operators for optimization.\r
35         std::vector<qp_node *> query_plan;\r
36         std::vector<ospec_str *> output_specs;\r
37         vector<qp_node *> output_operators;\r
38 \r
39         int qhead;\r
40         std::vector<int> qtail;\r
41 \r
42         table_def *attributes;\r
43         param_table *parameters;\r
44         std::map<std::string, std::string> defines;\r
45         std::string query_name;\r
46 \r
47         int n_parallel;         // If the HFTA has been parallelized, # of copies.\r
48         int parallel_idx;       // which of the cloned hftas this one is.\r
49                                                 // needed for output splitting.\r
50 \r
51         int n_successors;       // # of hftas which read from this one.\r
52 \r
53         int gid;                // global id\r
54 \r
55 //              For error reporting.\r
56         int error_code;\r
57         std::string err_str;\r
58 \r
59   stream_query(){\r
60         error_code = 0;\r
61         attributes = NULL;\r
62         parameters = NULL;\r
63   };\r
64 \r
65 //              Create stream query for split-off lfta.\r
66 //              The parent stream query provides annotations.\r
67   stream_query(qp_node *qnode, stream_query *parent);\r
68 \r
69 //              Create stream query from analyzed parse tree.\r
70   stream_query(query_summary_class *qs,table_list *Schema);\r
71 \r
72 //              Make a copy.\r
73   stream_query(stream_query &source);\r
74 //      used after making the copy\r
75   void set_nparallel(int n, int i){n_parallel = n; parallel_idx = i;}\r
76 \r
77   qp_node *get_query_head(){return query_plan[qhead];};\r
78   std::string get_sq_name(){return query_plan[qhead]->get_node_name();};\r
79 \r
80 //              Add a parse tree to the query plan.\r
81   stream_query *add_query(query_summary_class *qs,table_list *Schema);\r
82   stream_query *add_query(stream_query &sc);\r
83 \r
84   bool generate_linkage();\r
85   int generate_plan(table_list *Schema);\r
86 \r
87 \r
88 //              Create a map of fields to scalar expression in Protocol fields\r
89 //              for each query node in the stream.  For partitioning optimizations.\r
90   void generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema);\r
91 \r
92 \r
93 //              Checks if the operator i is compatible with interface partitioning\r
94 //              (can be pushed below the merge that combines partitioned stream)\r
95   bool is_partn_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);\r
96 \r
97 //              Checks if the node i can be pushed below the merge\r
98   bool is_pushdown_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names);\r
99 \r
100 //              Push the operator below the merge that combines partitioned stream\r
101   void pushdown_partn_operator(int i);\r
102 \r
103 //              Push the operator below the merge that combines regular (non-partitioned streams)\r
104   void pushdown_operator(int i, ext_fcn_list *Ext_fcns, table_list *Schema);\r
105 \r
106 //              Splits query that combines data from multiple hosts into separate hftas\r
107   std::vector<stream_query*> split_multihost_query();\r
108 \r
109 //              Extract subtree rooted at node i into separate hfta\r
110   stream_query* extract_subtree(int i);\r
111 \r
112 //              Extract any external libraries needed by the oeprators in this hfta\r
113         void get_external_libs(std::set<std::string> &libset);\r
114 \r
115 \r
116 //              Perform local FTA optimizations\r
117   void optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);\r
118 \r
119   int get_error_code(){return error_code;};\r
120   std::string get_error_str(){return err_str;};\r
121 \r
122   void set_gid(int i){gid=i;};\r
123   int get_gid(){return gid;};\r
124 \r
125 \r
126   bool stream_input_only(table_list *Schema);\r
127 \r
128   std::vector<tablevar_t *> get_input_tables();\r
129 \r
130   table_def *get_output_tabledef();\r
131 \r
132 //              Extract lfta components of the query\r
133   std::vector<stream_query *> split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);\r
134 \r
135 //              Process and register opertor views\r
136   std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string silo_nm);\r
137 \r
138   std::string collect_refd_ifaces();\r
139 \r
140 //                      The serialization function\r
141   std::string make_schema();\r
142   std::string make_schema(int i);\r
143 \r
144 //                      hfta generation.  Schema must contain the table_def's\r
145 //                      of all source tables (but extra tables will not hurt).\r
146   std::string generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode);\r
147 \r
148 //              De-silo an hfta by replacing refs to lfta inputs with a\r
149 //              merge over all silo'ed lftas.\r
150 //              TO BE REMOVED\r
151 void desilo_lftas(std::map<std::string, int> &lfta_names,std::vector<std::string> &ifq_names,table_list *Schema);\r
152 \r
153 void add_output_operator(ospec_str *);\r
154 \r
155 private:\r
156 //              Helper function for generate_hfta\r
157 void compute_node_format(int q, std::vector<int> &nfmt, std::map<std::string, int> &op_idx);\r
158 \r
159 \r
160 \r
161 };\r
162 \r
163 ///////////////////////////////////////////////////////////////////////\r
164 ////            Related functions\r
165 void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, std::vector<cnf_set *> &prefilter_preds, std::set<unsigned int> &pred_ids);\r
166 void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids);\r
167 \r
168 \r
169 \r
170 #endif\r