+
+// TODO : put tests on other operators to ensure they don't read from a watchlist
+// TODO : register with : is_partn_compatible pushdown_partn_operator is_pushdown_compatible pushdown_operator ?
+// Query plan node for a watchlist join: a join between exactly two sources,
+// where the constructor requires from[0] to be a PROTOCOL and from[1] to be
+// a WATCHLIST.  The constructor partitions the WHERE clause into per-source
+// predicates, hash (equality-on-key) predicates, join filters and
+// postfilters, and records an error if some watchlist key field is not
+// covered by an equality predicate.
+class watch_join_qpn: public qp_node{
+public:
+	std::vector<tablevar_t *> from;			// Source tables
+	std::vector<select_element *> select_list;	// Select list
+	std::vector<cnf_elem *> pred_t0;		// main (R) preds
+	std::vector<cnf_elem *> pred_t1;		// watchlist-only (S) preds (?)
+	std::map<std::string, cnf_elem *> hash_eq;	// predicates on S hash keys,
+							// indexed by watchlist key field name
+	std::vector<cnf_elem *> join_filter;		// ref's R, S, but not a hash
+	std::vector<cnf_elem *> postfilter;		// ref's no table.
+
+//		Key fields of the watchlist (from[1]), as reported by its schema.
+	std::vector<std::string> key_flds;
+
+	std::vector<cnf_elem *> where;			// all the filters
+							// useful for summary analysis
+
+//		NOTE(review): not populated anywhere in this class body; presumably
+//		filled in by out-of-line code -- confirm.
+	std::vector<scalarexp_t *> hash_src_r, hash_src_l;
+	std::vector<scalarexp_t *> get_hash_r(){return hash_src_r;}
+	std::vector<scalarexp_t *> get_hash_l(){return hash_src_l;}
+
+
+
+	std::string node_type(){return("watch_join"); }
+	bool makes_transform(){return true;}
+//		This node reports no external libraries.
+	std::vector<std::string> external_libs(){
+		std::vector<std::string> ret;
+		return ret;
+	}
+
+	void bind_to_schema(table_list *Schema);
+	col_id_set get_colrefs(bool ext_fcns_only,table_list *Schema);
+
+	std::string to_query_string();
+
+//		The three generate_* entry points are not supported on this node:
+//		each one reports an internal error on stderr and terminates the
+//		process with exit status 1.
+	std::string generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, std::vector<bool> &needs_xform){
+		fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor called\n");
+		exit(1);
+	}
+	std::string generate_functor_name(){
+		fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_functor_name called\n");
+		exit(1);
+	}
+	std::string generate_operator(int i, std::string params){
+		fprintf(stderr,"INTERNAL ERROR, watch_join_qpn::generate_operator called\n");
+		exit(1);
+	}
+	std::string get_include_file(){return("#include <watchlist_operator.h>\n");}
+
+	std::vector<select_element *> get_select_list(){return select_list;}
+//		Returns the scalar expression of every select list entry, in order.
+	std::vector<scalarexp_t *> get_select_se_list(){
+		std::vector<scalarexp_t *> ret;
+		for(size_t s=0;s<select_list.size();++s)
+			ret.push_back(select_list[s]->se);
+		return ret;
+	}
+//		Used for LFTA only
+//		Appends to the full predicate list only; the partitioned lists
+//		(pred_t0, pred_t1, hash_eq, join_filter, postfilter) are not updated.
+	void append_to_where(cnf_elem *c){
+		where.push_back(c);
+	}
+
+	std::vector<cnf_elem *> get_where_clause(){return where;}
+
+//		The filter clause is the set of predicates on the main (R) source only.
+	std::vector<cnf_elem *> get_filter_clause(){return pred_t0;}
+
+	cplx_lit_table *get_cplx_lit_tbl(ext_fcn_list *Ext_fcns);
+	std::vector<handle_param_tbl_entry *> get_handle_param_tbl(ext_fcn_list *Ext_fcns);
+
+	table_def *get_fields();
+//		It should be feasible to find keys in a watchlist join
+//		For now no keys are reported: the result is always empty and
+//		partial_keys is left untouched.
+	std::vector<std::string> get_tbl_keys(std::vector<std::string> &partial_keys){
+		std::vector<std::string> ret;
+		return ret;
+	}
+
+	std::vector<tablevar_t *> get_input_tbls();
+	std::vector<tablevar_t *> get_output_tbls();
+
+	std::vector<qp_node *> split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx);
+	int resolve_if_params(ifq_t *ifdb, std::string &err);
+
+	virtual std::vector<table_exp_t *> extract_opview(table_list *Schema, std::vector<query_node *> &qnodes, opview_set &opviews, std::string rootnm, std::string silo_nm);
+//		Ensure that any refs to interface params have been split away.
+	int count_ifp_refs(std::set<std::string> &ifpnames);
+
+//		CONSTRUCTOR
+	watch_join_qpn(){
+	}
+
+//		Build the node from a query summary.
+//		  qs     : analyzed query; supplies the FROM list, select list,
+//		           CNF WHERE clause, parameter table and definitions.
+//		  Schema : used to check the source schema types and to fetch the
+//		           watchlist key fields.
+//		Problems are reported through err_str / error_code rather than by
+//		aborting: error_code 1 for a malformed FROM clause, 5 when a
+//		watchlist key field has no equality predicate.  A predicate on
+//		temporal fields only appends a warning to err_str.
+	watch_join_qpn(query_summary_class *qs,table_list *Schema){
+//				Get the table name.
+//				NOTE the colrefs have the table ref (an int)
+//				embedded in them.  Would it make sense
+//				to grab the whole table list?
+		from = qs->fta_tree->get_from()->get_table_list();
+		if(from.size() != 2){
+			char tmpstr[200];
+			sprintf(tmpstr,"ERROR building watch_join_qpn node: query defined over %lu tables, but joins must be between two sources.\n",static_cast<unsigned long>(from.size()) );
+			err_str += tmpstr;
+			error_code = 1;
+		}
+
+//				Only inspect the two sources if they exist; indexing
+//				from[0] / from[1] on a shorter list is out of bounds.
+		if(from.size() >= 2){
+			int t = from[0]->get_schema_ref();
+			if(Schema->get_schema_type(t) != PROTOCOL_SCHEMA){
+				err_str += "ERROR in query "+node_name+", the LHS of the join must be a PROTOCOL\n";
+				error_code = 1;
+			}
+			t = from[1]->get_schema_ref();
+			if(Schema->get_schema_type(t) != WATCHLIST_SCHEMA){
+				err_str += "ERROR in query "+node_name+", the RHS of the join must be a WATCHLIST\n";
+				error_code = 1;
+			}
+			key_flds = Schema->get_table(t)->get_keys();
+		}
+
+
+//				Get the select list.
+		select_list = qs->fta_tree->get_sl_vec();
+
+//				Get the selection predicate.
+		where = qs->wh_cnf;
+		for(size_t w=0;w<where.size();++w){
+			analyze_cnf(where[w]);
+			std::vector<int> pred_tbls;
+			get_tablevar_ref_pr(where[w]->pr,pred_tbls);
+//				Collect the list of preds by src var,
+//				extract the shared preds later.
+			if(pred_tbls.size()==1){
+				if(pred_tbls[0] == 0){
+					pred_t0.push_back(where[w]);
+				}else{
+					pred_t1.push_back(where[w]);
+				}
+				continue;
+			}
+//				refs nothing -- might be sampling, do it as postfilter.
+			if(pred_tbls.size()==0){
+				postfilter.push_back(where[w]);
+				continue;
+			}
+
+//				Must reference both
+//				See if it can be a hash predicate.
+			if(where[w]->is_atom && where[w]->eq_pred){
+				std::vector<int> sel_tbls, ser_tbls;
+				get_tablevar_ref_se(where[w]->pr->get_left_se(),sel_tbls);
+				get_tablevar_ref_se(where[w]->pr->get_right_se(),ser_tbls);
+				if(sel_tbls.size()==1 && ser_tbls.size()==1 && sel_tbls[0] != ser_tbls[0]){
+//				make channel 0 SE on LHS.
+					if(sel_tbls[0] != 0)
+						where[w]->swap_scalar_operands();
+
+//				Must be simple (a colref) on the RHS
+					if(where[w]->r_simple){
+						std::string rcol = where[w]->pr->get_right_se()->get_colref()->get_field();
+						bool is_key = std::find(key_flds.begin(), key_flds.end(), rcol) != key_flds.end();
+//				Only the first equality on a given key becomes the
+//				hash predicate.  A later one on the same key must not
+//				overwrite it (the earlier predicate would then be in
+//				none of the partitions), so it falls through to
+//				join_filter instead.
+						if(is_key && hash_eq.count(rcol) == 0){
+							hash_eq[rcol] = where[w];
+
+							data_type *dtl=where[w]->pr->get_left_se()->get_data_type();
+							data_type *dtr=where[w]->pr->get_right_se()->get_data_type();
+							if( (dtl->is_increasing() && dtr->is_increasing()) || (dtl->is_decreasing() && dtr->is_decreasing()) )
+								err_str += "Warning, a watchlist join should not have join predicates on temporal fields, query "+node_name+".\n";
+							continue;
+						}
+					}
+				}
+			}
+//				All tests failed, fallback is join_filter.
+			join_filter.push_back(where[w]);
+		}
+
+//				Every watchlist key field needs an equality predicate;
+//				list the ones that are missing.
+		if(key_flds.size() > hash_eq.size()){
+			err_str += "Error, in query "+node_name+" the watchlist join does not cover all fields in the watchlist with an equality predicate.  Missing fields are";
+			for(size_t k=0;k<key_flds.size();++k){
+				if(hash_eq.count(key_flds[k]) < 1){
+					err_str += " "+key_flds[k];
+				}
+			}
+			err_str += ".\n";
+			error_code = 5;
+		}
+
+
+//				Get the parameters
+		param_tbl = qs->param_tbl;
+		definitions = qs->definitions;
+
+	}
+
+	// the following method is used for distributed query optimization
+	double get_rate_estimate();
+
+
+//		Returns a newly allocated copy of this node named node_name+suffix.
+//		The parameter table is rebuilt with duplicated data types; every
+//		other member is copied shallowly, so the copy shares the pointed-to
+//		predicates, select elements, table variables and scalar expressions
+//		with this node.
+//		NOTE(review): dereferences param_tbl unconditionally -- presumably
+//		it is always set before make_copy is called; confirm.
+	qp_node* make_copy(std::string suffix){
+		watch_join_qpn *ret = new watch_join_qpn();
+
+		ret->param_tbl = new param_table();
+		std::vector<std::string> param_names = param_tbl->get_param_names();
+		for(size_t pi=0;pi<param_names.size();pi++){
+			data_type *dt = param_tbl->get_data_type(param_names[pi]);
+			ret->param_tbl->add_param(param_names[pi],dt->duplicate(),
+							param_tbl->handle_access(param_names[pi]));
+		}
+		ret->definitions = definitions;
+
+		ret->node_name = node_name + suffix;
+
+		// make shallow copy of all fields
+		ret->where = where;
+		ret->from = from;
+		ret->select_list = select_list;
+		ret->key_flds = key_flds;
+		ret->pred_t0 = pred_t0;
+		ret->pred_t1 = pred_t1;
+		ret->join_filter = join_filter;
+		ret->postfilter = postfilter;
+		ret->hash_eq = hash_eq;
+		ret->hash_src_r = hash_src_r;
+		ret->hash_src_l = hash_src_l;
+
+		return ret;
+	}
+
+	void create_protocol_se(std::vector<qp_node *> q_sources, table_list *Schema);
+
+};
+
+
+
+