Add support for query key extraction
[com/gs-lite.git] / src / ftacmp / stream_query.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15 #ifndef __STREAM_QUERY_H_
16 #define __STREAM_QUERY_H_
17
18 #include<vector>
19 #include<string>
20 #include<map>
21
22 #include"parse_schema.h"
23 #include"parse_fta.h"
24 #include"analyze_fta.h"
25 #include"query_plan.h"
26 #include"parse_partn.h"
27
28
29 class stream_query{
30 public:
31 //              query_plan contains the query nodes, which form a query tree (dag?)
32 //              by linking to each other via indices into this vector.
33 //              NOTE: there might be holes (NULLs) in this list due to shifting
34 //              around operators for optimization.
35         std::vector<qp_node *> query_plan;
36         std::vector<ospec_str *> output_specs;
37         vector<qp_node *> output_operators;
38
39         int qhead;              // index of output operator
40         std::vector<int> qtail;
41
42         table_def *attributes;
43         param_table *parameters;
44         std::map<std::string, std::string> defines;
45         std::string query_name;
46
47         int n_parallel;         // If the HFTA has been parallelized, # of copies.
48         int parallel_idx;       // which of the cloned hftas this one is.
49                                                 // needed for output splitting.
50
51         int n_successors;       // # of hftas which read from this one.
52
53         int gid;                // global id
54
55 //              For error reporting.
56         int error_code;
57         std::string err_str;
58
59   stream_query(){
60         error_code = 0;
61         attributes = NULL;
62         parameters = NULL;
63   };
64
65 //              Create stream query for split-off lfta.
66 //              The parent stream query provides annotations.
67   stream_query(qp_node *qnode, stream_query *parent);
68
69 //              Create stream query from analyzed parse tree.
70   stream_query(query_summary_class *qs,table_list *Schema);
71
72 //              Make a copy.
73   stream_query(stream_query &source);
74 //      used after making the copy
75   void set_nparallel(int n, int i){n_parallel = n; parallel_idx = i;}
76
77   qp_node *get_query_head(){return query_plan[qhead];};
78   std::string get_sq_name(){return query_plan[qhead]->get_node_name();};
79
80 //              Add a parse tree to the query plan.
81   stream_query *add_query(query_summary_class *qs,table_list *Schema);
82   stream_query *add_query(stream_query &sc);
83
84   bool generate_linkage();
85   int generate_plan(table_list *Schema);
86
87
88 //              Create a map of fields to scalar expression in Protocol fields
89 //              for each query node in the stream.  For partitioning optimizations.
90   void generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema);
91
92
93 //              Checks if the operator i is compatible with interface partitioning
94 //              (can be pushed below the merge that combines partitioned stream)
95   bool is_partn_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);
96
97 //              Checks if the node i can be pushed below the merge
98   bool is_pushdown_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names);
99
100 //              Push the operator below the merge that combines partitioned stream
101   void pushdown_partn_operator(int i);
102
103 //              Push the operator below the merge that combines regular (non-partitioned streams)
104   void pushdown_operator(int i, ext_fcn_list *Ext_fcns, table_list *Schema);
105
106 //              Splits query that combines data from multiple hosts into separate hftas
107   std::vector<stream_query*> split_multihost_query();
108
109 //              Extract subtree rooted at node i into separate hfta
110   stream_query* extract_subtree(int i);
111
112 //              Extract any external libraries needed by the oeprators in this hfta
113         void get_external_libs(std::set<std::string> &libset);
114
115
116 //              Perform local FTA optimizations
117   void optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);
118
119   int get_error_code(){return error_code;};
120   std::string get_error_str(){return err_str;};
121
122   void set_gid(int i){gid=i;};
123   int get_gid(){return gid;};
124
125
126   bool stream_input_only(table_list *Schema);
127
128   std::vector<tablevar_t *> get_input_tables();
129
130   table_def *get_output_tabledef();
131
132 // get any inferred key definition, just a pass-through to the head _qpn method
133   std::vector<std::string> get_tbl_keys(std::vector<std::string> &partial_keys);
134
135 //              Extract lfta components of the query
136   std::vector<stream_query *> split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
137
138 //              Process and register opertor views
139   std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string silo_nm);
140
141   std::string collect_refd_ifaces();
142
143 //                      The serialization function
144   std::string make_schema();
145   std::string make_schema(int i);
146
147 //                      hfta generation.  Schema must contain the table_def's
148 //                      of all source tables (but extra tables will not hurt).
149   std::string generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode);
150
151 //              De-silo an hfta by replacing refs to lfta inputs with a
152 //              merge over all silo'ed lftas.
153 //              TO BE REMOVED
154 void desilo_lftas(std::map<std::string, int> &lfta_names,std::vector<std::string> &ifq_names,table_list *Schema);
155
156 void add_output_operator(ospec_str *);
157
158 private:
159 //              Helper function for generate_hfta
160 void compute_node_format(int q, std::vector<int> &nfmt, std::map<std::string, int> &op_idx);
161
162
163
164 };
165
166 ///////////////////////////////////////////////////////////////////////
167 ////            Related functions
168 void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, std::vector<cnf_set *> &prefilter_preds, std::set<unsigned int> &pred_ids);
169 void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids);
170
171
172
173 #endif