Fix parsing of floating point numbers
[com/gs-lite.git] / src / ftacmp / query_plan.cc
index c84edb9..8e05ae2 100644 (file)
@@ -98,16 +98,76 @@ mrg_qpn::mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector<std::stri
 }
 
 
+mrg_qpn::mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
+               param_tbl = spx->param_tbl;
+               int i;
+               node_name = n_name;
+               field_entry_list *fel = new field_entry_list();
+               merge_fieldpos = -1;
+
+               disorder = 1;
+
+               for(i=0;i<spx->select_list.size();++i){
+                       data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
+                       if(dt->is_temporal()){
+                               if(merge_fieldpos < 0){
+                                       merge_fieldpos = i;
+                               }else{
+                                       fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
+                                       dt->reset_temporal();
+                               }
+                       }
+
+                       field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
+                       fel->append_field(fe);
+                       delete dt;
+               }
+               if(merge_fieldpos<0){
+                       fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
+                               exit(1);
+               }
+               table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
+
+//                             NEED TO HANDLE USER_SPECIFIED SLACK
+               this->resolve_slack(spx->select_list[merge_fieldpos]->se,
+                               spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
+//     if(this->slack == NULL)
+//             fprintf(stderr,"Zero slack.\n");
+//     else
+//             fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
+
+               for(i=0;i<sources.size();i++){
+                       std::string rvar = "_m"+int_to_string(i);
+                       mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
+                       mvars[i]->set_tablevar_ref(i);
+                       fm.push_back(new tablevar_t(sources[i].c_str()));
+                       fm[i]->set_range_var(rvar);
+               }
+
+               param_tbl = new param_table();
+               std::vector<std::string> param_names = spx->param_tbl->get_param_names();
+               int pi;
+               for(pi=0;pi<param_names.size();pi++){
+                       data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
+                       param_tbl->add_param(param_names[pi],dt->duplicate(),
+                                                       spx->param_tbl->handle_access(param_names[pi]));
+               }
+               definitions = spx->definitions;
+
+}
+
+
+
 
 //             This function translates an analyzed parse tree
 //             into one or more query nodes (qp_node).
 //             Currently only one node is created, but some query
 //             fragments might create more than one query node,
-//             e.g. aggregation over a joim, or nested subqueries
-//             in the FROM clause (unless this is handles at parse tree
+//             e.g. aggregation over a join, or nested subqueries
+//             in the FROM clause (unless this is handled at parse tree
 //             analysis time).  At this stage, they will be linked
 //             by the names in the FROM clause.
-//             INVARIANT : if mroe than one query node is returned,
+//             INVARIANT : if more than one query node is returned,
 //             the last one represents the output of the query.
 vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema){
 
@@ -121,6 +181,16 @@ vector<qp_node *> create_query_nodes(query_summary_class *qs,table_list *Schema)
 //                     into the qp_node constructors,
 //                     and have this code focus on building the query plan tree.
 
+//             Watchlist node
+       if(qs->query_type == WATCHLIST_QUERY){
+               watch_tbl_qpn *watchnode = new watch_tbl_qpn(qs, Schema);
+
+//                     Done
+               plan_root = watchnode;
+               local_plan.push_back(watchnode);
+       }
+
+
 //             MERGE node
        if(qs->query_type == MERGE_QUERY){
                mrg_qpn *merge_node = new mrg_qpn(qs,Schema);
@@ -159,7 +229,9 @@ printf("\n");
 */
 
 
-       } else{
+       } 
+       
+       if(qs->query_type == SELECT_QUERY){
 
 //             Select / Aggregation / Join
          if(qs->gb_tbl->size() == 0 && qs->aggr_tbl->size() == 0){
@@ -175,9 +247,15 @@ printf("\n");
                                plan_root = join_node;
                                local_plan.push_back(join_node);
                        }else{
-                               join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema);
-                               plan_root = join_node;
-                               local_plan.push_back(join_node);
+                               if(qs->fta_tree->get_from()->get_properties() == WATCHLIST_JOIN_PROPERTY){
+                                       watch_join_qpn *join_node = new watch_join_qpn(qs,Schema);
+                                       plan_root = join_node;
+                                       local_plan.push_back(join_node);
+                               }else{
+                                       join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema);
+                                       plan_root = join_node;
+                                       local_plan.push_back(join_node);
+                               }
                        }
                }
          }else{
@@ -2077,6 +2155,20 @@ string sgahcwcb_qpn::to_query_string(){
        return(ret);
 }
 
+string watch_tbl_qpn::to_query_string(){
+       string ret;
+//     ret += "DEFINE {\n";
+//     ret += "\tfilename='"+filename+";\n";
+//     ret += "\trefresh_interval="+to_string(refresh_interval)+";\n}\n";
+       ret += "WATCHLIST FIELDS {\n";
+       std::vector<field_entry *> fields = table_layout->get_fields();
+       for(int f=0;f<fields.size();++f){
+               ret += fields[f]->to_string()+"\n";
+       }
+       ret += "}\n";
+
+       return ret;
+}
 
 string mrg_qpn::to_query_string(){
 
@@ -2181,6 +2273,42 @@ string filter_join_qpn::to_query_string(){
        return(ret);
 }
 
+string watch_join_qpn::to_query_string(){
+
+       string ret = "Select ";
+       int s;
+       for(s=0;s<select_list.size();s++){
+               if(s>0) ret+=", ";
+               ret += se_to_query_string(select_list[s]->se, NULL);
+               if(select_list[s]->name != "") ret += " AS "+select_list[s]->name;
+       }
+       ret += "\n";
+
+//                     NOTE: assuming binary join.
+       ret += "WATCHLIST_JOIN ";
+
+       ret += "From ";
+       int f;
+       for(f=0;f<from.size();++f){
+               if(f>0) ret+=", ";
+               ret += from[f]->to_string();
+       }
+       ret += "\n";
+
+       if(where.size() > 0){
+               ret += "Where ";
+               int w;
+               for(w=0;w<where.size();w++){
+                       if(w>0) ret += " AND ";
+                       ret += "(" + pred_to_query_str(where[w]->pr,NULL) + ")";
+               }
+               ret += "\n";
+       }
+
+       return(ret);
+}
+
+
 
 // -----------------------------------------------------------------
 //             Query node subclass specific processing.
@@ -2390,6 +2518,18 @@ void mrg_qpn::resolve_slack(scalarexp_t *t_se, string fname, vector<pair<string,
 //------------------------------------------------------------------
 //             split a node to extract LFTA components.
 
+vector<qp_node *> watch_tbl_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+       // nothing to do, nothing to split, return copy of self.
+
+       hfta_returned = 0;
+
+       vector<qp_node *> ret_vec;
+
+       ret_vec.push_back(this);
+       return(ret_vec);
+
+}
+
 
 vector<qp_node *> mrg_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
        // nothing to do, nothing to split, return copy of self.
@@ -2429,7 +2569,7 @@ vector<qp_node *> filter_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, ta
        vector<string> sel_names;
        vector<pair<string,string> > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
        if (ifaces.empty()) {
-               fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set\n");
+               fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                exit(1);
        }
 
@@ -2481,35 +2621,35 @@ vector<qp_node *> filter_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, ta
                        }
 
                        for(p=0;p<shared_pred.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(shared_pred[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->shared_pred.push_back(new_cnf);
                                fta_node->where.push_back(new_cnf);
                        }
                        for(p=0;p<pred_t0.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->pred_t0.push_back(new_cnf);
                                fta_node->where.push_back(new_cnf);
                        }
                        for(p=0;p<pred_t1.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->pred_t1.push_back(new_cnf);
                                fta_node->where.push_back(new_cnf);
                        }
                        for(p=0;p<hash_eq.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(hash_eq[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->hash_eq.push_back(new_cnf);
                                fta_node->where.push_back(new_cnf);
                        }
                        for(p=0;p<postfilter.size();p++){
-                               predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+                               predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL);
                                cnf_elem *new_cnf = new cnf_elem(new_pr);
                                analyze_cnf(new_cnf);
                                fta_node->postfilter.push_back(new_cnf);
@@ -2543,6 +2683,155 @@ vector<qp_node *> filter_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, ta
 
 }
 
+
+
+
+
+vector<qp_node *> watch_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+       vector<qp_node *> ret_vec;
+
+//             First check if the query can be pushed to the FTA.
+       bool fta_ok = true;
+       int s;
+       for(s=0;s<select_list.size();s++){
+               fta_ok &= check_fta_forbidden_se(select_list[s]->se,NULL, Ext_fcns);
+       }
+       int p;
+       for(p=0;p<where.size();p++){
+               fta_ok &= check_fta_forbidden_pr(where[p]->pr,NULL, Ext_fcns);
+       }
+
+       if(!fta_ok){
+               fprintf(stderr,"ERROR, watchlist join %s is fta-unsafe.\n",node_name.c_str());
+               exit(1);
+       }
+
+//             Can it be done in a single lfta?
+//                     Get the set of interfaces it accesses.
+       int ierr;
+       int si;
+       vector<string> sel_names;
+       vector<pair<string,string> > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
+       if (ifaces.empty()) {
+               fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
+               exit(1);
+       }
+
+       if(ifaces.size() == 1){
+//                             Single interface, no need to merge.
+               hfta_returned = 0;
+               ret_vec.push_back(this);
+
+//     Treat the range vars a bit differently, the 2nd is reading from a _local_ watchlist.
+               from[0]->set_machine(ifaces[0].first);
+               from[0]->set_interface(ifaces[0].second);
+               from[0]->set_ifq(false);
+
+               from[1]->set_machine(ifaces[0].first);
+               from[1]->set_interface("_local_");
+               from[1]->set_ifq(false);
+
+               return(ret_vec);
+       }else{
+//                             Multiple interfaces, generate the interface-specific queries plus
+//                             the merge.
+               hfta_returned = 1;
+
+               vector<string> sel_names;
+               for(si=0;si<ifaces.size();++si){
+                       watch_join_qpn *fta_node = new watch_join_qpn();
+
+//                     Name the fta
+                       if(ifaces.size()==1)
+                               fta_node->set_node_name( node_name );
+                       else{
+                               string new_name =  "_"+node_name+"_"+ifaces[si].first+"_"+ifaces[si].second;
+                               untaboo(new_name);
+                               fta_node->set_node_name(new_name);
+                       }
+                       sel_names.push_back(fta_node->get_node_name());
+
+//                     Assign the table
+                       int f;
+                       for(f=0;f<from.size();f++){
+                               fta_node->from.push_back(from[f]->duplicate());
+                               fta_node->from[f]->set_machine(ifaces[si].first);
+                               if(f==0)
+                                       fta_node->from[f]->set_interface(ifaces[si].second);
+                               else
+                                       fta_node->from[f]->set_interface("_local_");
+                               fta_node->from[f]->set_ifq(false);
+                       }
+
+                       for(s=0;s<select_list.size();s++){
+                               fta_node->select_list.push_back( dup_select(select_list[s], NULL) );
+                       }
+
+                       for(p=0;p<pred_t0.size();p++){
+                               predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->pred_t0.push_back(new_cnf);
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       for(p=0;p<pred_t1.size();p++){
+                               predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->pred_t1.push_back(new_cnf);
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       for(p=0;p<key_flds.size();p++){ // we've checked that all keys are covered
+                               string k = key_flds[p];
+                               predicate_t *new_pr = dup_pr(hash_eq[k]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->hash_eq[k] = new_cnf;
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       for(p=0;p<join_filter.size();p++){
+                               predicate_t *new_pr = dup_pr(join_filter[p]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->postfilter.push_back(new_cnf);
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       for(p=0;p<postfilter.size();p++){
+                               predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL);
+                               cnf_elem *new_cnf = new cnf_elem(new_pr);
+                               analyze_cnf(new_cnf);
+                               fta_node->postfilter.push_back(new_cnf);
+                               fta_node->where.push_back(new_cnf);
+                       }
+                       fta_node->key_flds = key_flds;
+
+//                     Xfer all of the parameters.
+//                     Use existing handle annotations.
+                       vector<string> param_names = param_tbl->get_param_names();
+                       int pi;
+                       for(pi=0;pi<param_names.size();pi++){
+                               data_type *dt = param_tbl->get_data_type(param_names[pi]);
+                               fta_node->param_tbl->add_param(param_names[pi],dt->duplicate(),
+                                                                       param_tbl->handle_access(param_names[pi]));
+                       }
+                       fta_node->definitions = definitions;
+                       if(fta_node->resolve_if_params(ifdb, this->err_str)){
+                               this->error_code = 3;
+                               return ret_vec;
+                       }
+
+                       ret_vec.push_back(fta_node);
+               }
+
+               mrg_qpn *mrg_node = new mrg_qpn((watch_join_qpn *)ret_vec[0],
+                        node_name,  sel_names,ifaces, ifdb);
+               ret_vec.push_back(mrg_node);
+
+               return(ret_vec);
+       }
+
+}
+
 //             Use to search for unresolved interface param refs in an hfta.
 
 int spx_qpn::count_ifp_refs(set<string> &ifpnames){
@@ -2606,6 +2895,10 @@ int rsgah_qpn::count_ifp_refs(set<string> &ifpnames){
        return ret;
 }
 
+int watch_tbl_qpn::count_ifp_refs(set<string> &ifpnames){
+       return 0;
+}
+
 int mrg_qpn::count_ifp_refs(set<string> &ifpnames){
        return 0;
 }
@@ -2638,6 +2931,17 @@ int filter_join_qpn::count_ifp_refs(set<string> &ifpnames){
        return ret;
 }
 
+int watch_join_qpn::count_ifp_refs(set<string> &ifpnames){
+       int ret = 0;
+       int i;
+       for(i=0;i<select_list.size();++i)
+               ret += count_se_ifp_refs(select_list[i]->se,ifpnames);
+       for(i=0;i<where.size();++i)
+               ret += count_pr_ifp_refs(where[i]->pr,ifpnames);
+       return ret;
+}
+
+
 
 //             Resolve interface params to string literals
 int filter_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
@@ -2654,6 +2958,20 @@ int filter_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
        return ret;
 }
 
+int watch_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
+       int ret = 0;
+       int i;
+       string ifname = from[0]->get_interface();
+       string ifmach = from[0]->get_machine();
+       for(i=0;i<select_list.size();++i)
+               if( resolve_se_ifp_refs(select_list[i]->se,ifmach, ifname, ifdb, err) )
+                       ret = 1;
+       for(i=0;i<where.size();++i)
+               if( resolve_pr_ifp_refs(where[i]->pr,ifmach, ifname, ifdb, err))
+                       ret = 1;
+       return ret;
+}
+
 
 int spx_qpn::resolve_if_params( ifq_t *ifdb, string &err){
        int ret = 0;
@@ -2784,7 +3102,7 @@ vector<qp_node *> spx_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list
        vector<string> sel_names;
        vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
        if (ifaces.empty()) {
-               fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set\n");
+               fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                exit(1);
        }
 
@@ -3138,7 +3456,7 @@ vector<qp_node *> rsgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_li
        vector<string> sel_names;
        vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
        if (ifaces.empty()) {
-               fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set\n");
+               fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                exit(1);
        }
 
@@ -3805,7 +4123,7 @@ vector<qp_node *> sgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_lis
        vector<string> sel_names;
        vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
        if (ifaces.empty()) {
-               fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set\n");
+               fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                exit(1);
        }
 
@@ -4596,6 +4914,7 @@ vector<qp_node *> join_eq_hash_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, t
                        child_qpn->table_name = new tablevar_t(
                           from[f]->get_interface().c_str(), from[f]->get_schema_name().c_str(), from[f]->get_ifq());
                        child_qpn->table_name->set_range_var(from[f]->get_var_name());
+                       child_qpn->table_name->set_machine(from[f]->get_machine());
 
                        child_vec.push_back(child_qpn);
                        select_vec.push_back(&(child_qpn->select_list));
@@ -4735,7 +5054,7 @@ vector<qp_node *> join_eq_hash_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, t
                        spx_qpn *c_node = child_vec[f];
                        vector<pair<string, string> > ifaces = get_ifaces(c_node->table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
                        if (ifaces.empty()) {
-                               fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set\n");
+                               fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
                                exit(1);
                        }
 
@@ -5002,6 +5321,21 @@ vector<table_exp_t *> filter_join_qpn::extract_opview(table_list *Schema,  vecto
     return(ret);
 }
 
+vector<table_exp_t *> watch_join_qpn::extract_opview(table_list *Schema,  vector<query_node *> &qnodes, opview_set &opviews, string rootnm, string silo_name){
+       vector<table_exp_t *> ret;
+       int retval = process_opview(from[0],0,node_name,
+                                                               Schema,qnodes,opviews,ret, rootnm, silo_name);
+       if(retval) exit(1);
+    return(ret);
+}
+
+
+
+vector<table_exp_t *> watch_tbl_qpn::extract_opview(table_list *Schema,  vector<query_node *> &qnodes, opview_set &opviews, string rootnm, string silo_name){
+       vector<table_exp_t *> ret;
+       return ret;             // nothing to process
+}
+
 
 
 //////////////////////////////////////////////////////////////////
@@ -5017,6 +5351,10 @@ table_def *mrg_qpn::get_fields(){
        return(table_layout);
 }
 
+table_def *watch_tbl_qpn::get_fields(){
+       return(table_layout);
+}
+
 
 table_def *spx_qpn::get_fields(){
        return(create_attributes(node_name, select_list));
@@ -5038,6 +5376,9 @@ table_def *filter_join_qpn::get_fields(){
        return(create_attributes(node_name, select_list));
 }
 
+table_def *watch_join_qpn::get_fields(){
+       return(create_attributes(node_name, select_list));
+}
 
 table_def *join_eq_hash_qpn::get_fields(){
        int i, h, s, t;
@@ -5132,14 +5473,65 @@ table_def *join_eq_hash_qpn::get_fields(){
                        }
                }
        }
-  }
+  }
+
+
+       return create_attributes(node_name, select_list);
+}
+
+
+//-----------------------------------------------------------------
+//                     get output "keys"
+//                     This is a guess about the set of fields which are a key
+//                     Use as metadata output, e.g. in qtree.xml
+
+
+
+//             refs to GB attribtues are keys, if a SE is not a GB colref
+//             but refers to a GB colref (outside of an aggregation)
+//             then set partial_keys to true
+vector<string> sgah_qpn::get_tbl_keys(vector<string> &partial_keys){
+       vector<string> keys;
+
+       set<int> gref_set;
+       for(int i=0; i<gb_tbl.size();++i)
+               gref_set.insert(i);
+
+       for(int s=0;s<select_list.size();++s){
+               if(select_list[s]->se->is_gb()){
+                       keys.push_back(select_list[s]->name);
+               }else{
+                       if(contains_gb_se(select_list[s]->se, gref_set)){
+                               partial_keys.push_back(select_list[s]->name);
+                       }
+               }
+       }
+       return keys;
+}
 
+vector<string> rsgah_qpn::get_tbl_keys(vector<string> &partial_keys){
+       vector<string> keys;
 
-       return create_attributes(node_name, select_list);
+       set<int> gref_set;
+       for(int i=0; i<gb_tbl.size();++i)
+               gref_set.insert(i);
+
+       for(int s=0;s<select_list.size();++s){
+               if(select_list[s]->se->is_gb()){
+                       keys.push_back(select_list[s]->name);
+               }else{
+                       if(contains_gb_se(select_list[s]->se, gref_set)){
+                               partial_keys.push_back(select_list[s]->name);
+                       }
+               }
+       }
+       return keys;
 }
 
 
 
+
+
 //-----------------------------------------------------------------
 //                     get output tables
 
@@ -5151,6 +5543,11 @@ table_def *join_eq_hash_qpn::get_fields(){
                return(fm);
        }
 
+       vector<tablevar_t *> watch_tbl_qpn::get_input_tbls(){
+               vector<tablevar_t *> ret;
+               return(ret);
+       }
+
        vector<tablevar_t *> mrg_qpn::get_input_tbls(){
                return(fm);
        }
@@ -5183,6 +5580,10 @@ table_def *join_eq_hash_qpn::get_fields(){
                return(from);
        }
 
+       vector<tablevar_t *> watch_join_qpn::get_input_tbls(){
+               return(from);
+       }
+
 //-----------------------------------------------------------------
 //                     get output tables
 
@@ -5194,6 +5595,11 @@ table_def *join_eq_hash_qpn::get_fields(){
                return(retval);
        }
 
+       vector<tablevar_t *> watch_tbl_qpn::get_output_tbls(){
+               vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
+               return(retval);
+       }
+
        vector<tablevar_t *> mrg_qpn::get_output_tbls(){
                vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
                return(retval);
@@ -5230,6 +5636,12 @@ table_def *join_eq_hash_qpn::get_fields(){
        }
 
 
+       vector<tablevar_t *> watch_join_qpn::get_output_tbls(){
+               vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
+               return(retval);
+       }
+
+
 
 //-----------------------------------------------------------------
 //                     Bind to schema
@@ -5327,6 +5739,30 @@ col_id_set filter_join_qpn::get_colrefs(bool ext_fcns_only,table_list *Schema){
        return tmp_cset;
 }
 
+col_id_set watch_join_qpn::get_colrefs(bool ext_fcns_only,table_list *Schema){
+       col_id_set retval, tmp_cset;
+       int p;
+       for(p=0;p<where.size();++p){
+               gather_pr_col_ids(where[p]->pr, tmp_cset, NULL);
+       }
+       int s;
+       for(s=0;s<select_list.size();++s){
+               gather_se_col_ids(select_list[s]->se, tmp_cset, NULL);
+       }
+       col_id_set::iterator  cisi;
+       if(ext_fcns_only){
+               for(cisi=tmp_cset.begin();cisi!=tmp_cset.end();++cisi){
+                       field_entry *fe = Schema->get_field((*cisi).schema_ref, (*cisi).field);
+                       if(fe->get_unpack_fcns().size()>0)
+                               retval.insert((*cisi));
+               }
+               return retval;
+       }
+
+       return tmp_cset;
+}
+
+
 
 
 //             Associate colrefs in SEs with this schema.
@@ -5386,6 +5822,35 @@ void filter_join_qpn::bind_to_schema(table_list *Schema){
 
 }
 
+void watch_join_qpn::bind_to_schema(table_list *Schema){
+//                     Bind the tablevars in the From clause to the Schema
+//                     (it might have changed from analysis time)
+       int f;
+       for(f=0;f<from.size();++f){
+               string snm = from[f]->get_schema_name();
+               int tbl_ref = Schema->get_table_ref(snm);
+               if(tbl_ref >= 0)
+               from[f]->set_schema_ref(tbl_ref);
+       }
+
+//                     Bind all SEs to this schema
+       tablevar_list_t fm(from);
+
+       int p;
+       for(p=0;p<where.size();++p){
+               bind_to_schema_pr(where[p]->pr, &fm, Schema);
+       }
+       int s;
+       for(s=0;s<select_list.size();++s){
+               bind_to_schema_se(select_list[s]->se, &fm, Schema);
+       }
+
+//             Collect set of tuples referenced in this HFTA
+//             input, internal, or output.
+
+}
+
+
 
 
 
@@ -5573,6 +6038,10 @@ void sgahcwcb_qpn::bind_to_schema(table_list *Schema){
 //-----------------------------------------------------------------
 //             get_cplx_lit_tbl
 
+cplx_lit_table *watch_tbl_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+       return(new cplx_lit_table());
+}
+
 cplx_lit_table *mrg_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
        return(new cplx_lit_table());
 }
@@ -5720,12 +6189,32 @@ cplx_lit_table *filter_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
        return(complex_literals);
 }
 
+cplx_lit_table *watch_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+       int i;
+       cplx_lit_table *complex_literals = new cplx_lit_table();
+
+       for(i=0;i<select_list.size();i++){
+               find_complex_literal_se(select_list[i]->se, Ext_fcns, complex_literals);
+       }
+       for(i=0;i<where.size();++i){
+               find_complex_literal_pr(where[i]->pr,Ext_fcns, complex_literals);
+       }
+
+       return(complex_literals);
+}
+
+
 
 
 
 //-----------------------------------------------------------------
 //             get_handle_param_tbl
 
+vector<handle_param_tbl_entry *> watch_tbl_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
+    vector<handle_param_tbl_entry *> retval;
+       return(retval);
+}
+
 vector<handle_param_tbl_entry *> mrg_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
     vector<handle_param_tbl_entry *> retval;
        return(retval);
@@ -5877,6 +6366,22 @@ vector<handle_param_tbl_entry *> filter_join_qpn::get_handle_param_tbl(ext_fcn_l
        return(retval);
 }
 
+vector<handle_param_tbl_entry *> watch_join_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
+       int i;
+    vector<handle_param_tbl_entry *> retval;
+
+       for(i=0;i<select_list.size();i++){
+               find_param_handles_se(select_list[i]->se, Ext_fcns, retval);
+       }
+       for(i=0;i<where.size();++i){
+               find_param_handles_pr(where[i]->pr,Ext_fcns, retval);
+       }
+
+       return(retval);
+}
+
+
+
 ///////////////////////////////////////////////////////////////
 ///////////////////////////////////////////////////////////////
 ///            Functions for operator output rates estimations
@@ -5909,6 +6414,12 @@ double sgahcwcb_qpn::get_rate_estimate() {
        return SGAHCWCB_SELECTIVITY * DEFAULT_INTERFACE_RATE;
 }
 
+double watch_tbl_qpn::get_rate_estimate() {
+
+       // dummy method for now
+       return DEFAULT_INTERFACE_RATE;
+}
+
 double mrg_qpn::get_rate_estimate() {
 
        // dummy method for now
@@ -6522,6 +7033,29 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *
        return(ret);
 }
 
+static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){
+       string ret;
+
+    if(dt->complex_comparison(dt) ){
+               ret.append(dt->get_hfta_comparison_fcn(dt));
+               ret.append("(");
+                       if(dt->is_buffer_type() )
+                               ret.append("&");
+               ret.append(lhs_op);
+               ret.append(", ");
+                       if(dt->is_buffer_type() )
+                               ret.append("&");
+               ret.append(rhs_op );
+               ret.append(") == 1");
+       }else{
+               ret.append(lhs_op );
+               ret.append(" < ");
+               ret.append(rhs_op );
+       }
+
+       return(ret);
+}
+
 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
        string ret;
 
@@ -9825,6 +10359,23 @@ string mrg_qpn::generate_operator(int i, string params){
        );
 }
 
+////////////////////////////////////////////////
+///            WATCHLIST_TBL operator
+///            WATCHLIST_TBL functor
+////////////////////////////////////////////
+
+string watch_tbl_qpn::generate_functor_name(){
+       return("watch_tbl_functor_" + normalize_name(this->get_node_name()));
+}
+
+string watch_tbl_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector<bool> &needs_xform){
+
+       return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED");
+}
+
+string watch_tbl_qpn::generate_operator(int i, string params){
+       return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED");
+}
 
 /////////////////////////////////////////////////////////
 //////                 JOIN_EQ_HASH functor
@@ -10583,84 +11134,67 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_
 
 
 //             create a temp status tuple
-       ret += "int create_temp_status_tuple(const host_tuple &tup0, const host_tuple &tup1, host_tuple& result) {\n\n";
+       ret += "int create_temp_status_tuple("+this->generate_functor_name()+"_tempeqdef *lts,"+this->generate_functor_name()+"_tempeqdef *rts, host_tuple& result) {\n\n";
 
        ret += "\tgs_retval_t retval = 0;\n";
        ret += "\tgs_int32_t problem = 0;\n";
 
-       ret += "\tif(tup0.data){\n";
-
-//             Unpack all the temporal attributes references in select list
-       col_id_set found_cids;
-
-       for(s=0;s<select_list.size();s++){
-               if (select_list[s]->se->get_data_type()->is_temporal()) {
-//                     Find the set of attributes accessed in this SE
-                       col_id_set new_cids;
-                       get_new_se_cids(select_list[s]->se,found_cids, new_cids, NULL);
-               }
+       for(p=0;p<temporal_dt.size();p++){
+               sprintf(tmpstr,"lhs_var");
+               ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";         
+               sprintf(tmpstr,"rhs_var");
+               ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";         
        }
 
-       //                      Deal with outer join stuff
-       l_cids.clear(), r_cids.clear();
-       for(ocsi=found_cids.begin();ocsi!=found_cids.end();++ocsi){
-               if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
-               else                                            r_cids.insert((*ocsi));
+       ret += "\tif(lts!=NULL){\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\tlhs_var = lts->tempeq_var"+to_string(p)+";\n";
        }
-       unpack_null = "";
-       extra_cids.clear();
-       for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){
-               string field = (*ocsi).field;
-               if(r_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n";
-                       col_id_set addnl_cids;
-                       get_new_se_cids(r_equiv[field],l_cids,addnl_cids,NULL);
-               }else{
-               int schref = (*ocsi).schema_ref;
-                       data_type dt(schema->get_type_name(schref,field));
-                       literal_t empty_lit(dt.type_indicator());
-                       if(empty_lit.is_cpx_lit()){
-                               sprintf(tmpstr,"&(unpack_var_%s_1)",field.c_str());
-                               unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
-                       }else{
-                               unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n";
-                       }
-               }
+       ret += "\t}else{\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\tlhs_var = 0;\n";
        }
-       ret += gen_unpack_cids(schema,  l_cids, "1", needs_xform);
-       ret += gen_unpack_cids(schema,  extra_cids, "1", needs_xform);
-       ret += unpack_null;
+       ret += "\t}\n";
 
-       ret+="\t}else if (tup1.data) {\n";
-       unpack_null = ""; extra_cids.clear();
-       for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){
-               string field = (*ocsi).field;
-               if(l_equiv.count(field)){
-                       unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n";
-                       col_id_set addnl_cids;
-                       get_new_se_cids(l_equiv[field],r_cids,addnl_cids,NULL);
-               }else{
-               int schref = (*ocsi).schema_ref;
-                       data_type dt(schema->get_type_name(schref,field));
-                       literal_t empty_lit(dt.type_indicator());
-                       if(empty_lit.is_cpx_lit()){
-                               sprintf(tmpstr,"&(unpack_var_%s_0)",field.c_str());
-                               unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
-                       }else{
-                               unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n";
-                       }
-               }
+       ret += "\tif(rts!=NULL){\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\trhs_var = rts->tempeq_var"+to_string(p)+";\n";
        }
-       ret += gen_unpack_cids(schema,  r_cids, "1", needs_xform);
-       ret += gen_unpack_cids(schema,  extra_cids, "1", needs_xform);
-       ret += unpack_null;
-       ret+="\t}\n";
+       ret += "\t}else{\n";
+       for(p=0;p<temporal_dt.size();p++){
+               ret += "\t\trhs_var = 0;\n";
+       }
+       ret += "\t}\n";
 
        ret += gen_init_temp_status_tuple(this->get_node_name());
 
 //             Start packing.
-       ret += "//\t\tPack the fields into the tuple.\n";
-       ret += gen_pack_tuple(schema,select_list,this->get_node_name(), true );
+
+
+//             This is checked in the query analyzer so I think its safe,
+//             But a lot of older code has complex code to propagate multiple
+//             timestamps
+    for(s=0;s<select_list.size();s++){
+               scalarexp_t *se  = select_list[s]->se;
+        data_type *sdt = se->get_data_type();
+               if(sdt->is_temporal()){
+                       string target = "\ttuple->tuple_var"+to_string(s)+" = ";
+                       if(from[0]->get_property()==0 && from[1]->get_property()==0){ // INNER
+                               ret += target+"(lhs_var>rhs_var ? lhs_var : rhs_var); // INNER\n";
+                       }
+                       if(from[0]->get_property()!=0 && from[1]->get_property()==0){ // LEFT
+                               ret += target+"lhs_var; // LEFT\n";
+//                             ret += target+"rhs_var; // LEFT\n";
+                       }
+                       if(from[0]->get_property()==0 && from[1]->get_property()!=0){ // RIGHT
+                               ret += target+"rhs_var; // RIGHT\n";
+//                             ret += target+"lhs_var; // RIGHT\n";
+                       }
+                       if(from[0]->get_property()!=0 && from[1]->get_property()!=0){ // OUTER
+                               ret += target+"(lhs_var<rhs_var ? lhs_var : rhs_var); // OUTER\n";
+                       }
+               }
+       }
 
 
        ret += "\treturn 0;\n";
@@ -12336,13 +12870,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
                        if(gdt->is_temporal()){
-                         sprintf(tmpstr,"last_gb%d",g);
+                         sprintf(tmpstr,"curr_gb%d",g);
                          ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
-                         sprintf(tmpstr,"last_flushed_gb%d",g);
+                         sprintf(tmpstr,"last_gb%d",g);
                          ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
                        }
                }
-               ret += "\tbool needs_temporal_flush;\n";
+               ret += "\tgs_int32_t needs_temporal_flush;\n";
        }
 
 //                     The publicly exposed functions
@@ -12380,6 +12914,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 
 //             temporal flush variables
 //             ASSUME that structured values won't be temporal.
+       gs_int32_t temporal_gb = 0;
        if(uses_temporal_flush){
                ret += "//\t\tInitialize temporal flush variables.\n";
                for(g=0;g<gb_tbl.size();g++){
@@ -12388,9 +12923,12 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                                literal_t gl(gdt->type_indicator());
                                sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
                                ret.append(tmpstr);
+                               sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
+                               ret.append(tmpstr);
+                               temporal_gb = g;
                        }
                }
-               ret += "\tneeds_temporal_flush = false;\n";
+               ret += "\tneeds_temporal_flush = 0;\n";
        }
 
        //              Init temporal attributes referenced in select list
@@ -12504,35 +13042,39 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //                     set flush indicator and update stored GB vars if there is any change.
 
        if(uses_temporal_flush){
-               ret+= "\tif( !( (";
+               ret+= "\tif( ( (";
                bool first_one = true;
                for(g=0;g<gb_tbl.size();g++){
                        data_type *gdt = gb_tbl.get_data_type(g);
 
                        if(gdt->is_temporal()){
-                         sprintf(tmpstr,"last_gb%d",g);   string lhs_op = tmpstr;
+                         sprintf(tmpstr,"curr_gb%d",g);   string lhs_op = tmpstr;
                          sprintf(tmpstr,"gbval->gb_var%d",g);   string rhs_op = tmpstr;
                          if(first_one){first_one = false;} else {ret += ") && (";}
-                         ret += generate_equality_test(lhs_op, rhs_op, gdt);
+                         ret += generate_lt_test(lhs_op, rhs_op, gdt);
                        }
                }
                ret += ") ) ){\n";
                for(g=0;g<gb_tbl.size();g++){
                  data_type *gdt = gb_tbl.get_data_type(g);
                  if(gdt->is_temporal()){
-                         if(gdt->is_buffer_type()){
+                               temporal_gb = g;
+                         if(gdt->is_buffer_type()){    // TODO first, last?  or delete?
                                sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
                          }else{
-                               sprintf(tmpstr,"\t\tlast_flushed_gb%d = last_gb%d;\n",g,g);
-                               ret += tmpstr;
-                               sprintf(tmpstr,"\t\tlast_gb%d = gbval->gb_var%d;\n",g,g);
+                               ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n";
+                               ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n";
+                               ret += "\t\t}else{\n";
+                               ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n";
+                               ret += "\t\t}\n";
+                               sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
                          }
                          ret += tmpstr;
                        }
                }
-               ret += "\t\tneeds_temporal_flush=true;\n";
+               ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n"; 
                ret += "\t\t}else{\n"
-                       "\t\t\tneeds_temporal_flush=false;\n"
+                       "\t\t\tneeds_temporal_flush=0;\n"
                        "\t\t}\n";
        }
 
@@ -12703,13 +13245,22 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
 //---------------------------------------------------
 //                     Flush test
 
-       ret += "\tbool flush_needed(){\n";
+       ret += "gs_int32_t flush_needed(){\n";
        if(uses_temporal_flush){
-               ret += "\t\treturn needs_temporal_flush;\n";
+               ret += "\treturn needs_temporal_flush;\n";
        }else{
-               ret += "\t\treturn false;\n";
+               ret += "\treturn 0;\n";
        }
-       ret += "\t};\n";
+       ret += "};\n";
+
+//------------------------------------------------
+//     time bucket management
+       ret += "void advance_last_tb(){\n";
+       ret += "\tlast_gb"+to_string(temporal_gb)+"++;\n";
+       ret += "}\n\n";
+       ret += "void reset_last_tb(){\n";
+       ret += "\tlast_gb"+to_string(temporal_gb)+" = curr_gb"+to_string(temporal_gb)+";\n";
+       ret += "}\n\n";
 
 //---------------------------------------------------
 //                     create output tuple
@@ -12942,7 +13493,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a];
                if(sdt->is_temporal()){
                        sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
                        ret += tmpstr;
-                       sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_flushed_gb", "", schema).c_str());
+                       sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_gb", "", schema).c_str());
                        ret += tmpstr;
                        ret += ";\n";
                }
@@ -13470,6 +14021,29 @@ void filter_join_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list
        }
 }
 
+void watch_join_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
+       int i;
+       vector<map<string, scalarexp_t *> *> src_vec;
+
+       for(i=0;i<q_sources.size();i++){
+               if(q_sources[i] != NULL)
+                       src_vec.push_back(q_sources[i]->get_protocol_se());
+               else
+                       src_vec.push_back(NULL);
+       }
+
+       for(i=0;i<select_list.size();i++){
+               protocol_map[select_list[i]->name] = resolve_protocol_se(select_list[i]->se,src_vec,NULL,Schema);
+       }
+
+       for(i=0;i<key_flds.size();i++){
+               string kfld = key_flds[i];
+               hash_src_l.push_back(resolve_protocol_se(hash_eq[kfld]->pr->get_left_se(),src_vec,NULL,Schema));
+               hash_src_r.push_back(resolve_protocol_se(hash_eq[kfld]->pr->get_right_se(),src_vec,NULL,Schema));
+       }
+}
+
+
 void sgah_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
        int i;
        vector<map<string, scalarexp_t *> *> src_vec;
@@ -13558,7 +14132,7 @@ void mrg_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema
                for(s=1;s<src_vec.size() && match;s++){
                        pse_map = src_vec[s];
                        scalarexp_t *match_se = (*pse_map)[fld_nm];
-                       if(match_se == false)
+                       if(match_se == NULL)
                                match = false;
                        else
                                match = is_equivalent_se_base(first_se, match_se, Schema);
@@ -13569,3 +14143,8 @@ void mrg_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema
                        protocol_map[fld_nm] = NULL;
        }
 }
+
+void watch_tbl_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
+       return;
+}
+