Fixed newline characters throughout the code
[com/gs-lite.git] / src / ftacmp / stream_query.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15 #ifndef __STREAM_QUERY_H_
16 #define __STREAM_QUERY_H_
17
18 #include<vector>
19 #include<string>
20 #include<map>
21
22 #include"parse_schema.h"
23 #include"parse_fta.h"
24 #include"analyze_fta.h"
25 #include"query_plan.h"
26 #include"parse_partn.h"
27
28
29 class stream_query{
30 public:
31 //              query_plan contains the query nodes, which form a query tree (dag?)
32 //              by linking to each other via indices into this vector.
33 //              NOTE: there might be holes (NULLs) in this list due to shifting
34 //              around operators for optimization.
35         std::vector<qp_node *> query_plan;
36         std::vector<ospec_str *> output_specs;
37         vector<qp_node *> output_operators;
38
39         int qhead;
40         std::vector<int> qtail;
41
42         table_def *attributes;
43         param_table *parameters;
44         std::map<std::string, std::string> defines;
45         std::string query_name;
46
47         int n_parallel;         // If the HFTA has been parallelized, # of copies.
48         int parallel_idx;       // which of the cloned hftas this one is.
49                                                 // needed for output splitting.
50
51         int n_successors;       // # of hftas which read from this one.
52
53         int gid;                // global id
54
55 //              For error reporting.
56         int error_code;
57         std::string err_str;
58
59   stream_query(){
60         error_code = 0;
61         attributes = NULL;
62         parameters = NULL;
63   };
64
65 //              Create stream query for split-off lfta.
66 //              The parent stream query provides annotations.
67   stream_query(qp_node *qnode, stream_query *parent);
68
69 //              Create stream query from analyzed parse tree.
70   stream_query(query_summary_class *qs,table_list *Schema);
71
72 //              Make a copy.
73   stream_query(stream_query &source);
74 //      used after making the copy
75   void set_nparallel(int n, int i){n_parallel = n; parallel_idx = i;}
76
77   qp_node *get_query_head(){return query_plan[qhead];};
78   std::string get_sq_name(){return query_plan[qhead]->get_node_name();};
79
80 //              Add a parse tree to the query plan.
81   stream_query *add_query(query_summary_class *qs,table_list *Schema);
82   stream_query *add_query(stream_query &sc);
83
84   bool generate_linkage();
85   int generate_plan(table_list *Schema);
86
87
88 //              Create a map of fields to scalar expression in Protocol fields
89 //              for each query node in the stream.  For partitioning optimizations.
90   void generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema);
91
92
93 //              Checks if the operator i is compatible with interface partitioning
94 //              (can be pushed below the merge that combines partitioned stream)
95   bool is_partn_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);
96
97 //              Checks if the node i can be pushed below the merge
98   bool is_pushdown_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names);
99
100 //              Push the operator below the merge that combines partitioned stream
101   void pushdown_partn_operator(int i);
102
103 //              Push the operator below the merge that combines regular (non-partitioned streams)
104   void pushdown_operator(int i, ext_fcn_list *Ext_fcns, table_list *Schema);
105
106 //              Splits query that combines data from multiple hosts into separate hftas
107   std::vector<stream_query*> split_multihost_query();
108
109 //              Extract subtree rooted at node i into separate hfta
110   stream_query* extract_subtree(int i);
111
112 //              Extract any external libraries needed by the oeprators in this hfta
113         void get_external_libs(std::set<std::string> &libset);
114
115
116 //              Perform local FTA optimizations
117   void optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);
118
119   int get_error_code(){return error_code;};
120   std::string get_error_str(){return err_str;};
121
122   void set_gid(int i){gid=i;};
123   int get_gid(){return gid;};
124
125
126   bool stream_input_only(table_list *Schema);
127
128   std::vector<tablevar_t *> get_input_tables();
129
130   table_def *get_output_tabledef();
131
132 //              Extract lfta components of the query
133   std::vector<stream_query *> split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
134
135 //              Process and register opertor views
136   std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string silo_nm);
137
138   std::string collect_refd_ifaces();
139
140 //                      The serialization function
141   std::string make_schema();
142   std::string make_schema(int i);
143
144 //                      hfta generation.  Schema must contain the table_def's
145 //                      of all source tables (but extra tables will not hurt).
146   std::string generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode);
147
148 //              De-silo an hfta by replacing refs to lfta inputs with a
149 //              merge over all silo'ed lftas.
150 //              TO BE REMOVED
151 void desilo_lftas(std::map<std::string, int> &lfta_names,std::vector<std::string> &ifq_names,table_list *Schema);
152
153 void add_output_operator(ospec_str *);
154
155 private:
156 //              Helper function for generate_hfta
157 void compute_node_format(int q, std::vector<int> &nfmt, std::map<std::string, int> &op_idx);
158
159
160
161 };
162
163 ///////////////////////////////////////////////////////////////////////
164 ////            Related functions
165 void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, std::vector<cnf_set *> &prefilter_preds, std::set<unsigned int> &pred_ids);
166 void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids);
167
168
169
170 #endif