Fixed newline characters throughout the code
[com/gs-lite.git] / src / ftacmp / stream_query.h
index 30453f1..075d143 100644 (file)
-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
-   Licensed under the Apache License, Version 2.0 (the "License");\r
-   you may not use this file except in compliance with the License.\r
-   You may obtain a copy of the License at\r
-\r
-     http://www.apache.org/licenses/LICENSE-2.0\r
-\r
-   Unless required by applicable law or agreed to in writing, software\r
-   distributed under the License is distributed on an "AS IS" BASIS,\r
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-   See the License for the specific language governing permissions and\r
-   limitations under the License.\r
- ------------------------------------------- */\r
-#ifndef __STREAM_QUERY_H_\r
-#define __STREAM_QUERY_H_\r
-\r
-#include<vector>\r
-#include<string>\r
-#include<map>\r
-\r
-#include"parse_schema.h"\r
-#include"parse_fta.h"\r
-#include"analyze_fta.h"\r
-#include"query_plan.h"\r
-#include"parse_partn.h"\r
-\r
-\r
-class stream_query{\r
-public:\r
-//             query_plan contains the query nodes, which form a query tree (dag?)\r
-//             by linking to each other via indices into this vector.\r
-//             NOTE: there might be holes (NULLs) in this list due to shifting\r
-//             around operators for optimization.\r
-       std::vector<qp_node *> query_plan;\r
-       std::vector<ospec_str *> output_specs;\r
-       vector<qp_node *> output_operators;\r
-\r
-       int qhead;\r
-       std::vector<int> qtail;\r
-\r
-       table_def *attributes;\r
-       param_table *parameters;\r
-       std::map<std::string, std::string> defines;\r
-       std::string query_name;\r
-\r
-       int n_parallel;         // If the HFTA has been parallelized, # of copies.\r
-       int parallel_idx;       // which of the cloned hftas this one is.\r
-                                               // needed for output splitting.\r
-\r
-       int n_successors;       // # of hftas which read from this one.\r
-\r
-       int gid;                // global id\r
-\r
-//             For error reporting.\r
-       int error_code;\r
-       std::string err_str;\r
-\r
-  stream_query(){\r
-       error_code = 0;\r
-       attributes = NULL;\r
-       parameters = NULL;\r
-  };\r
-\r
-//             Create stream query for split-off lfta.\r
-//             The parent stream query provides annotations.\r
-  stream_query(qp_node *qnode, stream_query *parent);\r
-\r
-//             Create stream query from analyzed parse tree.\r
-  stream_query(query_summary_class *qs,table_list *Schema);\r
-\r
-//             Make a copy.\r
-  stream_query(stream_query &source);\r
-//     used after making the copy\r
-  void set_nparallel(int n, int i){n_parallel = n; parallel_idx = i;}\r
-\r
-  qp_node *get_query_head(){return query_plan[qhead];};\r
-  std::string get_sq_name(){return query_plan[qhead]->get_node_name();};\r
-\r
-//             Add a parse tree to the query plan.\r
-  stream_query *add_query(query_summary_class *qs,table_list *Schema);\r
-  stream_query *add_query(stream_query &sc);\r
-\r
-  bool generate_linkage();\r
-  int generate_plan(table_list *Schema);\r
-\r
-\r
-//             Create a map of fields to scalar expression in Protocol fields\r
-//             for each query node in the stream.  For partitioning optimizations.\r
-  void generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema);\r
-\r
-\r
-//             Checks if the operator i is compatible with interface partitioning\r
-//             (can be pushed below the merge that combines partitioned stream)\r
-  bool is_partn_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);\r
-\r
-//             Checks if the node i can be pushed below the merge\r
-  bool is_pushdown_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names);\r
-\r
-//             Push the operator below the merge that combines partitioned stream\r
-  void pushdown_partn_operator(int i);\r
-\r
-//             Push the operator below the merge that combines regular (non-partitioned streams)\r
-  void pushdown_operator(int i, ext_fcn_list *Ext_fcns, table_list *Schema);\r
-\r
-//             Splits query that combines data from multiple hosts into separate hftas\r
-  std::vector<stream_query*> split_multihost_query();\r
-\r
-//             Extract subtree rooted at node i into separate hfta\r
-  stream_query* extract_subtree(int i);\r
-\r
-//             Extract any external libraries needed by the oeprators in this hfta\r
-       void get_external_libs(std::set<std::string> &libset);\r
-\r
-\r
-//             Perform local FTA optimizations\r
-  void optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);\r
-\r
-  int get_error_code(){return error_code;};\r
-  std::string get_error_str(){return err_str;};\r
-\r
-  void set_gid(int i){gid=i;};\r
-  int get_gid(){return gid;};\r
-\r
-\r
-  bool stream_input_only(table_list *Schema);\r
-\r
-  std::vector<tablevar_t *> get_input_tables();\r
-\r
-  table_def *get_output_tabledef();\r
-\r
-//             Extract lfta components of the query\r
-  std::vector<stream_query *> split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);\r
-\r
-//             Process and register opertor views\r
-  std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string silo_nm);\r
-\r
-  std::string collect_refd_ifaces();\r
-\r
-//                     The serialization function\r
-  std::string make_schema();\r
-  std::string make_schema(int i);\r
-\r
-//                     hfta generation.  Schema must contain the table_def's\r
-//                     of all source tables (but extra tables will not hurt).\r
-  std::string generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode);\r
-\r
-//             De-silo an hfta by replacing refs to lfta inputs with a\r
-//             merge over all silo'ed lftas.\r
-//             TO BE REMOVED\r
-void desilo_lftas(std::map<std::string, int> &lfta_names,std::vector<std::string> &ifq_names,table_list *Schema);\r
-\r
-void add_output_operator(ospec_str *);\r
-\r
-private:\r
-//             Helper function for generate_hfta\r
-void compute_node_format(int q, std::vector<int> &nfmt, std::map<std::string, int> &op_idx);\r
-\r
-\r
-\r
-};\r
-\r
-///////////////////////////////////////////////////////////////////////\r
-////           Related functions\r
-void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, std::vector<cnf_set *> &prefilter_preds, std::set<unsigned int> &pred_ids);\r
-void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids);\r
-\r
-\r
-\r
-#endif\r
+/* ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ ------------------------------------------- */
+#ifndef __STREAM_QUERY_H_
+#define __STREAM_QUERY_H_
+
+#include<vector>
+#include<string>
+#include<map>
+
+#include"parse_schema.h"
+#include"parse_fta.h"
+#include"analyze_fta.h"
+#include"query_plan.h"
+#include"parse_partn.h"
+
+
+class stream_query{
+public:
+//             query_plan contains the query nodes, which form a query tree (dag?)
+//             by linking to each other via indices into this vector.
+//             NOTE: there might be holes (NULLs) in this list due to shifting
+//             around operators for optimization.
+       std::vector<qp_node *> query_plan;
+       std::vector<ospec_str *> output_specs;
+       vector<qp_node *> output_operators;
+
+       int qhead;
+       std::vector<int> qtail;
+
+       table_def *attributes;
+       param_table *parameters;
+       std::map<std::string, std::string> defines;
+       std::string query_name;
+
+       int n_parallel;         // If the HFTA has been parallelized, # of copies.
+       int parallel_idx;       // which of the cloned hftas this one is.
+                                               // needed for output splitting.
+
+       int n_successors;       // # of hftas which read from this one.
+
+       int gid;                // global id
+
+//             For error reporting.
+       int error_code;
+       std::string err_str;
+
+  stream_query(){
+       error_code = 0;
+       attributes = NULL;
+       parameters = NULL;
+  };
+
+//             Create stream query for split-off lfta.
+//             The parent stream query provides annotations.
+  stream_query(qp_node *qnode, stream_query *parent);
+
+//             Create stream query from analyzed parse tree.
+  stream_query(query_summary_class *qs,table_list *Schema);
+
+//             Make a copy.
+  stream_query(stream_query &source);
+//     used after making the copy
+  void set_nparallel(int n, int i){n_parallel = n; parallel_idx = i;}
+
+  qp_node *get_query_head(){return query_plan[qhead];};
+  std::string get_sq_name(){return query_plan[qhead]->get_node_name();};
+
+//             Add a parse tree to the query plan.
+  stream_query *add_query(query_summary_class *qs,table_list *Schema);
+  stream_query *add_query(stream_query &sc);
+
+  bool generate_linkage();
+  int generate_plan(table_list *Schema);
+
+
+//             Create a map of fields to scalar expression in Protocol fields
+//             for each query node in the stream.  For partitioning optimizations.
+  void generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema);
+
+
+//             Checks if the operator i is compatible with interface partitioning
+//             (can be pushed below the merge that combines partitioned stream)
+  bool is_partn_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);
+
+//             Checks if the node i can be pushed below the merge
+  bool is_pushdown_compatible(int i, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names);
+
+//             Push the operator below the merge that combines partitioned stream
+  void pushdown_partn_operator(int i);
+
+//             Push the operator below the merge that combines regular (non-partitioned streams)
+  void pushdown_operator(int i, ext_fcn_list *Ext_fcns, table_list *Schema);
+
+//             Splits query that combines data from multiple hosts into separate hftas
+  std::vector<stream_query*> split_multihost_query();
+
+//             Extract subtree rooted at node i into separate hfta
+  stream_query* extract_subtree(int i);
+
+//             Extract any external libraries needed by the oeprators in this hfta
+       void get_external_libs(std::set<std::string> &libset);
+
+
+//             Perform local FTA optimizations
+  void optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result);
+
+  int get_error_code(){return error_code;};
+  std::string get_error_str(){return err_str;};
+
+  void set_gid(int i){gid=i;};
+  int get_gid(){return gid;};
+
+
+  bool stream_input_only(table_list *Schema);
+
+  std::vector<tablevar_t *> get_input_tables();
+
+  table_def *get_output_tabledef();
+
+//             Extract lfta components of the query
+  std::vector<stream_query *> split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
+
+//             Process and register opertor views
+  std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string silo_nm);
+
+  std::string collect_refd_ifaces();
+
+//                     The serialization function
+  std::string make_schema();
+  std::string make_schema(int i);
+
+//                     hfta generation.  Schema must contain the table_def's
+//                     of all source tables (but extra tables will not hurt).
+  std::string generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode);
+
+//             De-silo an hfta by replacing refs to lfta inputs with a
+//             merge over all silo'ed lftas.
+//             TO BE REMOVED
+void desilo_lftas(std::map<std::string, int> &lfta_names,std::vector<std::string> &ifq_names,table_list *Schema);
+
+void add_output_operator(ospec_str *);
+
+private:
+//             Helper function for generate_hfta
+void compute_node_format(int q, std::vector<int> &nfmt, std::map<std::string, int> &op_idx);
+
+
+
+};
+
+///////////////////////////////////////////////////////////////////////
+////           Related functions
+void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, std::vector<cnf_set *> &prefilter_preds, std::set<unsigned int> &pred_ids);
+void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids);
+
+
+
+#endif