+// The size of the tuple is the size of the tuple struct plus the
+// size of the buffers to be copied in.
+
+ ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
+ for(s=0;s<sl_list.size();s++){
+ data_type *sdt = sl_list[s]->get_data_type();
+ if(sdt->is_buffer_type()){
+ sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
+ ret += tmpstr;
+ }
+ }
+ ret += ";\n";
+
+
+ ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
+
+// Test passed, make assignments to the tuple.
+
+ ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
+
+// Mark tuple as REGULAR_TUPLE
+ ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
+
+
+ for(s=0;s<sl_list.size();s++){
+ data_type *sdt = sl_list[s]->get_data_type();
+ if(sdt->is_buffer_type()){
+ sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
+ ret += tmpstr;
+ sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
+ ret += tmpstr;
+ }else{
+ sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
+ ret += tmpstr;
+// if(sdt->needs_hn_translation())
+// ret += sdt->hton_translation() +"( ";
+ ret += generate_se_code(sl_list[s],schema);
+// if(sdt->needs_hn_translation())
+// ret += ") ";
+ ret += ";\n";
+ }
+ }
+
+// Generate output.
+
+ ret += "\tpost_tuple(tuple);\n";
+
+// Increment the counter of posted tuples
+ ret += "\n\t#ifdef LFTA_STATS\n";
+ ret += "\n\tt->out_tuple_cnt++;\n\n";
+ ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
+ ret += "\t#endif\n\n";
+
+
+ return ret;
+}
+
+
+string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
+
+int p,s,w;
+string ret;
+
+
+ string wl_schema = fs->from[1]->get_schema_name();
+ string wl_elem_str = generate_watchlist_element_name(wl_schema);
+ string wl_node_str = generate_watchlist_struct_name(wl_schema);
+ string tgt = generate_watchlist_name(wl_schema);
+
+ ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
+
+
+
+
+
+// ------------------------------------------------------------
+// Determine if the R record should be processed.
+
+
+ ret +=
+"// R (main stream) cheap predicate\n"
+"\n"
+;
+
+// Unpack r_filt fields
+ vector<cnf_elem *> r_filt = fs->pred_t0;
+ for(w=0;w<r_filt.size();++w){
+ col_id_set this_pred_cids;
+ gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
+ for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+ if(unpacked_cids.count( (*csi) ) == 0){
+ int tblref = (*csi).tblvar_ref;
+ int schref = (*csi).schema_ref;
+ string field = (*csi).field;
+ ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+ unpacked_cids.insert( (*csi) );
+ }
+ }
+ }
+
+// Sort R preds based on cost.
+
+ vector<cnf_elem *> tmp_wh;
+ for(w=0;w<r_filt.size();++w){
+ compute_cnf_cost(r_filt[w],Ext_fcns);
+ tmp_wh.push_back(r_filt[w]);
+ }
+ r_filt = tmp_wh;
+
+ sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
+
+// WARNING! the constant 20 below is a wild-ass guess.
+ int cheap_rpos;
+ for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
+
+// Test the cheap filters on R.
+ if(cheap_rpos >0){
+
+// Now generate the predicates.
+ for(w=0;w<cheap_rpos;++w){
+ sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+ }else{
+ ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
+ }
+
+ ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
+ map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
+ vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
+ for(w=0;w<kflds.size();++w){
+ string kfld = kflds[w];
+ col_id_set this_pred_cids;
+ gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
+ for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+ if(unpacked_cids.count( (*csi) ) == 0){
+ int tblref = (*csi).tblvar_ref;
+ int schref = (*csi).schema_ref;
+ string field = (*csi).field;
+ if(tblref==0) // LHS from packet, don't unpack the RHS
+ ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+ unpacked_cids.insert( (*csi) );
+ }
+ }
+ }
+
+
+ ret += "\n// Do the join\n\n";
+ ret += "\n// (ensure that the watchtable is fresh)\n";
+ ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
+ ret += "\t\treload_watchlist__"+wl_schema+"();\n";
+ ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
+ ret += "\t}\n\n";
+
+
+ for(p=0;p<fs->key_flds.size();++p){
+ string kfld = fs->key_flds[p];
+ ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
+ }
+
+
+// Passed the cheap pred, now test the join with S.
+ ret += "\tbucket=0;\n";
+ ret += "\thash=0;\n";
+ for(p=0;p<fs->key_flds.size();++p){
+ string kfld = fs->key_flds[p];
+ ret +=
+" hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
+ fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
+ +"_to_hash(r_equijoin_"+kfld+")));\n";
+ }
+ ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
+
+ ret += "\t\trec = "+tgt+".ht[bucket];\n";
+ ret += "\t\twhile(rec!=NULL){\n";
+ ret += "\t\t\tif(hash==rec->hashval){\n";
+ ret += "\t\t\t\tif(";
+ for(p=0;p<fs->key_flds.size();++p){
+ string kfld = fs->key_flds[p];
+ if(p>0) ret += " && ";
+ data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
+ string lhs_op = "r_equijoin_"+kfld;
+ string rhs_op = "rec->"+kfld;
+ ret += generate_equality_test(lhs_op,rhs_op,hdt);
+ }
+ ret += ")\n";
+ ret += "\t\t\t\t\tbreak;\n";
+ ret += "\t\t\t}\n";
+ ret += "\t\t\trec=rec->next;\n";
+ ret += "\t\t}\n";
+ ret += "\t\tif(rec==NULL)\n";
+ ret += "\t\t\tgoto end;\n";
+
+ ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
+ for(w=0;w<where.size();++w){
+ col_id_set this_pred_cids;
+ gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
+ for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
+ if(unpacked_cids.count( (*csi) ) == 0){
+ int tblref = (*csi).tblvar_ref;
+ int schref = (*csi).schema_ref;
+ string field = (*csi).field;
+ if(tblref==0) // LHS from packet
+ ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+ else // RHS from hash bucket
+ ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
+ unpacked_cids.insert( (*csi) );
+ }
+ }
+ }
+
+
+// Test the expensive filters on R.
+// TODO Should merge this with other predicates and eval in order
+// of cost - see the fj code.
+// TODO join and postfilter predicates haven't been costed yet.
+ if(cheap_rpos < r_filt.size()){
+
+// Now generate the predicates.
+ for(w=cheap_rpos;w<r_filt.size();++w){
+ sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+ }else{
+ ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
+ }
+
+// TODO sort the additional predicates by cost
+
+// S-only
+ for(w=0;w<fs->pred_t1.size();++w){
+ sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+
+// non hash-eq join
+ for(w=0;w<fs->join_filter.size();++w){
+ sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+
+// postfilter
+ for(w=0;w<fs->postfilter.size();++w){
+ sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
+ ret += tmpstr;
+
+// Find partial fcns ref'd in this cnf element
+ set<int> pfcn_refs;
+ collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
+// Since set<..> is a "Sorted Associative Container",
+// we can walk through it in sorted order by walking from
+// begin() to end(). (and the partial fcns must be
+// evaluated in this order).
+ set<int>::iterator si;
+ for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
+ if(fcn_ref_cnt[(*si)] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
+ }
+ if(is_partial_fcn[(*si)]){
+ ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
+ ret += "\t\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[(*si)] > 1){
+ if(!is_partial_fcn[(*si)]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
+ ") ) goto end;\n";
+ }
+
+
+
+/////////////// post the tuple
+
+// test passed : create the tuple, then assign to it.
+ ret += "/*\t\tCreate and post the tuple\t*/\n";
+
+// Unpack R fields
+ for(s=0;s<sl_list.size();++s){
+ col_id_set this_se_cids;
+ gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
+ for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
+ if(unpacked_cids.count( (*csi) ) == 0){
+ int tblref = (*csi).tblvar_ref;
+ int schref = (*csi).schema_ref;
+ string field = (*csi).field;
+ if(tblref==0) // LHS from packet
+ ret += generate_unpack_code(tblref,schref,field,schema,node_name);
+ else // RHS from hash bucket
+ ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
+ unpacked_cids.insert( (*csi) );
+ }
+ }
+ }
+
+
+// Unpack partial fcns ref'd by the select clause.
+// Its a kind of a WHERE clause ...
+ for(p=sl_fcns_start;p<sl_fcns_end;p++){
+ if(fcn_ref_cnt[p] > 1){
+ ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
+ }
+ if(is_partial_fcn[p]){
+ ret += unpack_partial_fcn(partial_fcns[p], p, schema);
+ ret += "\tif(retval) goto end;\n";
+ }
+ if(fcn_ref_cnt[p] > 1){
+ if(!is_partial_fcn[p]){
+ ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
+ }
+ ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
+ ret += "\t}\n";
+ }
+ }
+
+ // increment the counter of accepted tuples
+ ret += "\n\t#ifdef LFTA_STATS\n";
+ ret += "\n\tt->accepted_tuple_cnt++;\n\n";
+ ret += "\t#endif\n\n";
+
+// First, compute the size of the tuple.
+
+// Unpack any BUFFER type selections into temporaries
+// so that I can compute their size and not have
+// to recompute their value during tuple packing.
+// I can use regular assignment here because
+// these temporaries are non-persistent.
+
+ for(s=0;s<sl_list.size();s++){
+ data_type *sdt = sl_list[s]->get_data_type();
+ if(sdt->is_buffer_type()){
+ sprintf(tmpstr,"\tselvar_%d = ",s);
+ ret += tmpstr;
+ ret += generate_se_code(sl_list[s],schema);
+ ret += ";\n";
+ }
+ }
+
+