Added watchlist support
[com/gs-lite.git] / src / ftacmp / query_plan.h
index 8ba08a4..ec782e0 100644 (file)
@@ -87,7 +87,9 @@ struct query_node{
                mangler="";
 
                tablevar_list_t *fm = parse_tree->get_from();
-               refd_tbls =  fm->get_table_names();
+               if(fm!=NULL){
+                       refd_tbls =  fm->get_table_names();
+               }
 
                params  = pt->query_params;
        };
@@ -296,7 +298,7 @@ public:
 
 //                     The "where" clause is a pre-filter
   virtual  std::vector<cnf_elem *> get_where_clause() = 0;
-//                     To be more explicit, use get_filter_preds
+//                     To be more explicit, use get_filter_preds; this is used to compute the prefilter
   virtual  std::vector<cnf_elem *> get_filter_clause() = 0;
 
 //             Add an extra predicate.  Currently only used for LFTAs.
@@ -402,6 +404,7 @@ public:
 //                             embedded in them.  Would it make sense
 //                             to grab the whole table list?
                tablevar_list_t *fm = qs->fta_tree->get_from();
+               
                std::vector<tablevar_t *> tbl_vec = fm->get_table_list();
                if(tbl_vec.size() != 1){
                        char tmpstr[200];
@@ -411,6 +414,12 @@ public:
                }
                table_name = (tbl_vec[0]);
 
+               int t = tbl_vec[0]->get_schema_ref();
+               if(! Schema->is_stream(t)){
+                       err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
+                       error_code = 1;
+               }
+
 //                             Get the select list.
                select_list = qs->fta_tree->get_sl_vec();
 
@@ -564,6 +573,13 @@ public:
                }
                table_name = (tbl_vec[0]);
 
+               int t = tbl_vec[0]->get_schema_ref();
+               if(! Schema->is_stream(t)){
+                       err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
+                       error_code = 1;
+               }
+
+
 //                             Get the select list.
                select_list = qs->fta_tree->get_sl_vec();
 
@@ -754,6 +770,12 @@ public:
                }
                table_name = (tbl_vec[0]);
 
+               int t = tbl_vec[0]->get_schema_ref();
+               if(! Schema->is_stream(t)){
+                       err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
+                       error_code = 1;
+               }
+
 //                             Get the select list.
                select_list = qs->fta_tree->get_sl_vec();
 
@@ -828,8 +850,158 @@ public:
 };
 
 
+
+//             Watchlist table node: a keyed table read from an external source.
+//             'filename' and 'refresh_interval' (taken from the query definitions) describe how it is loaded.
+class watch_tbl_qpn: public qp_node{
+public:
+       table_def *table_layout;                                // the output schema
+       std::vector<std::string> key_flds;                      // fields carrying a key/Key/KEY modifier
+
+//             Parameters related to loading the table
+       std::string filename;                                   // definitions["filename"]
+       int refresh_interval;                                   // definitions["refresh_interval"]; > 0 when built from a query
+
+//             A watchlist takes no extra predicates: report the misuse and exit.
+       void append_to_where(cnf_elem *c){
+               fprintf(stderr, "ERROR, append_to_where called on watch_tbl_qpn, not supported, query %s.\n",  node_name.c_str());
+               exit(1);
+       }
+
+       std::string node_type(){return("watch_tbl_qpn");        };
+       bool makes_transform(){return false;};
+       std::vector<std::string> external_libs(){
+               std::vector<std::string> ret;                   // always empty for this node
+               return ret;
+       }
+
+       void bind_to_schema(table_list *Schema){}               // nothing to bind
+       col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema){
+               col_id_set ret;                                 // always empty for this node
+               return ret;
+       }
+
+       std::string to_query_string();
+       std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform);
+       std::string generate_functor_name();
+       std::string generate_operator(int i, std::string params);
+       std::string get_include_file(){
+               return("#include <watchlist_tbl.h>\n");
+       };
+
+       cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
+       std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
+
+       table_def *get_fields();
+       std::vector<std::string> get_tbl_keys(std::vector<std::string> &partial_keys){
+               return key_flds;                                // partial_keys is left untouched
+       }
+
+       std::vector<tablevar_t *> get_input_tbls();
+       std::vector<tablevar_t *> get_output_tbls();
+
+       std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
+       virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
+//             Ensure that any refs to interface params have been split away.
+       int count_ifp_refs(std::set<std::string> &ifpnames);
+
+//                     No predicates, return an empty clause
+       std::vector<cnf_elem *> get_where_clause(){
+               std::vector<cnf_elem *> no_preds;
+               return(no_preds);
+       };
+       std::vector<cnf_elem *> get_filter_clause(){
+               return get_where_clause();
+       }
+
+       watch_tbl_qpn() : table_layout(NULL), refresh_interval(0) {     // empty node; make_copy fills it in
+       };
+
+//             Build from a parsed watchlist definition; exits if no key field, filename or positive refresh_interval is given.
+       watch_tbl_qpn(query_summary_class *qs,table_list *Schema){
+               node_name=qs->query_name;
+               param_tbl = qs->param_tbl;
+               definitions = qs->definitions;
+
+//                     Populate the schema
+               table_layout = new table_def(
+                       node_name.c_str(), NULL, NULL,  qs->fta_tree->fel, WATCHLIST_SCHEMA
+               );
+
+//                     Find the keys
+               std::vector<field_entry *> flds = qs->fta_tree->fel->get_list();
+               for(size_t f=0;f<flds.size();++f){
+                       if(flds[f]->get_modifier_list()->contains_key("key") ||
+                               flds[f]->get_modifier_list()->contains_key("Key") ||
+                               flds[f]->get_modifier_list()->contains_key("KEY") ){
+                                       key_flds.push_back(flds[f]->get_name());
+                       }
+               }
+               if(key_flds.size()==0){
+                       fprintf(stderr,"Error, no key fields defined for table watchlist %s\n",node_name.c_str());
+                       exit(1);
+               }
+
+               table_layout->set_keys(key_flds);       // communicate keys to consumers
+
+//                     Get loading parameters
+               if(definitions.count("filename")>0){
+                       filename = definitions["filename"];
+               }else{
+                       fprintf(stderr, "Error, no filename for source data defined for table watchlist %s\n",node_name.c_str());
+                       exit(1);
+               }
+               if(definitions.count("refresh_interval")>0){
+                       refresh_interval = atoi(definitions["refresh_interval"].c_str());
+                       if(refresh_interval <= 0){
+                               fprintf(stderr, "Error, the refresh_interval (%s) of table watchlist %s must be a positive non-zero integer.\n",definitions["refresh_interval"].c_str(), node_name.c_str());
+                               exit(1);
+                       }
+               }else{
+                       fprintf(stderr, "Error, no refresh_interval defined for table watchlist %s\n",node_name.c_str());
+                       exit(1);
+               }
+
+       }
+
+//             Copy of this node named node_name+suffix, with a rebuilt param table and a shallow copy of the schema.
+       qp_node *make_copy(std::string suffix){
+               watch_tbl_qpn *ret = new watch_tbl_qpn();
+               ret->filename = filename;
+               ret->refresh_interval = refresh_interval;
+               ret->key_flds = key_flds;
+
+               ret->param_tbl = new param_table();
+               std::vector<std::string> param_names = param_tbl->get_param_names();
+               for(size_t pi=0;pi<param_names.size();pi++){
+                       data_type *dt = param_tbl->get_data_type(param_names[pi]);
+                       ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
+                                                       param_tbl->handle_access(param_names[pi]));
+               }
+               ret->definitions = definitions;
+
+               ret->node_name = node_name + suffix;
+               ret->table_layout = table_layout->make_shallow_copy(ret->node_name);
+
+               return ret;
+       };
+
+       // the following method is used for distributed query optimization
+       double get_rate_estimate();
+
+       void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
+
+
+};
+
+
+
+
+
+
 //             forward reference
 class filter_join_qpn;
+class watch_join_qpn;
 
 
 //             (temporal) Merge query plan node.
@@ -939,6 +1111,14 @@ public:
                        exit(1);
                }
 
+               for(int f=0;f<fm.size();++f){
+                       int t=fm[f]->get_schema_ref();
+                       if(! Schema->is_stream(t)){
+                               err_str += "ERROR in query "+node_name+", the source "+fm[f]->get_schema_name()+" is not a stream.\n";
+                               error_code = 1;
+                       }
+               }
+
 //                             Get the parameters
                param_tbl = qs->param_tbl;
 
@@ -1098,6 +1278,10 @@ printf("\n");
 
        mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
 
+//                     Merge watch_join LFTAs.
+
+       mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb);
+
 //                     Merge selection LFTAs.
 
        mrg_qpn(spx_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
@@ -1417,6 +1601,15 @@ public:
                        error_code = 1;
                }
 
+               for(int f=0;f<from.size();++f){
+                       int t=from[f]->get_schema_ref();
+                       if(! Schema->is_stream(t)){
+                               err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
+                               error_code = 1;
+                       }
+               }
+
+
 //                             Get the select list.
                select_list = qs->fta_tree->get_sl_vec();
 
@@ -1643,6 +1836,15 @@ public:
                        error_code = 1;
                }
 
+               for(int f=0;f<from.size();++f){
+                       int t=from[f]->get_schema_ref();
+                       if(! Schema->is_stream(t)){
+                               err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
+                               error_code = 1;
+                       }
+               }
+
+
 //                             Get the select list.
                select_list = qs->fta_tree->get_sl_vec();
 //                             Verify that only t0 is referenced.
@@ -1778,6 +1980,237 @@ public:
 };
 
 
+//             Watchlist join: a PROTOCOL source (from[0], R) joined with a WATCHLIST table (from[1], S).
+//     TODO : put tests on other operators to ensure they don't read from a watchlist
+//     TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ?
+class watch_join_qpn: public qp_node{
+public:
+       std::vector<tablevar_t *> from;                                 //      Source tables
+       std::vector<select_element *> select_list;      //      Select list
+       std::vector<cnf_elem *> pred_t0;                        // main (R) preds
+       std::vector<cnf_elem *> pred_t1;                        // watchlist-only (S) preds (?)
+       std::map<std::string, cnf_elem *> hash_eq;      // predicates on S hash keys, indexed by key field name
+       std::vector<cnf_elem *> join_filter;            // ref's R, S, but not a hash
+       std::vector<cnf_elem *> postfilter;                     // ref's no table.
+
+       std::vector<std::string> key_flds;                      // keys of the watchlist schema (from[1])
+
+       std::vector<cnf_elem *> where;                          // all the filters
+                                                                                               // useful for summary analysis
+
+       std::vector<scalarexp_t *> hash_src_r, hash_src_l;
+       std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
+       std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
+
+
+
+       std::string node_type(){return("watch_join");   };
+       bool makes_transform(){return true;};
+       std::vector<std::string> external_libs(){
+               std::vector<std::string> ret;                   // always empty for this node
+               return ret;
+       }
+
+       void bind_to_schema(table_list *Schema);
+       col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
+       std::string to_query_string();
+//             The code generators below only report an internal error and exit.
+       std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
+               fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n");
+               exit(1);
+       }
+       std::string generate_functor_name(){
+               fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n");
+               exit(1);
+       }
+       std::string generate_operator(int i, std::string params){
+               fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n");
+               exit(1);
+       }
+       std::string get_include_file(){return("#include <watchlist_operator.h>\n");};
+
+       std::vector<select_element *> get_select_list(){return select_list;};
+       std::vector<scalarexp_t *> get_select_se_list(){
+               std::vector<scalarexp_t *> ret;
+               ret.reserve(select_list.size());
+               for(size_t i=0;i<select_list.size();++i) ret.push_back(select_list[i]->se);
+               return ret;
+       };
+//                     Used for LFTA only
+       void append_to_where(cnf_elem *c){
+               where.push_back(c);
+       }
+
+       std::vector<cnf_elem *> get_where_clause(){return where;}
+
+       std::vector<cnf_elem *> get_filter_clause(){return pred_t0;}    // only the main (R) preds
+
+       cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
+       std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
+
+       table_def *get_fields();
+//             It should be feasible to find keys in a watchlist join, but for now none are reported.
+       std::vector<std::string> get_tbl_keys(std::vector<std::string> &partial_keys){
+               std::vector<std::string> no_keys;
+               return no_keys;
+       }
+
+       std::vector<tablevar_t *> get_input_tbls();
+       std::vector<tablevar_t *> get_output_tbls();
+
+       std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
+       int resolve_if_params(ifq_t *ifdb, std::string &err);
+
+       virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes,  opview_set &opviews, std::string rootnm, std::string silo_nm);
+//             Ensure that any refs to interface params have been split away.
+       int count_ifp_refs(std::set<std::string> &ifpnames);
+
+//             CONSTRUCTOR
+       watch_join_qpn(){
+       };
+//             Build from a parsed query: check the two sources, then sort each WHERE conjunct into
+//             pred_t0, pred_t1, hash_eq, join_filter or postfilter.  Problems go to err_str/error_code.
+       watch_join_qpn(query_summary_class *qs,table_list *Schema){
+//                             Get the table name.
+//                             NOTE the colrefs have the table ref (an int)
+//                             embedded in them.  Would it make sense
+//                             to grab the whole table list?
+               from = qs->fta_tree->get_from()->get_table_list();
+               if(from.size() != 2){
+                       char tmpstr[200];
+                       snprintf(tmpstr,sizeof(tmpstr),"ERROR building watch_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",static_cast<unsigned long>(from.size()) );
+                       err_str += tmpstr;
+                       error_code = 1;
+               }else{
+//                             Only index from[] once both sources are known to exist.
+                       int t = from[0]->get_schema_ref();
+                       if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){
+                               err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n";
+                               error_code = 1;
+                       }
+                       t = from[1]->get_schema_ref();
+                       if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){
+                               err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n";
+                               error_code = 1;
+                       }
+                       key_flds = Schema->get_table(t)->get_keys();
+               }
+
+//                             Get the select list.
+               select_list = qs->fta_tree->get_sl_vec();
+
+//                             Get the selection predicate.
+               where = qs->wh_cnf;
+               for(size_t w=0;w<where.size();++w){
+                       analyze_cnf(where[w]);
+                       std::vector<int> pred_tbls;
+                       get_tablevar_ref_pr(where[w]->pr,pred_tbls);
+//                             Collect the list of preds by src var,
+//                             extract the shared preds later.
+                       if(pred_tbls.size()==1){
+                               if(pred_tbls[0] == 0){
+                                       pred_t0.push_back(where[w]);
+                               }else{
+                                       pred_t1.push_back(where[w]);
+                               }
+                               continue;
+                       }
+//                             refs nothing -- might be sampling, do it as postfilter.
+                       if(pred_tbls.size()==0){
+                               postfilter.push_back(where[w]);
+                               continue;
+                       }
+
+//             Must reference both
+//                             See if it can be a hash predicate.
+                       if(where[w]->is_atom && where[w]->eq_pred){
+                               std::vector<int> sel_tbls, ser_tbls;
+                               get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
+                               get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
+                               if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
+//                                             make channel 0 SE on LHS.
+                                       if(sel_tbls[0] != 0)
+                                               where[w]->swap_scalar_operands();
+
+//             Must be simple (a colref) on the RHS
+                                       if(where[w]->r_simple){
+                                               std::string rcol = where[w]->pr->get_right_se()->get_colref()->get_field();
+                                               if(std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end()){
+                                                       hash_eq[rcol] = where[w];
+
+                                                       data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
+                                                       data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
+                                                       if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) )
+                                                               err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n";
+                                                       continue;
+                                               }
+                                       }
+                               }
+                       }
+//                             All tests failed, fallback is join_filter.
+                       join_filter.push_back(where[w]);
+               }
+
+               if(key_flds.size() > hash_eq.size()){
+                       err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate.  Missing fields are";
+                       for(size_t k=0;k<key_flds.size();++k){
+                               if(hash_eq.count(key_flds[k]) < 1){
+                                       err_str += " "+key_flds[k];
+                               }
+                       }
+                       err_str += ".\n";
+                       error_code = 5;
+               }
+
+
+//                             Get the parameters
+               param_tbl = qs->param_tbl;
+               definitions = qs->definitions;
+
+       };
+
+       // the following method is used for distributed query optimization
+       double get_rate_estimate();
+
+
+//             Copy of this node named node_name+suffix; the param table is rebuilt, all other fields are shared.
+       qp_node* make_copy(std::string suffix){
+               watch_join_qpn *ret = new watch_join_qpn();
+
+               ret->param_tbl = new param_table();
+               std::vector<std::string> param_names = param_tbl->get_param_names();
+               for(size_t pi=0;pi<param_names.size();pi++){
+                       data_type *dt = param_tbl->get_data_type(param_names[pi]);
+                       ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
+                                                       param_tbl->handle_access(param_names[pi]));
+               }
+               ret->definitions = definitions;
+
+               ret->node_name = node_name + suffix;
+
+               // make shallow copy of all fields
+               ret->where = where;
+               ret->from = from;
+               ret->select_list = select_list;
+               ret->key_flds = key_flds;
+               ret->pred_t0 = pred_t0;
+               ret->pred_t1 = pred_t1;
+               ret->join_filter = join_filter;
+               ret->postfilter = postfilter;
+               ret->hash_eq = hash_eq;
+               ret->hash_src_r = hash_src_r;
+               ret->hash_src_l = hash_src_l;
+
+               return ret;
+       };
+
+       void create_protocol_se(vector<qp_node *> q_sources, table_list *Schema);
+
+};
+
+
+
+
 enum output_file_type_enum {regular, gzip, bzip};
 
 class output_file_qpn: public qp_node{
@@ -1886,6 +2319,18 @@ public:
                hfta_query_name = qn;
                eat_input = ei;
 
+//                     TODO stream checking, but it requires passing Schema to output_file_qpn
+/*
+               for(int f=0;f<fm.size();++f){
+                       int t=fm[f]->get_schema_ref();
+                       if(! Schema->is_stream(t)){
+                               err_str += "ERROR in query "+node_name+", the source "+from[f]->get_schema_name()+" is not a stream.\n";
+                               error_code = 1;
+                       }
+               }
+*/
+
+
                do_gzip = false;
                compression_type = regular;
                if(ospec->operator_type == "zfile")
@@ -2064,6 +2509,13 @@ public:
                }
                table_name = (tbl_vec[0]);
 
+               int t = tbl_vec[0]->get_schema_ref();
+               if(! Schema->is_stream(t)){
+                       err_str += "ERROR in query "+node_name+", the source "+table_name->get_schema_name()+" is not a stream.\n";
+                       error_code = 1;
+               }
+
+
 //                             Get the select list.
                select_list = qs->fta_tree->get_sl_vec();
 
@@ -2155,5 +2607,4 @@ void untaboo(string &s);
 table_def *create_attributes(string tname, vector<select_element *> &select_list);
 
 
-
 #endif