1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
15 #ifndef __STREAM_QUERY_H_
16 #define __STREAM_QUERY_H_
22 #include"parse_schema.h"
24 #include"analyze_fta.h"
25 #include"query_plan.h"
26 #include"parse_partn.h"
31 // query_plan contains the query nodes, which form a query tree (dag?)
32 // by linking to each other via indices into this vector.
33 // NOTE: there might be holes (NULLs) in this list due to shifting
34 // around operators for optimization.
35 std::vector<qp_node *> query_plan;
36 std::vector<ospec_str *> output_specs;
37 vector<qp_node *> output_operators;
// Head and tail of the plan. qhead is the position that
// get_query_head() and get_sq_name() use to index query_plan.
39 int qhead; // index of output operator
// NOTE(review): presumably the indices of the plan's input-side nodes,
// by analogy with qhead -- confirm against the .cc file.
40 std::vector<int> qtail;
// Per-query metadata. NOTE(review): attributes is presumably the output
// schema and parameters the query parameter table -- confirm.
42 table_def *attributes;
43 param_table *parameters;
// String-to-string map of defines and the name of the query.
44 std::map<std::string, std::string> defines;
45 std::string query_name;
// Parallelization state; both fields are assigned by set_nparallel().
47 int n_parallel; // If the HFTA has been parallelized, # of copies.
48 int parallel_idx; // which of the cloned hftas this one is.
49 // needed for output splitting.
51 int n_successors; // # of hftas which read from this one.
55 // For error reporting.
65 // Create stream query for split-off lfta.
66 // The parent stream query provides annotations.
// Declaration only; the constructor is defined outside this header.
//   qnode  - query node of the split-off lfta.
//            NOTE(review): presumably becomes the head of the new
//            plan -- confirm against the definition.
//   parent - existing stream query the annotations are taken from.
67 stream_query(qp_node *qnode, stream_query *parent);
69 // Create stream query from analyzed parse tree.
// Declaration only; the constructor is defined outside this header.
//   qs     - the analyzed parse tree to build the query from.
//   Schema - table list passed alongside it.
//            NOTE(review): presumably the schema the query was
//            analyzed against -- confirm.
70 stream_query(query_summary_class *qs,table_list *Schema);
// Constructs a stream query from an existing one.
// NOTE(review): source is taken by non-const reference, so const objects
// and temporaries cannot be passed. Presumably this is the copy path used
// when an hfta is cloned for parallel execution -- confirm.
73 stream_query(stream_query &source);
74 // used after making the copy
75 void set_nparallel(int n, int i){n_parallel = n; parallel_idx = i;}
77 qp_node *get_query_head(){return query_plan[qhead];};
78 std::string get_sq_name(){return query_plan[qhead]->get_node_name();};
80 // Add a parse tree to the query plan.
// Two overloads, both declared here and defined elsewhere:
//   - the first takes an analyzed query summary plus the schema;
//   - the second takes an existing stream_query by non-const reference.
// Both return a stream_query pointer.
// NOTE(review): what the returned pointer refers to (this object, a new
// object, NULL on failure) is not visible here -- confirm.
81 stream_query *add_query(query_summary_class *qs,table_list *Schema);
82 stream_query *add_query(stream_query &sc);
// Declared here, defined elsewhere. Takes no arguments and returns a bool.
// NOTE(review): presumably wires up the links between the nodes of
// query_plan; whether true means success or failure is not visible
// here -- confirm.
84 bool generate_linkage();
// Declared here, defined elsewhere. Takes the schema and returns an int.
// NOTE(review): the meaning of the int result (status code or something
// else) is not visible here -- confirm against the definition.
85 int generate_plan(table_list *Schema);
88 // Create a map of fields to scalar expression in Protocol fields
89 // for each query node in the stream. For partitioning optimizations.
90 void generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema);
93 // Checks if the operator i is compatible with interface partitioning
94 // (can be pushed below the merge that combines partitioned stream)
95 bool is_partn_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);
// Checks if the node i can be pushed below the merge.
//   i               - identifies the node to test.
//   lfta_names      - map from string to int describing the lftas.
//   interface_names - list of interface names.
//   machine_names   - list of machine names.
// The containers are still taken by value so the declaration keeps
// matching its out-of-line definition. Only the spelling changed: the
// standard-library types are now std::-qualified, so this header does not
// rely on a using-directive from an included file.
bool is_pushdown_compatible(int i, std::map<std::string, int> lfta_names, std::vector<std::string> interface_names, std::vector<std::string> machine_names);
100 // Push the operator below the merge that combines partitioned stream
//   i - identifies the operator to push.
//       NOTE(review): presumably an index into query_plan -- confirm.
// Returns nothing, so the effect is on this object's state.
101 void pushdown_partn_operator(int i);
103 // Push the operator below the merge that combines regular (non-partitioned streams)
//   i        - identifies the operator to push.
//              NOTE(review): presumably an index into query_plan -- confirm.
//   Ext_fcns - external function list.
//   Schema   - table list holding the schema.
104 void pushdown_operator(int i, ext_fcn_list *Ext_fcns, table_list *Schema);
106 // Splits query that combines data from multiple hosts into separate hftas
// Returns the resulting stream queries as a vector of pointers.
// NOTE(review): who owns the returned objects is not visible here -- confirm.
107 std::vector<stream_query*> split_multihost_query();
109 // Extract subtree rooted at node i into separate hfta
//   i - identifies the root node of the subtree.
//       NOTE(review): presumably an index into query_plan -- confirm.
// Returns a pointer to the stream query for the extracted hfta.
// NOTE(review): ownership of the returned object is not visible here -- confirm.
110 stream_query* extract_subtree(int i);
112 // Extract any external libraries needed by the operators in this hfta
//   libset - set of strings passed by non-const reference.
//            NOTE(review): presumably the library names are inserted
//            into it and existing entries are kept -- confirm.
113 void get_external_libs(std::set<std::string> &libset);
116 // Perform local FTA optimizations
117 void optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);
119 int get_error_code(){return error_code;};
120 std::string get_error_str(){return err_str;};
122 void set_gid(int i){gid=i;};
123 int get_gid(){return gid;};
// Declared here, defined elsewhere. Takes the schema and returns a bool.
// NOTE(review): judging by the name, presumably true when every input of
// the query is a stream -- confirm against the definition.
126 bool stream_input_only(table_list *Schema);
// Declared here, defined elsewhere. Returns a vector of table variable
// pointers, by value.
// NOTE(review): presumably the tables the query reads from; ownership of
// the pointed-to objects is not visible here -- confirm.
128 std::vector<tablevar_t *> get_input_tables();
// Declared here, defined elsewhere. Returns a table_def pointer.
// NOTE(review): presumably describes the output schema of the query;
// whether the caller owns the result is not visible here -- confirm.
130 table_def *get_output_tabledef();
132 // get any inferred key definition, just a pass-through to the head _qpn method
//   partial_keys - vector of strings passed by non-const reference.
//                  NOTE(review): presumably filled in by the call -- confirm.
// Returns a vector of strings by value.
133 std::vector<std::string> get_tbl_keys(std::vector<std::string> &partial_keys);
135 // Extract lfta components of the query
//   Ext_fcns         - external function list.
//   Schema           - table list holding the schema.
//   hfta_returned    - bool passed by reference.
//                      NOTE(review): presumably set to tell the caller
//                      whether an hfta is among the results -- confirm.
//   ifdb             - interface database.
//   n_virtual_ifaces - number of virtual interfaces.
//   hfta_parallelism - NOTE(review): presumably the number of parallel
//                      hfta copies -- confirm.
//   hfta_idx         - NOTE(review): presumably which of those copies
//                      this one is -- confirm.
// Returns the extracted stream queries as a vector of pointers.
136 std::vector<stream_query *> split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
138 // Process and register operator views
//   Schema  - table list holding the schema.
//   qnodes  - query node list, passed by non-const reference.
//   opviews - operator view set, passed by non-const reference.
//             NOTE(review): presumably where the views get
//             registered -- confirm.
//   silo_nm - a string passed by value.
//             NOTE(review): presumably a silo name -- confirm.
// Returns a vector of table expression pointers.
139 std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string silo_nm);
// Declared here, defined elsewhere. Takes no arguments and returns a string.
// NOTE(review): judging by the name, presumably lists the interfaces the
// query refers to; the format of the string is not visible here -- confirm.
141 std::string collect_refd_ifaces();
143 // The serialization function
// Two overloads, both returning the result as a string: one without
// arguments and one taking an int.
// NOTE(review): the int is presumably a node index into query_plan, with
// the no-argument form covering the head node -- confirm.
144 std::string make_schema();
145 std::string make_schema(int i);
147 // hfta generation. Schema must contain the table_def's
148 // of all source tables (but extra tables will not hurt).
//   Schema           - table list; see the requirement above.
//   Ext_fcns         - external function list.
//   opviews          - operator view set, passed by non-const reference.
//   distributed_mode - flag; its effect on the generated output is not
//                      visible here (NOTE(review): confirm).
// Returns a string.
// NOTE(review): presumably the generated hfta source text -- confirm.
149 std::string generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode);
151 // De-silo an hfta by replacing refs to lfta inputs with a
152 // merge over all silo'ed lftas.
//   lfta_names - map from string to int, passed by non-const reference.
//   ifq_names  - list of strings, passed by non-const reference.
//                NOTE(review): presumably interface query names -- confirm.
//   Schema     - table list holding the schema.
154 void desilo_lftas(std::map<std::string, int> &lfta_names,std::vector<std::string> &ifq_names,table_list *Schema);
// Declared here, defined elsewhere. Takes an output specification
// pointer; the parameter is unnamed in this declaration.
// NOTE(review): presumably records the spec in output_specs and/or adds
// a node to output_operators -- confirm against the definition.
156 void add_output_operator(ospec_str *);
159 // Helper function for generate_hfta
//   q      - NOTE(review): presumably an index into query_plan -- confirm.
//   nfmt   - vector of ints passed by non-const reference.
//            NOTE(review): presumably receives the per-node formats -- confirm.
//   op_idx - map from string to int, passed by non-const reference.
160 void compute_node_format(int q, std::vector<int> &nfmt, std::map<std::string, int> &op_idx);
166 ///////////////////////////////////////////////////////////////////////
167 //// Related functions
// Declared here, defined elsewhere.
//   lfta_list       - list of lfta stream queries; taken by value, so
//                     the vector of pointers is copied on each call.
//   Schema          - table list holding the schema.
//   Ext_fcns        - external function list.
//   prefilter_preds - vector of cnf_set pointers, passed by non-const
//                     reference.
//   pred_ids        - set of unsigned ints, passed by non-const reference.
// NOTE(review): judging by the name, presumably computes a filter common
// to the listed lftas and reports it through the two reference
// arguments -- confirm against the definition.
168 void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, std::vector<cnf_set *> &prefilter_preds, std::set<unsigned int> &pred_ids);
// Declared here, defined elsewhere.
//   lfta_list  - list of lfta stream queries; taken by value, so the
//                vector of pointers is copied on each call.
//   temp_cids  - column id set, passed by non-const reference.
// NOTE(review): judging by the name, presumably collects the temporal
// column ids used by the prefilter into temp_cids -- confirm.
169 void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids);