X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fftacmp%2Fstream_query.h;h=ae4b31e7495329e6894fb60749aee5c67b1a3eed;hb=f1754ecea2eab7bd0a302042ac82eb11667b166c;hp=30453f1ea100ccc4652f773b79a68dead2b471a6;hpb=07495effe193ca3f73c3bf0ce417068f9ac9dcdd;p=com%2Fgs-lite.git diff --git a/src/ftacmp/stream_query.h b/src/ftacmp/stream_query.h index 30453f1..ae4b31e 100644 --- a/src/ftacmp/stream_query.h +++ b/src/ftacmp/stream_query.h @@ -1,170 +1,173 @@ -/* ------------------------------------------------ -Copyright 2014 AT&T Intellectual Property - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------- */ -#ifndef __STREAM_QUERY_H_ -#define __STREAM_QUERY_H_ - -#include -#include -#include - -#include"parse_schema.h" -#include"parse_fta.h" -#include"analyze_fta.h" -#include"query_plan.h" -#include"parse_partn.h" - - -class stream_query{ -public: -// query_plan contains the query nodes, which form a query tree (dag?) -// by linking to each other via indices into this vector. -// NOTE: there might be holes (NULLs) in this list due to shifting -// around operators for optimization. - std::vector query_plan; - std::vector output_specs; - vector output_operators; - - int qhead; - std::vector qtail; - - table_def *attributes; - param_table *parameters; - std::map defines; - std::string query_name; - - int n_parallel; // If the HFTA has been parallelized, # of copies. - int parallel_idx; // which of the cloned hftas this one is. - // needed for output splitting. - - int n_successors; // # of hftas which read from this one. - - int gid; // global id - -// For error reporting. - int error_code; - std::string err_str; - - stream_query(){ - error_code = 0; - attributes = NULL; - parameters = NULL; - }; - -// Create stream query for split-off lfta. -// The parent stream query provides annotations. - stream_query(qp_node *qnode, stream_query *parent); - -// Create stream query from analyzed parse tree. - stream_query(query_summary_class *qs,table_list *Schema); - -// Make a copy. - stream_query(stream_query &source); -// used after making the copy - void set_nparallel(int n, int i){n_parallel = n; parallel_idx = i;} - - qp_node *get_query_head(){return query_plan[qhead];}; - std::string get_sq_name(){return query_plan[qhead]->get_node_name();}; - -// Add a parse tree to the query plan. - stream_query *add_query(query_summary_class *qs,table_list *Schema); - stream_query *add_query(stream_query &sc); - - bool generate_linkage(); - int generate_plan(table_list *Schema); - - -// Create a map of fields to scalar expression in Protocol fields -// for each query node in the stream. For partitioning optimizations. - void generate_protocol_se(map &sq_map, table_list *Schema); - - -// Checks if the operator i is compatible with interface partitioning -// (can be pushed below the merge that combines partitioned stream) - bool is_partn_compatible(int i, map lfta_names, vector interface_names, vector machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result); - -// Checks if the node i can be pushed below the merge - bool is_pushdown_compatible(int i, map lfta_names, vector interface_names, vector machine_names); - -// Push the operator below the merge that combines partitioned stream - void pushdown_partn_operator(int i); - -// Push the operator below the merge that combines regular (non-partitioned streams) - void pushdown_operator(int i, ext_fcn_list *Ext_fcns, table_list *Schema); - -// Splits query that combines data from multiple hosts into separate hftas - std::vector split_multihost_query(); - -// Extract subtree rooted at node i into separate hfta - stream_query* extract_subtree(int i); - -// Extract any external libraries needed by the oeprators in this hfta - void get_external_libs(std::set &libset); - - -// Perform local FTA optimizations - void optimize(vector& hfta_list, map lfta_names, vector interface_names, vector machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result); - - int get_error_code(){return error_code;}; - std::string get_error_str(){return err_str;}; - - void set_gid(int i){gid=i;}; - int get_gid(){return gid;}; - - - bool stream_input_only(table_list *Schema); - - std::vector get_input_tables(); - - table_def *get_output_tabledef(); - -// Extract lfta components of the query - std::vector split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); - -// Process and register opertor views - std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string silo_nm); - - std::string collect_refd_ifaces(); - -// The serialization function - std::string make_schema(); - std::string make_schema(int i); - -// hfta generation. Schema must contain the table_def's -// of all source tables (but extra tables will not hurt). - std::string generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode); - -// De-silo an hfta by replacing refs to lfta inputs with a -// merge over all silo'ed lftas. -// TO BE REMOVED -void desilo_lftas(std::map &lfta_names,std::vector &ifq_names,table_list *Schema); - -void add_output_operator(ospec_str *); - -private: -// Helper function for generate_hfta -void compute_node_format(int q, std::vector &nfmt, std::map &op_idx); - - - -}; - -/////////////////////////////////////////////////////////////////////// -//// Related functions -void get_common_lfta_filter(std::vector lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, std::vector &prefilter_preds, std::set &pred_ids); -void get_prefilter_temporal_cids(std::vector lfta_list, col_id_set &temp_cids); - - - -#endif +/* ------------------------------------------------ +Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ +#ifndef __STREAM_QUERY_H_ +#define __STREAM_QUERY_H_ + +#include +#include +#include + +#include"parse_schema.h" +#include"parse_fta.h" +#include"analyze_fta.h" +#include"query_plan.h" +#include"parse_partn.h" + + +class stream_query{ +public: +// query_plan contains the query nodes, which form a query tree (dag?) +// by linking to each other via indices into this vector. +// NOTE: there might be holes (NULLs) in this list due to shifting +// around operators for optimization. + std::vector query_plan; + std::vector output_specs; + vector output_operators; + + int qhead; // index of output operator + std::vector qtail; + + table_def *attributes; + param_table *parameters; + std::map defines; + std::string query_name; + + int n_parallel; // If the HFTA has been parallelized, # of copies. + int parallel_idx; // which of the cloned hftas this one is. + // needed for output splitting. + + int n_successors; // # of hftas which read from this one. + + int gid; // global id + +// For error reporting. + int error_code; + std::string err_str; + + stream_query(){ + error_code = 0; + attributes = NULL; + parameters = NULL; + }; + +// Create stream query for split-off lfta. +// The parent stream query provides annotations. + stream_query(qp_node *qnode, stream_query *parent); + +// Create stream query from analyzed parse tree. + stream_query(query_summary_class *qs,table_list *Schema); + +// Make a copy. + stream_query(stream_query &source); +// used after making the copy + void set_nparallel(int n, int i){n_parallel = n; parallel_idx = i;} + + qp_node *get_query_head(){return query_plan[qhead];}; + std::string get_sq_name(){return query_plan[qhead]->get_node_name();}; + +// Add a parse tree to the query plan. + stream_query *add_query(query_summary_class *qs,table_list *Schema); + stream_query *add_query(stream_query &sc); + + bool generate_linkage(); + int generate_plan(table_list *Schema); + + +// Create a map of fields to scalar expression in Protocol fields +// for each query node in the stream. For partitioning optimizations. + void generate_protocol_se(map &sq_map, table_list *Schema); + + +// Checks if the operator i is compatible with interface partitioning +// (can be pushed below the merge that combines partitioned stream) + bool is_partn_compatible(int i, map lfta_names, vector interface_names, vector machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result); + +// Checks if the node i can be pushed below the merge + bool is_pushdown_compatible(int i, map lfta_names, vector interface_names, vector machine_names); + +// Push the operator below the merge that combines partitioned stream + void pushdown_partn_operator(int i); + +// Push the operator below the merge that combines regular (non-partitioned streams) + void pushdown_operator(int i, ext_fcn_list *Ext_fcns, table_list *Schema); + +// Splits query that combines data from multiple hosts into separate hftas + std::vector split_multihost_query(); + +// Extract subtree rooted at node i into separate hfta + stream_query* extract_subtree(int i); + +// Extract any external libraries needed by the oeprators in this hfta + void get_external_libs(std::set &libset); + + +// Perform local FTA optimizations + void optimize(vector& hfta_list, map lfta_names, vector interface_names, vector machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result); + + int get_error_code(){return error_code;}; + std::string get_error_str(){return err_str;}; + + void set_gid(int i){gid=i;}; + int get_gid(){return gid;}; + + + bool stream_input_only(table_list *Schema); + + std::vector get_input_tables(); + + table_def *get_output_tabledef(); + +// get any inferred key definition, just a pass-through to the head _qpn method + std::vector get_tbl_keys(std::vector &partial_keys); + +// Extract lfta components of the query + std::vector split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx); + +// Process and register opertor views + std::vector extract_opview(table_list *Schema, std::vector &qnodes, opview_set &opviews, std::string silo_nm); + + std::string collect_refd_ifaces(); + +// The serialization function + std::string make_schema(); + std::string make_schema(int i); + +// hfta generation. Schema must contain the table_def's +// of all source tables (but extra tables will not hurt). + std::string generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode); + +// De-silo an hfta by replacing refs to lfta inputs with a +// merge over all silo'ed lftas. +// TO BE REMOVED +void desilo_lftas(std::map &lfta_names,std::vector &ifq_names,table_list *Schema); + +void add_output_operator(ospec_str *); + +private: +// Helper function for generate_hfta +void compute_node_format(int q, std::vector &nfmt, std::map &op_idx); + + + +}; + +/////////////////////////////////////////////////////////////////////// +//// Related functions +void get_common_lfta_filter(std::vector lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, std::vector &prefilter_preds, std::set &pred_ids); +void get_prefilter_temporal_cids(std::vector lfta_list, col_id_set &temp_cids); + + + +#endif