Added watchlist support
[com/gs-lite.git] / src / ftacmp / generate_lfta_code.cc
index aea03a8..55227b1 100644 (file)
@@ -169,6 +169,36 @@ string generate_fj_struct_name(string node_name){
        return(ret);
 }
 
+string generate_watchlist_element_name(string node_name){
+//             Identifier of the generated C struct for one watchlist entry:
+//             the normalized node name (or "default" when normalization
+//             yields an empty string) followed by "__wl_elem".
+       const string base = normalize_name(node_name);
+       return (base.empty() ? string("default") : base) + "__wl_elem";
+}
+
+string generate_watchlist_struct_name(string node_name){
+//             Identifier of the generated C struct type describing a whole
+//             watchlist: the normalized node name (or "default" when
+//             normalization yields an empty string) followed by "__wl_struct".
+       const string base = normalize_name(node_name);
+       return (base.empty() ? string("default") : base) + "__wl_struct";
+}
+
+string generate_watchlist_name(string node_name){
+//             Identifier of the generated global watchlist variable: the
+//             normalized node name (or "default" when normalization yields
+//             an empty string) followed by "__wl".
+       const string base = normalize_name(node_name);
+       return (base.empty() ? string("default") : base) + "__wl";
+}
+
 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
        string ret;
        if(! packed_return){
@@ -246,14 +276,178 @@ string generate_fj_struct(filter_join_qpn *fs, string node_name ){
   return(ret);
 }
 
+string generate_watchlist_structs(string node_name, table_def *tbl,
+               std::string filename, int refresh_interval){
+       string ret;
 
+       ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
+       vector<field_entry *> fields = tbl->get_fields();
+       for(int f=0;f<fields.size();++f){
+               data_type dt(fields[f]->get_type());
+               ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
+       }
+       ret += "\tgs_uint64_t hashval;\n";
+       ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
+       ret += "};\n\n";
 
+       ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
+       ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
+       ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
+       ret += "\tgs_uint32_t ht_size;\n";
+       ret += "\tgs_uint32_t n_elem;\n";
+       ret += "\tgs_uint32_t refresh_interval;\n";
+       ret += "\ttime_t next_refresh;\n";
+       ret += "\ttime_t last_mtime;\n";        
+       ret += "\tchar *filename;\n";
+       ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
+
+       return ret;
+}
 
+string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
+       string ret;
+       string tgt = generate_watchlist_name(node_name);
+       vector<field_entry *> fields = tbl->get_fields();
+
+       ret += "void reload_watchlist__"+node_name+"(){\n";
+       ret += "\tgs_uint32_t i;\n";
+       ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
+       ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
+       ret += "\tFILE *fl;\n";
+       ret += "\tchar buf[10000];\n";
+       ret += "\tgs_uint32_t buflen = 10000;\n";
+       ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
+       ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
+       ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
+       ret += "\tgs_uint64_t hash, bucket;\n";
+       ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
+
+       ret += "//  make sure watchlist file has changed since the last time we loaded it\n";
+    ret += "\tstruct stat file_stat;\n";
+    ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
+    ret += "\tif (err) {\n";
+    ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
+    ret += "\t\treturn;\n";
+    ret += "\t}\n";
+       ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime)     // watchlist file hasn't changed since last time\n";
+       ret += "\t\treturn;\n";
+       ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
+
+       ret += "//  Delete old entries.\n";
+       ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
+       ret += "\t\tptr="+tgt+".ht[i];\n";
+       ret += "\t\twhile(ptr!=NULL){\n";
+       for(int f=0;f<fields.size();++f){
+               data_type dt(fields[f]->get_type());
+               if(dt.is_buffer_type()){
+                       ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
+               }
+       }
+       ret += "\t\t\tnext = ptr->next;\n";
+       ret += "\t\t\tfree(ptr);\n";
+       ret += "\t\t\tptr = next;\n";
+       ret += "\t\t}\n";
+       ret += "\t}\n";
+       ret += "\n// prepare new table. \n";
+       ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
+       ret += "\t\tif("+tgt+".ht)\n";
+       ret += "\t\t\tfree("+tgt+".ht);\n";
+       ret += "\t\tif("+tgt+".ht_size == 0)\n";
+       ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
+       ret += "\t\telse\n";    
+       ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
+       ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
+       ret += "\t}\n";
+       ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
+       ret += "\t\t"+tgt+".ht[i] = NULL;\n";
+       ret += "\t}\n";
+       ret += "\n// load new table\n";
+       ret += "\t"+tgt+".n_elem = 0;\n";
+       ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
+       ret += "\tif(fl==NULL){\n";
+       ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
+       ret += "\t\treturn;\n";
+       ret += "\t}\n";
+       ret += "\tmalformed = 0;\n";
+       ret += "\tshort_lines = 0;\n";
+       ret += "\ttoolong_lines = 0;\n";
+       ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
+       ret += "\t\tlinelen = strlen(buf);\n";
+       ret += "\t\tbuf[linelen-1]='\\0';       // strip off trailing newline\n";
+       ret += "\t\tlinelen--;\n";
+
+       ret += "\t\tpos=0;\n";
+       ret += "\t\tmalformed=0;\n";
+       ret += "\t\tok=1;\n";
+       ret += "\t\tflds[0] = buf;\n";
+       ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
+       ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
+       ret += "\t\t\tif(pos >= linelen){\n";
+       ret += "\t\t\t\tmalformed = 1;\n";
+       ret += "\t\t\t\tbreak;\n";
+       ret += "\t\t\t}\n";
+       ret += "\t\t\tbuf[pos]='\\0';\n";
+       ret += "\t\t\tpos++;\n";
+       ret += "\t\t\tflds[f]=buf+pos;\n";
+       ret += "\t\t}\n";
+       ret += "\t\tif(malformed){\n";
+       ret += "\t\t\tok=0;\n";
+       ret += "\t\t\tn_malformed++;\n";
+       ret += "\t\t}\n";
+       ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
+       ret += "\t\t\tok=0;\n";
+       ret += "\t\t\tshort_lines++;\n";
+       ret += "\t\t}\n";
+       ret += "\t\tif(pos && (pos<linelen)){\n";
+       ret += "\t\t\tok=0;\n";
+       ret += "\t\t\ttoolong_lines++;\n";
+       ret += "\t\t}\n";
+       ret += "\t\tif(f>="+std::to_string(fields.size())+"){\n";
+       ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
+//             Extract fields
+       for(int f=0;f<fields.size();++f){
+               data_type dt(fields[f]->get_type());
+               ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
+       }
+//             Compute the hash value
+       ret += "\t\t\thash=0;\n";
+       for(int k=0;k<keys.size();++k){
+               string key_fld = keys[k];
+               int f;
+               for(f=0;f<fields.size();++f){
+                       if(fields[f]->get_name() == key_fld)
+                               break;
+               }
+               data_type dt(fields[f]->get_type());
+
+               ret +=
+"\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
+                       dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
+               }
+       ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
+       ret += "\t\t\trec->hashval = hash;\n";
+       ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
+       ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
+       ret += "\t\t\t"+tgt+".n_elem++;\n";
+
+       ret += "\t\t}\n";
+       ret += "\t}\n";
+       ret += "\tif(n_malformed+toolong_lines > 0){\n";
+       ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n";
+       ret += "\t}\n";
+       ret += "}\n\n";
+
+       return ret;
+}
+
+
+
+               
 string generate_fta_struct(string node_name, gb_table *gb_tbl,
                aggregate_table *aggr_tbl, param_table *param_tbl,
                cplx_lit_table *complex_literals,
                vector<handle_param_tbl_entry *> &param_handle_table,
-               bool is_aggr_query, bool is_fj, bool uses_bloom,
+               bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
                table_list *schema){
 
        string ret = "struct " + generate_fta_name(node_name) + "{\n";
@@ -337,6 +531,12 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl,
 
        }
 
+// --------------------------------------------
+//             watchlist-join specific
+       if(is_wj){
+               ret += "\ttime_t ux_time;\n";
+       }
+
 //--------------------------------------------------------
 //                     Common fields
 
@@ -1174,6 +1374,7 @@ string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
        ret += "#include<stdio.h>\n";
        ret += "#include <limits.h>\n";
        ret += "#include <float.h>\n";
+       ret += "#include <sys/stat.h>\n";
        ret += "#include \"rdtsc.h\"\n";
        ret += "#endif\n";
 
@@ -1766,7 +1967,7 @@ string generate_fta_control(string node_name, table_list *schema, bool is_aggr_q
        return(ret);
 }
 
-string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){
+string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
        string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
        ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
                ret += generate_fta_name(node_name)+" *) f;\n\n";
@@ -1879,6 +2080,11 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co
                }
        }
 
+
+       if(advance_uxtime){
+               ret += "\tt->ux_time = time(&(t->ux_time));\n";
+       }
+
        // for aggregation lftas we need to check if the time was advanced beyond the current epoch
        if (is_aggr_query) {
 
@@ -2284,6 +2490,7 @@ string ret;
        return ret;
 }
 
+//     TODO Ensure that postfilter predicates are being generated
 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
 
 int p,s,w;
@@ -2587,14 +2794,14 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%
 
 //             WARNING! the constant 20 below is a wild-ass guess.
        int cheap_rpos;
-       for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++)
+       for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
 
 //             Test the cheap filters on R.
   if(cheap_rpos >0){
 
 //             Now generate the predicates.
        for(w=0;w<cheap_rpos;++w){
-               sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
+               sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
                ret += tmpstr;
 
 //                     Find partial fcns ref'd in this cnf element
@@ -2815,6 +3022,463 @@ printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%
          }
 
 
+//             The size of the tuple is the size of the tuple struct plus the
+//             size of the buffers to be copied in.
+
+         ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
+         for(s=0;s<sl_list.size();s++){
+               data_type *sdt = sl_list[s]->get_data_type();
+               if(sdt->is_buffer_type()){
+                       sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
+                       ret += tmpstr;
+               }
+         }
+         ret += ";\n";
+
+
+         ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+         ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
+
+//                     Test passed, make assignments to the tuple.
+
+         ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
+
+//                     Mark tuple as REGULAR_TUPLE
+         ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
+
+
+         for(s=0;s<sl_list.size();s++){
+               data_type *sdt = sl_list[s]->get_data_type();
+               if(sdt->is_buffer_type()){
+                       sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
+                       ret += tmpstr;
+                       sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
+                       ret += tmpstr;
+               }else{
+                       sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
+                       ret += tmpstr;
+//                     if(sdt->needs_hn_translation())
+//                             ret += sdt->hton_translation() +"( ";
+                       ret += generate_se_code(sl_list[s],schema);
+//                     if(sdt->needs_hn_translation())
+//                             ret += ") ";
+                       ret += ";\n";
+               }
+         }
+
+//             Generate output.
+
+         ret += "\tpost_tuple(tuple);\n";
+
+//             Increment the counter of posted tuples
+  ret += "\n\t#ifdef LFTA_STATS\n";
+  ret += "\n\tt->out_tuple_cnt++;\n\n";
+  ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
+  ret += "\t#endif\n\n";
+
+
+       return ret;
+}
+
+
+string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
+
+int p,s,w;
+string ret;
+
+
+       string wl_schema = fs->from[1]->get_schema_name();
+       string wl_elem_str = generate_watchlist_element_name(wl_schema);
+       string wl_node_str = generate_watchlist_struct_name(wl_schema);
+       string tgt = generate_watchlist_name(wl_schema);
+
+       ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
+
+
+
+
+
+//     ------------------------------------------------------------
+//             Determine if the R record should be processed.
+
+
+       ret +=
+"//            R (main stream) cheap predicate\n"
+"\n"
+;
+
+//             Unpack r_filt fields
+       vector<cnf_elem *> r_filt = fs->pred_t0;
+       for(w=0;w<r_filt.size();++w){
+               col_id_set this_pred_cids;
+               gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
+               for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+                       if(unpacked_cids.count( (*csi) ) == 0){
+                               int tblref = (*csi).tblvar_ref;
+                               int schref = (*csi).schema_ref;
+                               string field = (*csi).field;
+                               ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+                               unpacked_cids.insert( (*csi) );
+                       }
+               }
+       }
+
+// Sort R preds based on cost.
+
+       vector<cnf_elem *> tmp_wh;
+       for(w=0;w<r_filt.size();++w){
+               compute_cnf_cost(r_filt[w],Ext_fcns);
+               tmp_wh.push_back(r_filt[w]);
+       }
+       r_filt = tmp_wh;
+
+       sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
+
+//             WARNING! the constant 20 below is a wild-ass guess.
+       int cheap_rpos;
+       for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
+
+//             Test the cheap filters on R.
+  if(cheap_rpos >0){
+
+//             Now generate the predicates.
+       for(w=0;w<cheap_rpos;++w){
+               sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
+               ret += tmpstr;
+
+//                     Find partial fcns ref'd in this cnf element
+               set<int> pfcn_refs;
+               collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
+//                     Since set<..> is a "Sorted Associative Container",
+//                     we can walk through it in sorted order by walking from
+//                     begin() to end().  (and the partial fcns must be
+//                     evaluated in this order).
+               set<int>::iterator si;
+               for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+                       }
+                       if(is_partial_fcn[(*si)]){
+                               ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+                               ret += "\t\tif(retval) goto end;\n";
+                       }
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               if(!is_partial_fcn[(*si)]){
+                                       ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+                               }
+                               ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+                               ret += "\t}\n";
+                       }
+               }
+
+               ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
+                               ") ) goto end;\n";
+       }
+  }else{
+         ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
+  }
+
+       ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
+       map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
+       vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
+       for(w=0;w<kflds.size();++w){
+               string kfld = kflds[w];
+               col_id_set this_pred_cids;
+               gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
+               for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+                       if(unpacked_cids.count( (*csi) ) == 0){
+                               int tblref = (*csi).tblvar_ref;
+                               int schref = (*csi).schema_ref;
+                               string field = (*csi).field;
+                               if(tblref==0) // LHS from packet, don't unpack the RHS
+                                       ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+                               unpacked_cids.insert( (*csi) );
+                       }
+               }
+       }
+
+
+       ret += "\n//    Do the join\n\n";
+       ret += "\n//            (ensure that the watchtable is fresh)\n";
+       ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
+       ret += "\t\treload_watchlist__"+wl_schema+"();\n";
+       ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
+       ret += "\t}\n\n";
+
+
+       for(p=0;p<fs->key_flds.size();++p){
+               string kfld = fs->key_flds[p];
+               ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
+       }
+
+
+//                     Passed the cheap pred, now test the join with S.
+       ret += "\tbucket=0;\n";
+       ret += "\thash=0;\n";
+       for(p=0;p<fs->key_flds.size();++p){
+               string kfld = fs->key_flds[p];
+               ret +=
+"              hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
+       fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
+       +"_to_hash(r_equijoin_"+kfld+")));\n";
+       }
+       ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
+
+       ret += "\t\trec = "+tgt+".ht[bucket];\n";
+       ret += "\t\twhile(rec!=NULL){\n";
+       ret += "\t\t\tif(hash==rec->hashval){\n";
+       ret += "\t\t\t\tif(";
+       for(p=0;p<fs->key_flds.size();++p){
+               string kfld = fs->key_flds[p];
+               if(p>0) ret += " && ";
+               data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
+               string lhs_op = "r_equijoin_"+kfld;
+               string rhs_op = "rec->"+kfld;
+               ret += generate_equality_test(lhs_op,rhs_op,hdt);
+       }
+       ret += ")\n";
+       ret += "\t\t\t\t\tbreak;\n";
+       ret += "\t\t\t}\n";
+       ret += "\t\t\trec=rec->next;\n";
+       ret += "\t\t}\n";
+       ret += "\t\tif(rec==NULL)\n";
+       ret += "\t\t\tgoto end;\n";
+               
+       ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
+       for(w=0;w<where.size();++w){
+               col_id_set this_pred_cids;
+               gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
+               for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+                       if(unpacked_cids.count( (*csi) ) == 0){
+                               int tblref = (*csi).tblvar_ref;
+                               int schref = (*csi).schema_ref;
+                               string field = (*csi).field;
+                               if(tblref==0) // LHS from packet
+                                       ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+                               else    // RHS from hash bucket
+                                       ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
+                               unpacked_cids.insert( (*csi) );
+                       }
+               }
+       }
+
+
+//             Test the expensive filters on R.
+//                     TODO Should merge this with other predicates and eval in order
+//                             of cost - see the fj code.
+//                     TODO join and postfilter predicates haven't been costed yet.
+  if(cheap_rpos < r_filt.size()){
+
+//             Now generate the predicates.
+       for(w=cheap_rpos;w<r_filt.size();++w){
+               sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
+               ret += tmpstr;
+
+//                     Find partial fcns ref'd in this cnf element
+               set<int> pfcn_refs;
+               collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
+//                     Since set<..> is a "Sorted Associative Container",
+//                     we can walk through it in sorted order by walking from
+//                     begin() to end().  (and the partial fcns must be
+//                     evaluated in this order).
+               set<int>::iterator si;
+               for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+                       }
+                       if(is_partial_fcn[(*si)]){
+                               ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+                               ret += "\t\tif(retval) goto end;\n";
+                       }
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               if(!is_partial_fcn[(*si)]){
+                                       ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+                               }
+                               ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+                               ret += "\t}\n";
+                       }
+               }
+
+               ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
+                               ") ) goto end;\n";
+       }
+  }else{
+         ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
+  }
+
+//             TODO sort the additional predicates by cost
+
+//             S-only
+       for(w=0;w<fs->pred_t1.size();++w){
+               sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
+               ret += tmpstr;
+
+//                     Find partial fcns ref'd in this cnf element
+               set<int> pfcn_refs;
+               collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
+//                     Since set<..> is a "Sorted Associative Container",
+//                     we can walk through it in sorted order by walking from
+//                     begin() to end().  (and the partial fcns must be
+//                     evaluated in this order).
+               set<int>::iterator si;
+               for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+                       }
+                       if(is_partial_fcn[(*si)]){
+                               ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+                               ret += "\t\tif(retval) goto end;\n";
+                       }
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               if(!is_partial_fcn[(*si)]){
+                                       ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+                               }
+                               ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+                               ret += "\t}\n";
+                       }
+               }
+
+               ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
+                               ") ) goto end;\n";
+       }
+
+//             non hash-eq join 
+       for(w=0;w<fs->join_filter.size();++w){
+               sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
+               ret += tmpstr;
+
+//                     Find partial fcns ref'd in this cnf element
+               set<int> pfcn_refs;
+               collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
+//                     Since set<..> is a "Sorted Associative Container",
+//                     we can walk through it in sorted order by walking from
+//                     begin() to end().  (and the partial fcns must be
+//                     evaluated in this order).
+               set<int>::iterator si;
+               for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+                       }
+                       if(is_partial_fcn[(*si)]){
+                               ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+                               ret += "\t\tif(retval) goto end;\n";
+                       }
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               if(!is_partial_fcn[(*si)]){
+                                       ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+                               }
+                               ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+                               ret += "\t}\n";
+                       }
+               }
+
+               ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
+                               ") ) goto end;\n";
+       }
+
+//             postfilter
+       for(w=0;w<fs->postfilter.size();++w){
+               sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
+               ret += tmpstr;
+
+//                     Find partial fcns ref'd in this cnf element
+               set<int> pfcn_refs;
+               collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
+//                     Since set<..> is a "Sorted Associative Container",
+//                     we can walk through it in sorted order by walking from
+//                     begin() to end().  (and the partial fcns must be
+//                     evaluated in this order).
+               set<int>::iterator si;
+               for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+                       }
+                       if(is_partial_fcn[(*si)]){
+                               ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+                               ret += "\t\tif(retval) goto end;\n";
+                       }
+                       if(fcn_ref_cnt[(*si)] > 1){
+                               if(!is_partial_fcn[(*si)]){
+                                       ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+                               }
+                               ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+                               ret += "\t}\n";
+                       }
+               }
+
+               ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
+                               ") ) goto end;\n";
+       }
+
+
+
+///////////////                        post the tuple
+
+//                     test passed : create the tuple, then assign to it.
+         ret += "/*\t\tCreate and post the tuple\t*/\n";
+
+//             Unpack R fields
+       for(s=0;s<sl_list.size();++s){
+               col_id_set this_se_cids;
+               gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
+               for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
+                       if(unpacked_cids.count( (*csi) ) == 0){
+                               int tblref = (*csi).tblvar_ref;
+                               int schref = (*csi).schema_ref;
+                               string field = (*csi).field;
+                               if(tblref==0) // LHS from packet
+                                       ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+                               else    // RHS from hash bucket
+                                       ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
+                               unpacked_cids.insert( (*csi) );
+                       }
+               }
+       }
+
+
+//                     Unpack partial fcns ref'd by the select clause.
+//                     Its a kind of a WHERE clause ...
+  for(p=sl_fcns_start;p<sl_fcns_end;p++){
+       if(fcn_ref_cnt[p] > 1){
+               ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
+       }
+       if(is_partial_fcn[p]){
+               ret += unpack_partial_fcn(partial_fcns[p], p, schema);
+               ret += "\tif(retval) goto end;\n";
+       }
+       if(fcn_ref_cnt[p] > 1){
+               if(!is_partial_fcn[p]){
+                       ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
+               }
+               ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
+               ret += "\t}\n";
+       }
+  }
+
+  // increment the counter of accepted tuples
+  ret += "\n\t#ifdef LFTA_STATS\n";
+  ret += "\n\tt->accepted_tuple_cnt++;\n\n";
+  ret += "\t#endif\n\n";
+
+//                     First, compute the size of the tuple.
+
+//                     Unpack any BUFFER type selections into temporaries
+//                     so that I can compute their size and not have
+//                     to recompute their value during tuple packing.
+//                     I can use regular assignment here because
+//                     these temporaries are non-persistent.
+
+         for(s=0;s<sl_list.size();s++){
+               data_type *sdt = sl_list[s]->get_data_type();
+               if(sdt->is_buffer_type()){
+                       sprintf(tmpstr,"\tselvar_%d = ",s);
+                       ret += tmpstr;
+                       ret += generate_se_code(sl_list[s],schema);
+                       ret += ";\n";
+               }
+         }
+
+
 //             The size of the tuple is the size of the tuple struct plus the
 //             size of the buffers to be copied in.
 
@@ -3056,7 +3720,7 @@ printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn
 
 
 
-string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set<unsigned int> &s_pids){
+string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
 
        string ret="static gs_retval_t accept_packet_"+node_name+
                "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
@@ -3084,9 +3748,9 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex
   }
 
   for(w=0;w<where.size();++w){
-       if(is_fj || s_pids.count(w) == 0)
+       if(is_wj || is_fj || s_pids.count(w) == 0)
                gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
-       }
+  }
   for(s=0;s<sl_list.size();s++){
        gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
   }
@@ -3201,12 +3865,32 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex
   }
 
 
+  if(fs->node_type() == "watch_join"){
+       watch_join_qpn *wlq = (watch_join_qpn *)fs;
+       ret += "/*\t\tJoin fields\t*/\n";
+       for(int k=0;k<wlq->key_flds.size(); ++k){
+               string kfld = wlq->key_flds[k];
+               ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
+               ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
+         }
+       ret +=
+"  /*          Variables for wl join table     */ \n"
+"\tunsigned int i, bucket;\n"
+"\tunsigned long long int hash; \n";
+       string wl_schema = wlq->from[1]->get_schema_name();
+       string wl_elem_str = generate_watchlist_element_name(wl_schema);
+       ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n";
+"\n"
+;
+  }
+
+
 //             Variables needed to store selected attributes of BUFFER type
 //             temporarily, in order to compute their size for storage
 //             in an output tuple.
 
   string select_var_defs = "";
-  for(s=0;s<sl_list.size();s++){
+  for(int s=0;s<sl_list.size();s++){
        data_type *sdt = sl_list[s]->get_data_type();
        if(sdt->is_buffer_type()){
          sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
@@ -3306,7 +3990,7 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex
 
   vector<cnf_elem *> filter = fs->get_filter_clause();
 //             Test the filter predicate (some query types have additional preds).
-  if(filter.size() > 0){
+  if(filter.size() > 0 && !is_wj){     // watchlist join does specialized processing
 
 //             Sort by evaluation cost.
 //             First, estimate evaluation costs
@@ -3379,7 +4063,7 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex
   if(is_fj){
        ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
        vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
-               for(w=0;w<h_eq.size();++w){
+       for(w=0;w<h_eq.size();++w){
                col_id_set this_pred_cids;
                gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
                for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
@@ -3392,6 +4076,26 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex
                        }
                }
        }
+  }else if(is_wj){             // Watchlist join: emits nothing here for now; the hash-field unpacking below is commented out. TODO (original note "STOPPED HERE"): move it to the wj main body.
+/*
+       ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
+       map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
+       vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
+       for(w=0;w<kflds.size();++w){
+               string kfld = kflds[w];
+               col_id_set this_pred_cids;
+               gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
+               for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+                       if(unpacked_cids.count( (*csi) ) == 0){
+                               int tblref = (*csi).tblvar_ref;
+                               int schref = (*csi).schema_ref;
+                               string field = (*csi).field;
+                               ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+                               unpacked_cids.insert( (*csi) );
+                       }
+               }
+       }
+*/
   }else{
          ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
 
@@ -3411,12 +4115,17 @@ string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ex
 //////////////////     After this, the query types
 //////////////////     are processed differently.
 
-  if(!is_aggr_query && !is_fj)
+  if(!is_aggr_query && !is_fj & !is_wj)
        ret += generate_sel_accept_body(fs, node_name, schema);
   else if(is_aggr_query)
        ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
-  else
-       ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
+  else{
+       if(is_fj)
+               ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
+       else
+               ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
+  }
+       
 
 
 //             Finish up.
@@ -3733,6 +4442,8 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
        param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
        string node_name = fs->get_node_name();
     bool is_fj = false, uses_bloom = false;
+       bool is_wj = false;
+       bool is_watch_tbl = false;
 
 
        if(fs->node_type() == "spx_qpn"){
@@ -3764,6 +4475,25 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
                uses_bloom = fj_node->use_bloom;
                gb_tbl = NULL;
                aggr_tbl = NULL;
+       }else
+       if(fs->node_type() == "watch_join"){
+               is_aggr_query = false;
+        is_wj = true;
+               watch_join_qpn *wl_node = (watch_join_qpn *)fs;
+               sl_list = wl_node->get_select_se_list();
+               where = wl_node->get_where_clause();
+               gb_tbl = NULL;
+               aggr_tbl = NULL;
+       }else
+       if(fs->node_type() == "watch_tbl_qpn"){
+               is_aggr_query = false;
+        is_watch_tbl = true;
+               vector<scalarexp_t *> empty_sl_list;
+               vector<cnf_elem *> empty_where;
+               sl_list = empty_sl_list;
+               where = empty_where;
+               gb_tbl = NULL;
+               aggr_tbl = NULL;
        } else {
                fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
                exit(1);
@@ -3856,36 +4586,44 @@ string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
 /////////////////////////////////////////////////////
 //                     Common stuff unpacked, do some generation
 
+
        if(is_aggr_query)
          retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
        if(is_fj)
                retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
-
-       retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema);
-       retval += generate_tuple_struct(node_name, sl_list) ;
-
-       if(is_aggr_query)
-               retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
-       if(param_tbl->size() > 0)
-               retval += generate_fta_load_params(node_name) ;
-       retval += generate_fta_free(node_name, is_aggr_query) ;
-       retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;
-       retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ;
-
+       if(is_watch_tbl){
+               retval += "\n\n// watchtable code here \n\n";
+               watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
+               retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
+               retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
+       }
+       
+       if(! is_watch_tbl){
+               retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
+               retval += generate_tuple_struct(node_name, sl_list) ;
+
+               if(is_aggr_query)
+                       retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
+               if(param_tbl->size() > 0)
+                       retval += generate_fta_load_params(node_name) ;
+               retval += generate_fta_free(node_name, is_aggr_query) ;
+               retval +=  generate_fta_control(node_name, schema, is_aggr_query) ;
+               retval +=  generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;
 
        /* extract the value of Time_Correlation from interface definition */
-       int e,v;
-       string es;
-       unsigned time_corr;
-       vector<tablevar_t *> tvec =  fs->get_input_tbls();
-       vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
-       if (time_corr_vec.empty())
-               time_corr = DEFAULT_TIME_CORR;
-       else
-               time_corr = atoi(time_corr_vec[0].c_str());
+               int e,v;
+               string es;
+               unsigned time_corr;
+               vector<tablevar_t *> tvec =  fs->get_input_tbls();
+               vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
+               if (time_corr_vec.empty())
+                       time_corr = DEFAULT_TIME_CORR;
+               else
+                       time_corr = atoi(time_corr_vec[0].c_str());
 
-       retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) );
-       retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
+               retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
+               retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
+       }
 
   return(retval);
 }
@@ -3898,6 +4636,11 @@ int compute_snap_len(qp_node *fs, table_list *schema){
        gb_tbl = NULL;
        sl_list.clear(); where.clear();
 
+       
+       if(fs->node_type() == "watch_tbl_qpn"){
+               return -1;
+       }
+
        if(fs->node_type() == "spx_qpn"){
                spx_qpn *spx_node = (spx_qpn *)fs;
                sl_list = spx_node->get_select_se_list();
@@ -3913,6 +4656,11 @@ int compute_snap_len(qp_node *fs, table_list *schema){
                filter_join_qpn *fj_node = (filter_join_qpn *)fs;
                sl_list = fj_node->get_select_se_list();
                where = fj_node->get_where_clause();
+       }
+       else if(fs->node_type() == "watch_join"){
+               watch_join_qpn *fj_node = (watch_join_qpn *)fs;
+               sl_list = fj_node->get_select_se_list();
+               where = fj_node->get_where_clause();
        } else{
                fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
                exit(1);