Added watchlist support
[com/gs-lite.git] / src / ftacmp / query_plan.cc
index e3ce4be..512299f 100644 (file)
@@ -98,6 +98,66 @@ mrg_qpn::mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::stri
 }
 
 
+mrg_qpn::mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
+               param_tbl = spx->param_tbl;
+               int i;
+               node_name = n_name;
+               field_entry_list *fel = new field_entry_list();
+               merge_fieldpos = -1;
+
+               disorder = 1;
+
+               for(i=0;i<spx->select_list.size();++i){
+                       data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
+                       if(dt->is_temporal()){
+                               if(merge_fieldpos < 0){
+                                       merge_fieldpos = i;
+                               }else{
+                                       fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
+                                       dt->reset_temporal();
+                               }
+                       }
+
+                       field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
+                       fel->append_field(fe);
+                       delete dt;
+               }
+               if(merge_fieldpos<0){
+                       fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
+                               exit(1);
+               }
+               table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
+
+//                             NEED TO HANDLE USER_SPECIFIED SLACK
+               this->resolve_slack(spx->select_list[merge_fieldpos]->se,
+                               spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
+//     if(this->slack == NULL)
+//             fprintf(stderr,"Zero slack.\n");
+//     else
+//             fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
+
+               for(i=0;i<sources.size();i++){
+                       std::string rvar = "_m"+int_to_string(i);
+                       mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
+                       mvars[i]->set_tablevar_ref(i);
+                       fm.push_back(new tablevar_t(sources[i].c_str()));
+                       fm[i]->set_range_var(rvar);
+               }
+
+               param_tbl = new param_table();
+               std::vector<std::string> param_names = spx->param_tbl->get_param_names();
+               int pi;
+               for(pi=0;pi<param_names.size();pi++){
+                       data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
+                       param_tbl->add_param(param_names[pi],dt->duplicate(),
+                                                       spx->param_tbl->handle_access(param_names[pi]));
+               }
+               definitions = spx->definitions;
+
+}
+
+
+
 
 //             This function translates an analyzed parse tree
 //             into one or more query nodes (qp_node).
@@ -121,6 +181,16 @@ vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema)
 //                     into the qp_node constructors,
 //                     and have this code focus on building the query plan tree.
 
+//             Watchlist node
+       if(qs->query_type == WATCHLIST_QUERY){
+               watch_tbl_qpn *watchnode = new watch_tbl_qpn(qs, Schema);
+
+//                     Done
+               plan_root = watchnode;
+               local_plan.push_back(watchnode);
+       }
+
+
 //             MERGE node
        if(qs->query_type == MERGE_QUERY){
                mrg_qpn *merge_node = new mrg_qpn(qs,Schema);
@@ -159,7 +229,9 @@ printf("\n");
 */
 
 
-       } else{
+       } 
+       
+       if(qs->query_type == SELECT_QUERY){
 
 //             Select / Aggregation / Join
          if(qs->gb_tbl->size() == 0 && qs->aggr_tbl->size() == 0){
@@ -175,9 +247,15 @@ printf("\n");
                                plan_root = join_node;
                                local_plan.push_back(join_node);
                        }else{
-                               join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema);
-                               plan_root = join_node;
-                               local_plan.push_back(join_node);
+                               if(qs->fta_tree->get_from()->get_properties() == WATCHLIST_JOIN_PROPERTY){
+                                       watch_join_qpn *join_node = new watch_join_qpn(qs,Schema);
+                                       plan_root = join_node;
+                                       local_plan.push_back(join_node);
+                               }else{
+                                       join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema);
+                                       plan_root = join_node;
+                                       local_plan.push_back(join_node);
+                               }
                        }
                }
          }else{
@@ -2077,6 +2155,20 @@ string sgahcwcb_qpn::to_query_string(){
        return(ret);
 }
 
+string watch_tbl_qpn::to_query_string(){
+       string ret;
+//     ret += "DEFINE {\n";
+//     ret += "\tfilename='"+filename+";\n";
+//     ret += "\trefresh_interval="+to_string(refresh_interval)+";\n}\n";
+       ret += "WATCHLIST FIELDS {\n";
+       std::vector<field_entry *> fields = table_layout->get_fields();
+       for(int f=0;f<fields.size();++f){
+               ret += fields[f]->to_string()+"\n";
+       }
+       ret += "}\n";
+
+       return ret;
+}
 
 string mrg_qpn::to_query_string(){
 
@@ -2181,6 +2273,42 @@ string filter_join_qpn::to_query_string(){
        return(ret);
 }
 
+string watch_join_qpn::to_query_string(){
+
+       string ret = "Select ";
+       int s;
+       for(s=0;s<select_list.size();s++){
+               if(s>0) ret+=", ";
+               ret += se_to_query_string(select_list[s]->se, NULL);
+               if(select_list[s]->name != "") ret += " AS "+select_list[s]->name;
+       }
+       ret += "\n";
+
+//                     NOTE: assuming binary join.
+       ret += "WATCHLIST_JOIN ";
+
+       ret += "From ";
+       int f;
+       for(f=0;f<from.size();++f){
+               if(f>0) ret+=", ";
+               ret += from[f]->to_string();
+       }
+       ret += "\n";
+
+       if(where.size() > 0){
+               ret += "Where ";
+               int w;
+               for(w=0;w<where.size();w++){
+                       if(w>0) ret += " AND ";
+                       ret += "(" + pred_to_query_str(where[w]->pr,NULL) + ")";
+               }
+               ret += "\n";
+       }
+
+       return(ret);
+}
+
+
 
 // -----------------------------------------------------------------
 //             Query node subclass specific processing.
@@ -2390,6 +2518,18 @@ void mrg_qpn::resolve_slack(scalarexp_t *t_se, string fname, vector<pair<string,
 //------------------------------------------------------------------
 //             split a node to extract LFTA components.
 
+vector<qp_node *> watch_tbl_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+       // nothing to do, nothing to split, return copy of self.
+
+       hfta_returned = 0;
+
+       vector<qp_node *> ret_vec;
+
+       ret_vec.push_back(this);
+       return(ret_vec);
+
+}
+
 
 vector<qp_node *> mrg_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
        // nothing to do, nothing to split, return copy of self.
@@ -2429,7 +2569,7 @@ vector<qp_node *> filter_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, ta
        vector<string> sel_names;
        vector<pair<string,string> > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
        if (ifaces.empty()) {
-               fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set\n");
+               fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                exit(1);
        }
 
@@ -2481,35 +2621,35 @@ vector<qp_node *> filter_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, ta
                        }
 
                        for(p=0;p<shared_pred.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(shared_pred[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->shared_pred.push_back(new_cnf);
                                fta_node->where.push_back(new_cnf);
                        }
                        for(p=0;p<pred_t0.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->pred_t0.push_back(new_cnf);
                                fta_node->where.push_back(new_cnf);
                        }
                        for(p=0;p<pred_t1.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->pred_t1.push_back(new_cnf);
                                fta_node->where.push_back(new_cnf);
                        }
                        for(p=0;p<hash_eq.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(hash_eq[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->hash_eq.push_back(new_cnf);
                                fta_node->where.push_back(new_cnf);
                        }
                        for(p=0;p<postfilter.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->postfilter.push_back(new_cnf);
@@ -2543,6 +2683,155 @@ vector<qp_node *> filter_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, ta
 
 }
 
+
+
+
+
+vector<qp_node *> watch_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+       vector<qp_node *> ret_vec;
+
+//             First check if the query can be pushed to the FTA.
+       bool fta_ok = true;
+       int s;
+       for(s=0;s<select_list.size();s++){
+               fta_ok &= check_fta_forbidden_se(select_list[s]->se,NULL, Ext_fcns);
+       }
+       int p;
+       for(p=0;p<where.size();p++){
+               fta_ok &= check_fta_forbidden_pr(where[p]->pr,NULL, Ext_fcns);
+       }
+
+       if(!fta_ok){
+               fprintf(stderr,"ERROR, watchlist join %s is fta-unsafe.\n",node_name.c_str());
+               exit(1);
+       }
+
+//             Can it be done in a single lfta?
+//                     Get the set of interfaces it accesses.
+       int ierr;
+       int si;
+       vector<string> sel_names;
+       vector<pair<string,string> > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
+       if (ifaces.empty()) {
+               fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
+               exit(1);
+       }
+
+       if(ifaces.size() == 1){
+//                             Single interface, no need to merge.
+               hfta_returned = 0;
+               ret_vec.push_back(this);
+
+//     Treat the range vars a bit differently, the 2nd is reading from a _local_ watchlist.
+               from[0]->set_machine(ifaces[0].first);
+               from[0]->set_interface(ifaces[0].second);
+               from[0]->set_ifq(false);
+
+               from[1]->set_machine(ifaces[0].first);
+               from[1]->set_interface("_local_");
+               from[1]->set_ifq(false);
+
+               return(ret_vec);
+       }else{
+//                             Multiple interfaces, generate the interface-specific queries plus
+//                             the merge.
+               hfta_returned = 1;
+
+               vector<string> sel_names;
+               for(si=0;si<ifaces.size();++si){
+                       watch_join_qpn *fta_node = new watch_join_qpn();
+
+//                     Name the fta
+                       if(ifaces.size()==1)
+                               fta_node->set_node_name( node_name );
+                       else{
+                               string new_name =  "_"+node_name+"_"+ifaces[si].first+"_"+ifaces[si].second;
+                               untaboo(new_name);
+                               fta_node->set_node_name(new_name);
+                       }
+                       sel_names.push_back(fta_node->get_node_name());
+
+//                     Assign the table
+                       int f;
+                       for(f=0;f<from.size();f++){
+                               fta_node->from.push_back(from[f]->duplicate());
+                               fta_node->from[f]->set_machine(ifaces[si].first);
+                               if(f==0)
+                                       fta_node->from[f]->set_interface(ifaces[si].second);
+                               else
+                                       fta_node->from[f]->set_interface("_local_");
+                               fta_node->from[f]->set_ifq(false);
+                       }
+
+                       for(s=0;s<select_list.size();s++){
+                               fta_node->select_list.push_back( dup_select(select_list[s], NULL) );
+                       }
+
+                       for(p=0;p<pred_t0.size();p++){
+                               predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->pred_t0.push_back(new_cnf);
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       for(p=0;p<pred_t1.size();p++){
+                               predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->pred_t1.push_back(new_cnf);
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       for(p=0;p<key_flds.size();p++){ // we've checked that all keys are covered
+                               string k = key_flds[p];
+                               predicate_t *new_pr = dup_pr(hash_eq[k]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->hash_eq[k] = new_cnf;
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       for(p=0;p<join_filter.size();p++){
+                               predicate_t *new_pr = dup_pr(join_filter[p]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->postfilter.push_back(new_cnf);
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       for(p=0;p<postfilter.size();p++){
+                               predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->postfilter.push_back(new_cnf);
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       fta_node->key_flds = key_flds;
+
+//                     Xfer all of the parameters.
+//                     Use existing handle annotations.
+                       vector<string> param_names = param_tbl->get_param_names();
+                       int pi;
+                       for(pi=0;pi<param_names.size();pi++){
+                               data_type *dt = param_tbl->get_data_type(param_names[pi]);
+                               fta_node->param_tbl->add_param(param_names[pi],dt->duplicate(),
+                                                                       param_tbl->handle_access(param_names[pi]));
+                       }
+                       fta_node->definitions = definitions;
+                       if(fta_node->resolve_if_params(ifdb, this->err_str)){
+                               this->error_code = 3;
+                               return ret_vec;
+                       }
+
+                       ret_vec.push_back(fta_node);
+               }
+
+               mrg_qpn *mrg_node = new mrg_qpn((watch_join_qpn *)ret_vec[0],
+                        node_name,  sel_names,ifaces, ifdb);
+               ret_vec.push_back(mrg_node);
+
+               return(ret_vec);
+       }
+
+}
+
 //             Use to search for unresolved interface param refs in an hfta.
 
 int spx_qpn::count_ifp_refs(set<string> &ifpnames){
@@ -2606,6 +2895,10 @@ int rsgah_qpn::count_ifp_refs(set<string> &ifpnames){
        return ret;
 }
 
+int watch_tbl_qpn::count_ifp_refs(set<string> &ifpnames){
+       return 0;
+}
+
 int mrg_qpn::count_ifp_refs(set<string> &ifpnames){
        return 0;
 }
@@ -2638,6 +2931,17 @@ int filter_join_qpn::count_ifp_refs(set<string> &ifpnames){
        return ret;
 }
 
+int watch_join_qpn::count_ifp_refs(set<string> &ifpnames){
+       int ret = 0;
+       int i;
+       for(i=0;i<select_list.size();++i)
+               ret += count_se_ifp_refs(select_list[i]->se,ifpnames);
+       for(i=0;i<where.size();++i)
+               ret += count_pr_ifp_refs(where[i]->pr,ifpnames);
+       return ret;
+}
+
+
 
 //             Resolve interface params to string literals
 int filter_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
@@ -2654,6 +2958,20 @@ int filter_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
        return ret;
 }
 
+int watch_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
+       int ret = 0;
+       int i;
+       string ifname = from[0]->get_interface();
+       string ifmach = from[0]->get_machine();
+       for(i=0;i<select_list.size();++i)
+               if( resolve_se_ifp_refs(select_list[i]->se,ifmach, ifname, ifdb, err) )
+                       ret = 1;
+       for(i=0;i<where.size();++i)
+               if( resolve_pr_ifp_refs(where[i]->pr,ifmach, ifname, ifdb, err))
+                       ret = 1;
+       return ret;
+}
+
 
 int spx_qpn::resolve_if_params( ifq_t *ifdb, string &err){
        int ret = 0;
@@ -2784,7 +3102,7 @@ vector<qp_node *> spx_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list
        vector<string> sel_names;
        vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
        if (ifaces.empty()) {
-               fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set\n");
+               fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                exit(1);
        }
 
@@ -3138,7 +3456,7 @@ vector<qp_node *> rsgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_li
        vector<string> sel_names;
        vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
        if (ifaces.empty()) {
-               fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set\n");
+               fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                exit(1);
        }
 
@@ -3805,7 +4123,7 @@ vector<qp_node *> sgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_lis
        vector<string> sel_names;
        vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
        if (ifaces.empty()) {
-               fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set\n");
+               fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                exit(1);
        }
 
@@ -4735,7 +5053,7 @@ vector<qp_node *> join_eq_hash_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, t
                        spx_qpn *c_node = child_vec[f];
                        vector<pair<string, string> > ifaces = get_ifaces(c_node->table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
                        if (ifaces.empty()) {
-                               fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set\n");
+                               fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                                exit(1);
                        }
 
@@ -5002,6 +5320,21 @@ vector<table_exp_t *> filter_join_qpn::extract_opview(table_list *Schema,  vecto
     return(ret);
 }
 
+vector<table_exp_t *> watch_join_qpn::extract_opview(table_list *Schema,  vector<query_node *> &qnodes, opview_set &opviews, string rootnm, string silo_name){
+       vector<table_exp_t *> ret;
+       int retval = process_opview(from[0],0,node_name,
+                                                               Schema,qnodes,opviews,ret, rootnm, silo_name);
+       if(retval) exit(1);
+    return(ret);
+}
+
+
+
+vector<table_exp_t *> watch_tbl_qpn::extract_opview(table_list *Schema,  vector<query_node *> &qnodes, opview_set &opviews, string rootnm, string silo_name){
+       vector<table_exp_t *> ret;
+       return ret;             // nothing to process
+}
+
 
 
 //////////////////////////////////////////////////////////////////
@@ -5017,6 +5350,10 @@ table_def *mrg_qpn::get_fields(){
        return(table_layout);
 }
 
+table_def *watch_tbl_qpn::get_fields(){
+       return(table_layout);
+}
+
 
 table_def *spx_qpn::get_fields(){
        return(create_attributes(node_name, select_list));
@@ -5038,6 +5375,9 @@ table_def *filter_join_qpn::get_fields(){
        return(create_attributes(node_name, select_list));
 }
 
+table_def *watch_join_qpn::get_fields(){
+       return(create_attributes(node_name, select_list));
+}
 
 table_def *join_eq_hash_qpn::get_fields(){
        int i, h, s, t;
@@ -5202,6 +5542,11 @@ vector<string> rsgah_qpn::get_tbl_keys(vector<string> &partial_keys){
                return(fm);
        }
 
+       vector<tablevar_t *> watch_tbl_qpn::get_input_tbls(){
+               vector<tablevar_t *> ret;
+               return(ret);
+       }
+
        vector<tablevar_t *> mrg_qpn::get_input_tbls(){
                return(fm);
        }
@@ -5234,6 +5579,10 @@ vector<string> rsgah_qpn::get_tbl_keys(vector<string> &partial_keys){
                return(from);
        }
 
+       vector<tablevar_t *> watch_join_qpn::get_input_tbls(){
+               return(from);
+       }
+
 //-----------------------------------------------------------------
 //                     get output tables
 
@@ -5245,6 +5594,11 @@ vector<string> rsgah_qpn::get_tbl_keys(vector<string> &partial_keys){
                return(retval);
        }
 
+       vector<tablevar_t *> watch_tbl_qpn::get_output_tbls(){
+               vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
+               return(retval);
+       }
+
        vector<tablevar_t *> mrg_qpn::get_output_tbls(){
                vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
                return(retval);
@@ -5281,6 +5635,12 @@ vector<string> rsgah_qpn::get_tbl_keys(vector<string> &partial_keys){
        }
 
 
+       vector<tablevar_t *> watch_join_qpn::get_output_tbls(){
+               vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
+               return(retval);
+       }
+
+
 
 //-----------------------------------------------------------------
 //                     Bind to schema
@@ -5378,6 +5738,30 @@ col_id_set filter_join_qpn::get_colrefs(bool ext_fcns_only,table_list *Schema){
        return tmp_cset;
 }
 
+col_id_set watch_join_qpn::get_colrefs(bool ext_fcns_only,table_list *Schema){
+       col_id_set retval, tmp_cset;
+       int p;
+       for(p=0;p<where.size();++p){
+               gather_pr_col_ids(where[p]->pr, tmp_cset, NULL);
+       }
+       int s;
+       for(s=0;s<select_list.size();++s){
+               gather_se_col_ids(select_list[s]->se, tmp_cset, NULL);
+       }
+       col_id_set::iterator  cisi;
+       if(ext_fcns_only){
+               for(cisi=tmp_cset.begin();cisi!=tmp_cset.end();++cisi){
+                       field_entry *fe = Schema->get_field((*cisi).schema_ref, (*cisi).field);
+                       if(fe->get_unpack_fcns().size()>0)
+                               retval.insert((*cisi));
+               }
+               return retval;
+       }
+
+       return tmp_cset;
+}
+
+
 
 
 //             Associate colrefs in SEs with this schema.
@@ -5437,6 +5821,35 @@ void filter_join_qpn::bind_to_schema(table_list *Schema){
 
 }
 
+void watch_join_qpn::bind_to_schema(table_list *Schema){
+//                     Bind the tablevars in the From clause to the Schema
+//                     (it might have changed from analysis time)
+       int f;
+       for(f=0;f<from.size();++f){
+               string snm = from[f]->get_schema_name();
+               int tbl_ref = Schema->get_table_ref(snm);
+               if(tbl_ref >= 0)
+               from[f]->set_schema_ref(tbl_ref);
+       }
+
+//                     Bind all SEs to this schema
+       tablevar_list_t fm(from);
+
+       int p;
+       for(p=0;p<where.size();++p){
+               bind_to_schema_pr(where[p]->pr, &fm, Schema);
+       }
+       int s;
+       for(s=0;s<select_list.size();++s){
+               bind_to_schema_se(select_list[s]->se, &fm, Schema);
+       }
+
+//             Collect set of tuples referenced in this HFTA
+//             input, internal, or output.
+
+}
+
+
 
 
 
@@ -5624,6 +6037,10 @@ void sgahcwcb_qpn::bind_to_schema(table_list *Schema){
 //-----------------------------------------------------------------
 //             get_cplx_lit_tbl
 
+cplx_lit_table *watch_tbl_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+       return(new cplx_lit_table());
+}
+
 cplx_lit_table *mrg_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
        return(new cplx_lit_table());
 }
@@ -5771,12 +6188,32 @@ cplx_lit_table *filter_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
        return(complex_literals);
 }
 
+cplx_lit_table *watch_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+       int i;
+       cplx_lit_table *complex_literals = new cplx_lit_table();
+
+       for(i=0;i<select_list.size();i++){
+               find_complex_literal_se(select_list[i]->se, Ext_fcns, complex_literals);
+       }
+       for(i=0;i<where.size();++i){
+               find_complex_literal_pr(where[i]->pr,Ext_fcns, complex_literals);
+       }
+
+       return(complex_literals);
+}
+
+
 
 
 
 //-----------------------------------------------------------------
 //             get_handle_param_tbl
 
+vector<handle_param_tbl_entry *> watch_tbl_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
+    vector<handle_param_tbl_entry *> retval;
+       return(retval);
+}
+
 vector<handle_param_tbl_entry *> mrg_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
     vector<handle_param_tbl_entry *> retval;
        return(retval);
@@ -5928,6 +6365,22 @@ vector<handle_param_tbl_entry *> filter_join_qpn::get_handle_param_tbl(ext_fcn_l
        return(retval);
 }
 
+vector<handle_param_tbl_entry *> watch_join_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
+       int i;
+    vector<handle_param_tbl_entry *> retval;
+
+       for(i=0;i<select_list.size();i++){
+               find_param_handles_se(select_list[i]->se, Ext_fcns, retval);
+       }
+       for(i=0;i<where.size();++i){
+               find_param_handles_pr(where[i]->pr,Ext_fcns, retval);
+       }
+
+       return(retval);
+}
+
+
+
 ///////////////////////////////////////////////////////////////
 ///////////////////////////////////////////////////////////////
 ///            Functions for operator output rates estimations
@@ -5960,6 +6413,12 @@ double sgahcwcb_qpn::get_rate_estimate() {
        return SGAHCWCB_SELECTIVITY * DEFAULT_INTERFACE_RATE;
 }
 
+double watch_tbl_qpn::get_rate_estimate() {
+
+       // dummy method for now
+       return DEFAULT_INTERFACE_RATE;
+}
+
 double mrg_qpn::get_rate_estimate() {
 
        // dummy method for now
@@ -9876,6 +10335,23 @@ string mrg_qpn::generate_operator(int i, string params){
        );
 }
 
+////////////////////////////////////////////////
+///            WATCHLIST_TBL operator
+///            WATCHLIST_TBL functor
+////////////////////////////////////////////
+
+string watch_tbl_qpn::generate_functor_name(){
+       return("watch_tbl_functor_" + normalize_name(this->get_node_name()));
+}
+
+string watch_tbl_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector<bool> &needs_xform){
+
+       return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED");
+}
+
+string watch_tbl_qpn::generate_operator(int i, string params){
+       return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED");
+}
 
 /////////////////////////////////////////////////////////
 //////                 JOIN_EQ_HASH functor
@@ -13521,6 +13997,29 @@ void filter_join_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list
        }
 }
 
+void watch_join_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
+       int i;
+       vector<map<string, scalarexp_t *> *> src_vec;
+
+       for(i=0;i<q_sources.size();i++){
+               if(q_sources[i] != NULL)
+                       src_vec.push_back(q_sources[i]->get_protocol_se());
+               else
+                       src_vec.push_back(NULL);
+       }
+
+       for(i=0;i<select_list.size();i++){
+               protocol_map[select_list[i]->name] = resolve_protocol_se(select_list[i]->se,src_vec,NULL,Schema);
+       }
+
+       for(i=0;i<key_flds.size();i++){
+               string kfld = key_flds[i];
+               hash_src_l.push_back(resolve_protocol_se(hash_eq[kfld]->pr->get_left_se(),src_vec,NULL,Schema));
+               hash_src_r.push_back(resolve_protocol_se(hash_eq[kfld]->pr->get_right_se(),src_vec,NULL,Schema));
+       }
+}
+
+
 void sgah_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
        int i;
        vector<map<string, scalarexp_t *> *> src_vec;
@@ -13620,3 +14119,8 @@ void mrg_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema
                        protocol_map[fld_nm] = NULL;
        }
 }
+
+void watch_tbl_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
+       return;
+}
+