}
+mrg_qpn::mrg_qpn(watch_join_qpn *spx, std::string n_name, std::vector<std::string> &sources, std::vector<std::pair<std::string, std::string> > &ifaces, ifq_t *ifdb){
+ param_tbl = spx->param_tbl;
+ int i;
+ node_name = n_name;
+ field_entry_list *fel = new field_entry_list();
+ merge_fieldpos = -1;
+
+ disorder = 1;
+
+ for(i=0;i<spx->select_list.size();++i){
+ data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate();
+ if(dt->is_temporal()){
+ if(merge_fieldpos < 0){
+ merge_fieldpos = i;
+ }else{
+ fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() );
+ dt->reset_temporal();
+ }
+ }
+
+ field_entry *fe = dt->make_field_entry(spx->select_list[i]->name);
+ fel->append_field(fe);
+ delete dt;
+ }
+ if(merge_fieldpos<0){
+ fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str());
+ exit(1);
+ }
+ table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA);
+
+// NEED TO HANDLE USER_SPECIFIED SLACK
+ this->resolve_slack(spx->select_list[merge_fieldpos]->se,
+ spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL);
+// if(this->slack == NULL)
+// fprintf(stderr,"Zero slack.\n");
+// else
+// fprintf(stderr,"slack is %s\n",slack->to_string().c_str());
+
+ for(i=0;i<sources.size();i++){
+ std::string rvar = "_m"+int_to_string(i);
+ mvars.push_back(new colref_t(rvar.c_str(), spx->select_list[merge_fieldpos]->name.c_str()));
+ mvars[i]->set_tablevar_ref(i);
+ fm.push_back(new tablevar_t(sources[i].c_str()));
+ fm[i]->set_range_var(rvar);
+ }
+
+ param_tbl = new param_table();
+ std::vector<std::string> param_names = spx->param_tbl->get_param_names();
+ int pi;
+ for(pi=0;pi<param_names.size();pi++){
+ data_type *dt = spx->param_tbl->get_data_type(param_names[pi]);
+ param_tbl->add_param(param_names[pi],dt->duplicate(),
+ spx->param_tbl->handle_access(param_names[pi]));
+ }
+ definitions = spx->definitions;
+
+}
+
+
+
// This function translates an analyzed parse tree
// into one or more query nodes (qp_node).
// into the qp_node constructors,
// and have this code focus on building the query plan tree.
+// Watchlist node
+ if(qs->query_type == WATCHLIST_QUERY){
+ watch_tbl_qpn *watchnode = new watch_tbl_qpn(qs, Schema);
+
+// Done
+ plan_root = watchnode;
+ local_plan.push_back(watchnode);
+ }
+
+
// MERGE node
if(qs->query_type == MERGE_QUERY){
mrg_qpn *merge_node = new mrg_qpn(qs,Schema);
*/
- } else{
+ }
+
+ if(qs->query_type == SELECT_QUERY){
// Select / Aggregation / Join
if(qs->gb_tbl->size() == 0 && qs->aggr_tbl->size() == 0){
plan_root = join_node;
local_plan.push_back(join_node);
}else{
- join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema);
- plan_root = join_node;
- local_plan.push_back(join_node);
+ if(qs->fta_tree->get_from()->get_properties() == WATCHLIST_JOIN_PROPERTY){
+ watch_join_qpn *join_node = new watch_join_qpn(qs,Schema);
+ plan_root = join_node;
+ local_plan.push_back(join_node);
+ }else{
+ join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema);
+ plan_root = join_node;
+ local_plan.push_back(join_node);
+ }
}
}
}else{
return(ret);
}
+string watch_tbl_qpn::to_query_string(){
+ string ret;
+// ret += "DEFINE {\n";
+// ret += "\tfilename='"+filename+";\n";
+// ret += "\trefresh_interval="+to_string(refresh_interval)+";\n}\n";
+ ret += "WATCHLIST FIELDS {\n";
+ std::vector<field_entry *> fields = table_layout->get_fields();
+ for(int f=0;f<fields.size();++f){
+ ret += fields[f]->to_string()+"\n";
+ }
+ ret += "}\n";
+
+ return ret;
+}
string mrg_qpn::to_query_string(){
return(ret);
}
+string watch_join_qpn::to_query_string(){
+
+ string ret = "Select ";
+ int s;
+ for(s=0;s<select_list.size();s++){
+ if(s>0) ret+=", ";
+ ret += se_to_query_string(select_list[s]->se, NULL);
+ if(select_list[s]->name != "") ret += " AS "+select_list[s]->name;
+ }
+ ret += "\n";
+
+// NOTE: assuming binary join.
+ ret += "WATCHLIST_JOIN ";
+
+ ret += "From ";
+ int f;
+ for(f=0;f<from.size();++f){
+ if(f>0) ret+=", ";
+ ret += from[f]->to_string();
+ }
+ ret += "\n";
+
+ if(where.size() > 0){
+ ret += "Where ";
+ int w;
+ for(w=0;w<where.size();w++){
+ if(w>0) ret += " AND ";
+ ret += "(" + pred_to_query_str(where[w]->pr,NULL) + ")";
+ }
+ ret += "\n";
+ }
+
+ return(ret);
+}
+
+
// -----------------------------------------------------------------
// Query node subclass specific processing.
//------------------------------------------------------------------
// split a node to extract LFTA components.
+vector<qp_node *> watch_tbl_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+ // nothing to do, nothing to split, return copy of self.
+
+ hfta_returned = 0;
+
+ vector<qp_node *> ret_vec;
+
+ ret_vec.push_back(this);
+ return(ret_vec);
+
+}
+
vector<qp_node *> mrg_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
// nothing to do, nothing to split, return copy of self.
vector<string> sel_names;
vector<pair<string,string> > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
}
for(p=0;p<shared_pred.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(shared_pred[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->shared_pred.push_back(new_cnf);
fta_node->where.push_back(new_cnf);
}
for(p=0;p<pred_t0.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->pred_t0.push_back(new_cnf);
fta_node->where.push_back(new_cnf);
}
for(p=0;p<pred_t1.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->pred_t1.push_back(new_cnf);
fta_node->where.push_back(new_cnf);
}
for(p=0;p<hash_eq.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(hash_eq[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->hash_eq.push_back(new_cnf);
fta_node->where.push_back(new_cnf);
}
for(p=0;p<postfilter.size();p++){
- predicate_t *new_pr = dup_pr(where[p]->pr, NULL);
+ predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL);
cnf_elem *new_cnf = new cnf_elem(new_pr);
analyze_cnf(new_cnf);
fta_node->postfilter.push_back(new_cnf);
}
+
+
+
+
+vector<qp_node *> watch_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+ vector<qp_node *> ret_vec;
+
+// First check if the query can be pushed to the FTA.
+ bool fta_ok = true;
+ int s;
+ for(s=0;s<select_list.size();s++){
+ fta_ok &= check_fta_forbidden_se(select_list[s]->se,NULL, Ext_fcns);
+ }
+ int p;
+ for(p=0;p<where.size();p++){
+ fta_ok &= check_fta_forbidden_pr(where[p]->pr,NULL, Ext_fcns);
+ }
+
+ if(!fta_ok){
+ fprintf(stderr,"ERROR, watchlist join %s is fta-unsafe.\n",node_name.c_str());
+ exit(1);
+ }
+
+// Can it be done in a single lfta?
+// Get the set of interfaces it accesses.
+ int ierr;
+ int si;
+ vector<string> sel_names;
+ vector<pair<string,string> > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
+ if (ifaces.empty()) {
+ fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
+ exit(1);
+ }
+
+ if(ifaces.size() == 1){
+// Single interface, no need to merge.
+ hfta_returned = 0;
+ ret_vec.push_back(this);
+
+// Treat the range vars a bit differently, the 2nd is reading from a _local_ watchlist.
+ from[0]->set_machine(ifaces[0].first);
+ from[0]->set_interface(ifaces[0].second);
+ from[0]->set_ifq(false);
+
+ from[1]->set_machine(ifaces[0].first);
+ from[1]->set_interface("_local_");
+ from[1]->set_ifq(false);
+
+ return(ret_vec);
+ }else{
+// Multiple interfaces, generate the interface-specific queries plus
+// the merge.
+ hfta_returned = 1;
+
+ vector<string> sel_names;
+ for(si=0;si<ifaces.size();++si){
+ watch_join_qpn *fta_node = new watch_join_qpn();
+
+// Name the fta
+ if(ifaces.size()==1)
+ fta_node->set_node_name( node_name );
+ else{
+ string new_name = "_"+node_name+"_"+ifaces[si].first+"_"+ifaces[si].second;
+ untaboo(new_name);
+ fta_node->set_node_name(new_name);
+ }
+ sel_names.push_back(fta_node->get_node_name());
+
+// Assign the table
+ int f;
+ for(f=0;f<from.size();f++){
+ fta_node->from.push_back(from[f]->duplicate());
+ fta_node->from[f]->set_machine(ifaces[si].first);
+ if(f==0)
+ fta_node->from[f]->set_interface(ifaces[si].second);
+ else
+ fta_node->from[f]->set_interface("_local_");
+ fta_node->from[f]->set_ifq(false);
+ }
+
+ for(s=0;s<select_list.size();s++){
+ fta_node->select_list.push_back( dup_select(select_list[s], NULL) );
+ }
+
+ for(p=0;p<pred_t0.size();p++){
+ predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->pred_t0.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<pred_t1.size();p++){
+ predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->pred_t1.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<key_flds.size();p++){ // we've checked that all keys are covered
+ string k = key_flds[p];
+ predicate_t *new_pr = dup_pr(hash_eq[k]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->hash_eq[k] = new_cnf;
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<join_filter.size();p++){
+ predicate_t *new_pr = dup_pr(join_filter[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->postfilter.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<postfilter.size();p++){
+ predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->postfilter.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ fta_node->key_flds = key_flds;
+
+// Xfer all of the parameters.
+// Use existing handle annotations.
+ vector<string> param_names = param_tbl->get_param_names();
+ int pi;
+ for(pi=0;pi<param_names.size();pi++){
+ data_type *dt = param_tbl->get_data_type(param_names[pi]);
+ fta_node->param_tbl->add_param(param_names[pi],dt->duplicate(),
+ param_tbl->handle_access(param_names[pi]));
+ }
+ fta_node->definitions = definitions;
+ if(fta_node->resolve_if_params(ifdb, this->err_str)){
+ this->error_code = 3;
+ return ret_vec;
+ }
+
+ ret_vec.push_back(fta_node);
+ }
+
+ mrg_qpn *mrg_node = new mrg_qpn((watch_join_qpn *)ret_vec[0],
+ node_name, sel_names,ifaces, ifdb);
+ ret_vec.push_back(mrg_node);
+
+ return(ret_vec);
+ }
+
+}
+
// Use to search for unresolved interface param refs in an hfta.
int spx_qpn::count_ifp_refs(set<string> &ifpnames){
return ret;
}
+int watch_tbl_qpn::count_ifp_refs(set<string> &ifpnames){
+ return 0;
+}
+
int mrg_qpn::count_ifp_refs(set<string> &ifpnames){
return 0;
}
return ret;
}
+int watch_join_qpn::count_ifp_refs(set<string> &ifpnames){
+ int ret = 0;
+ int i;
+ for(i=0;i<select_list.size();++i)
+ ret += count_se_ifp_refs(select_list[i]->se,ifpnames);
+ for(i=0;i<where.size();++i)
+ ret += count_pr_ifp_refs(where[i]->pr,ifpnames);
+ return ret;
+}
+
+
// Resolve interface params to string literals
int filter_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
return ret;
}
+int watch_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){
+ int ret = 0;
+ int i;
+ string ifname = from[0]->get_interface();
+ string ifmach = from[0]->get_machine();
+ for(i=0;i<select_list.size();++i)
+ if( resolve_se_ifp_refs(select_list[i]->se,ifmach, ifname, ifdb, err) )
+ ret = 1;
+ for(i=0;i<where.size();++i)
+ if( resolve_pr_ifp_refs(where[i]->pr,ifmach, ifname, ifdb, err))
+ ret = 1;
+ return ret;
+}
+
int spx_qpn::resolve_if_params( ifq_t *ifdb, string &err){
int ret = 0;
vector<string> sel_names;
vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
vector<string> sel_names;
vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
new_opl.push_back(new_se);
}
}
- stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),false, false,aggr_tbl.has_bailout(a));
+ stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),aggr_tbl.is_superaggr(a), aggr_tbl.is_running_aggr(a),aggr_tbl.has_bailout(a));
hse = new scalarexp_t(aggr_tbl.get_op(a).c_str(),new_opl);
hse->set_data_type(Ext_fcns->get_fcn_dt(aggr_tbl.get_fcn_id(a)));
hse->set_fcn_id(aggr_tbl.get_fcn_id(a));
vector<string> sel_names;
vector<pair<string,string> > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
stream_node->set_node_name( node_name );
stream_node->table_name->set_range_var(table_name->get_var_name());
-// allowed stream disorder. Default is 2,
+// allowed stream disorder. Default is 1,
// can override with max_lfta_disorder setting.
// Also limit the hfta disorder, set to lfta disorder + 1.
// can override with max_hfta_disorder.
- fta_node->lfta_disorder = 2;
+ fta_node->lfta_disorder = 1;
if(this->get_val_of_def("max_lfta_disorder") != ""){
int d = atoi(this->get_val_of_def("max_lfta_disorder").c_str() );
if(d<1){
}
}
+
// First, process the group-by variables.
// The fta must supply the values of all the gbvars.
// If a gb is computed, the computation must be
child_qpn->table_name = new tablevar_t(
from[f]->get_interface().c_str(), from[f]->get_schema_name().c_str(), from[f]->get_ifq());
child_qpn->table_name->set_range_var(from[f]->get_var_name());
+ child_qpn->table_name->set_machine(from[f]->get_machine());
child_vec.push_back(child_qpn);
select_vec.push_back(&(child_qpn->select_list));
spx_qpn *c_node = child_vec[f];
vector<pair<string, string> > ifaces = get_ifaces(c_node->table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
if (ifaces.empty()) {
- fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set\n");
+ fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
exit(1);
}
return(ret);
}
+vector<table_exp_t *> watch_join_qpn::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string rootnm, string silo_name){
+ vector<table_exp_t *> ret;
+ int retval = process_opview(from[0],0,node_name,
+ Schema,qnodes,opviews,ret, rootnm, silo_name);
+ if(retval) exit(1);
+ return(ret);
+}
+
+
+
+vector<table_exp_t *> watch_tbl_qpn::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string rootnm, string silo_name){
+ vector<table_exp_t *> ret;
+ return ret; // nothing to process
+}
+
//////////////////////////////////////////////////////////////////
return(table_layout);
}
+table_def *watch_tbl_qpn::get_fields(){
+ return(table_layout);
+}
+
table_def *spx_qpn::get_fields(){
return(create_attributes(node_name, select_list));
return(create_attributes(node_name, select_list));
}
+table_def *watch_join_qpn::get_fields(){
+ return(create_attributes(node_name, select_list));
+}
table_def *join_eq_hash_qpn::get_fields(){
int i, h, s, t;
return(fm);
}
+ vector<tablevar_t *> watch_tbl_qpn::get_input_tbls(){
+ vector<tablevar_t *> ret;
+ return(ret);
+ }
+
vector<tablevar_t *> mrg_qpn::get_input_tbls(){
return(fm);
}
return(from);
}
+ vector<tablevar_t *> watch_join_qpn::get_input_tbls(){
+ return(from);
+ }
+
//-----------------------------------------------------------------
// get output tables
return(retval);
}
+ vector<tablevar_t *> watch_tbl_qpn::get_output_tbls(){
+ vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
+ return(retval);
+ }
+
vector<tablevar_t *> mrg_qpn::get_output_tbls(){
vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
return(retval);
}
+ vector<tablevar_t *> watch_join_qpn::get_output_tbls(){
+ vector<tablevar_t *> retval(1,new tablevar_t(node_name.c_str()));
+ return(retval);
+ }
+
+
//-----------------------------------------------------------------
// Bind to schema
return tmp_cset;
}
+col_id_set watch_join_qpn::get_colrefs(bool ext_fcns_only,table_list *Schema){
+ col_id_set retval, tmp_cset;
+ int p;
+ for(p=0;p<where.size();++p){
+ gather_pr_col_ids(where[p]->pr, tmp_cset, NULL);
+ }
+ int s;
+ for(s=0;s<select_list.size();++s){
+ gather_se_col_ids(select_list[s]->se, tmp_cset, NULL);
+ }
+ col_id_set::iterator cisi;
+ if(ext_fcns_only){
+ for(cisi=tmp_cset.begin();cisi!=tmp_cset.end();++cisi){
+ field_entry *fe = Schema->get_field((*cisi).schema_ref, (*cisi).field);
+ if(fe->get_unpack_fcns().size()>0)
+ retval.insert((*cisi));
+ }
+ return retval;
+ }
+
+ return tmp_cset;
+}
+
+
// Associate colrefs in SEs with this schema.
}
+void watch_join_qpn::bind_to_schema(table_list *Schema){
+// Bind the tablevars in the From clause to the Schema
+// (it might have changed from analysis time)
+ int f;
+ for(f=0;f<from.size();++f){
+ string snm = from[f]->get_schema_name();
+ int tbl_ref = Schema->get_table_ref(snm);
+ if(tbl_ref >= 0)
+ from[f]->set_schema_ref(tbl_ref);
+ }
+
+// Bind all SEs to this schema
+ tablevar_list_t fm(from);
+
+ int p;
+ for(p=0;p<where.size();++p){
+ bind_to_schema_pr(where[p]->pr, &fm, Schema);
+ }
+ int s;
+ for(s=0;s<select_list.size();++s){
+ bind_to_schema_se(select_list[s]->se, &fm, Schema);
+ }
+
+// Collect set of tuples referenced in this HFTA
+// input, internal, or output.
+
+}
+
+
//-----------------------------------------------------------------
// get_cplx_lit_tbl
+cplx_lit_table *watch_tbl_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+ return(new cplx_lit_table());
+}
+
cplx_lit_table *mrg_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
return(new cplx_lit_table());
}
return(complex_literals);
}
-cplx_lit_table *join_eq_hash_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+cplx_lit_table *join_eq_hash_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+ int i;
+ cplx_lit_table *complex_literals = new cplx_lit_table();
+
+ for(i=0;i<select_list.size();i++){
+ find_complex_literal_se(select_list[i]->se, Ext_fcns, complex_literals);
+ }
+ for(i=0;i<where.size();++i){
+ find_complex_literal_pr(where[i]->pr,Ext_fcns, complex_literals);
+ }
+
+ return(complex_literals);
+}
+
+cplx_lit_table *filter_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
int i;
cplx_lit_table *complex_literals = new cplx_lit_table();
return(complex_literals);
}
-cplx_lit_table *filter_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
+cplx_lit_table *watch_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){
int i;
cplx_lit_table *complex_literals = new cplx_lit_table();
+
//-----------------------------------------------------------------
// get_handle_param_tbl
+vector<handle_param_tbl_entry *> watch_tbl_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
+ vector<handle_param_tbl_entry *> retval;
+ return(retval);
+}
+
vector<handle_param_tbl_entry *> mrg_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
vector<handle_param_tbl_entry *> retval;
return(retval);
return(retval);
}
+vector<handle_param_tbl_entry *> watch_join_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){
+ int i;
+ vector<handle_param_tbl_entry *> retval;
+
+ for(i=0;i<select_list.size();i++){
+ find_param_handles_se(select_list[i]->se, Ext_fcns, retval);
+ }
+ for(i=0;i<where.size();++i){
+ find_param_handles_pr(where[i]->pr,Ext_fcns, retval);
+ }
+
+ return(retval);
+}
+
+
+
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
/// Functions for operator output rates estimations
return SGAHCWCB_SELECTIVITY * DEFAULT_INTERFACE_RATE;
}
+double watch_tbl_qpn::get_rate_estimate() {
+
+ // dummy method for now
+ return DEFAULT_INTERFACE_RATE;
+}
+
double mrg_qpn::get_rate_estimate() {
// dummy method for now
ret.append("( ");
if(ldt->complex_comparison(ldt) ){
- ret.append( ldt->get_hfta_comparison_fcn(ldt) );
+ ret.append( ldt->get_hfta_equals_fcn(ldt) );
ret.append("( ");
if(ldt->is_buffer_type() )
ret.append("&");
ret.append("( ");
if(ldt->complex_comparison(rdt) ){
+// TODO can use get_hfta_equals_fcn if op is "=" ?
ret.append(ldt->get_hfta_comparison_fcn(rdt));
ret.append("(");
if(ldt->is_buffer_type() )
ret.append("( ");
if(ldt->complex_comparison(ldt) ){
- ret.append( ldt->get_hfta_comparison_fcn(ldt) );
+ ret.append( ldt->get_hfta_equals_fcn(ldt) );
ret.append("( ");
if(ldt->is_buffer_type() )
ret.append("&");
ret.append("( ");
if(ldt->complex_comparison(rdt) ){
+// TODO can use get_hfta_equals_fcn if op is "=" ?
ret.append(ldt->get_hfta_comparison_fcn(rdt));
ret.append("(");
if(ldt->is_buffer_type() )
string ret;
if(dt->complex_comparison(dt) ){
- ret.append(dt->get_hfta_comparison_fcn(dt));
+ ret.append(dt->get_hfta_equals_fcn(dt));
ret.append("(");
if(dt->is_buffer_type() )
ret.append("&");
return(ret);
}
-static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){
string ret;
if(dt->complex_comparison(dt) ){
if(dt->is_buffer_type() )
ret.append("&");
ret.append(rhs_op );
- ret.append(") == 0");
+ ret.append(") == 1");
}else{
ret.append(lhs_op );
- ret.append(" == ");
+ ret.append(" < ");
ret.append(rhs_op );
}
return(ret);
}
+//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+// string ret;
+//
+// if(dt->complex_comparison(dt) ){
+// ret.append(dt->get_hfta_equals_fcn(dt));
+// ret.append("(");
+// if(dt->is_buffer_type() )
+// ret.append("&");
+// ret.append(lhs_op);
+// ret.append(", ");
+// if(dt->is_buffer_type() )
+// ret.append("&");
+// ret.append(rhs_op );
+// ret.append(") == 0");
+// }else{
+// ret.append(lhs_op );
+// ret.append(" == ");
+// ret.append(rhs_op );
+// }
+//
+// return(ret);
+//}
+
// Here I assume that only MIN and MAX aggregates can be computed
// over BUFFER data types.
unpack_fcn = dt.get_hfta_unpack_fcn_noxf();
}
if(dt.is_buffer_type()){
- sprintf(tmpstr,"\t unpack_var_%s_%d = %s(tup%d.data, tup%d.tuple_size, unpack_offset_%s_%d, &problem);\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, tblref, field.c_str(), tblref);
+ sprintf(tmpstr,"\tunpack_var_%s_%d = %s(tup%d.data, tup%d.tuple_size, unpack_offset_%s_%d, &problem); // unpack_cid\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, tblref, field.c_str(), tblref);
}else{
- sprintf(tmpstr,"\t unpack_var_%s_%d = %s_nocheck(tup%d.data, unpack_offset_%s_%d);\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, field.c_str(), tblref);
+ sprintf(tmpstr,"\tunpack_var_%s_%d = %s_nocheck(tup%d.data, unpack_offset_%s_%d); // unpack_cid\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, field.c_str(), tblref);
}
ret += tmpstr;
if(dt.is_buffer_type()){
string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector<bool> &needs_xform){
int a,g,w,s;
+// Regular or slow flush?
+ hfta_slow_flush = 0;
+ if(this->get_val_of_def("hfta_slow_flush") != ""){
+ int d = atoi(this->get_val_of_def("hfta_slow_flush").c_str() );
+ if(d<0){
+ fprintf(stderr,"Warning, hfta_slow_flush in node %s is %d, must be at least 0, setting to 0.\n",node_name.c_str(), d);
+ hfta_slow_flush = 0;
+ }else{
+ hfta_slow_flush = d;
+ }
+ }
+
// Initialize generate utility globals
segen_gb_tbl = &(gb_tbl);
ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
}
// empty strucutred literals
- map<int, string>::iterator sii;
- for(sii=structured_types.begin();sii!=structured_types.end();++sii){
- data_type dt(sii->second);
- literal_t empty_lit(sii->first);
- ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
- }
+// map<int, string>::iterator sii;
+// for(sii=structured_types.begin();sii!=structured_types.end();++sii){
+// data_type dt(sii->second);
+// literal_t empty_lit(sii->first);
+// ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
+// }
// Constructors
if(structured_types.size()==0){
ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
}else{
- ret += "\t"+generate_functor_name() + "_groupdef(){}\n";
+ ret += "\t"+generate_functor_name() + "_groupdef(){\n";
+ for(g=0;g<gb_tbl.size();g++){
+ data_type *gdt = gb_tbl.get_data_type(g);
+ if(gdt->is_buffer_type()){
+ sprintf(tmpstr,"\t\t%s(&gb_var%d);\n",
+ gdt->get_hfta_buffer_init().c_str(), g );
+ ret += tmpstr;
+ }
+ }
+ ret += "\t};\n";
}
+ ret += "\t// shallow copy constructors\n";
ret += "\t"+generate_functor_name() + "_groupdef("+
- this->generate_functor_name() + "_groupdef *gd){\n";
+ "const " + this->generate_functor_name() + "_groupdef &gd){\n";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
- if(gdt->is_buffer_type()){
- sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
- gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
- ret += tmpstr;
- }else{
- sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
- ret += tmpstr;
- }
+ sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
+// TODO : do strings perisist after the call? its a shllow copy
+// if(gdt->is_buffer_type()){
+// sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
+// gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+// ret += tmpstr;
+// }else{
+// sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+// ret += tmpstr;
+// }
}
ret += "\t}\n";
ret += "\t"+generate_functor_name() + "_groupdef("+
- this->generate_functor_name() + "_groupdef *gd, bool *pattern){\n";
+ "const " + this->generate_functor_name() + "_groupdef &gd, bool *pattern){\n";
+// -- For patterns, need empty strucutred literals
+ map<int, string>::iterator sii;
+ for(sii=structured_types.begin();sii!=structured_types.end();++sii){
+ data_type dt(sii->second);
+ literal_t empty_lit(sii->first);
+ ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n";
+ }
+
for(sii=structured_types.begin();sii!=structured_types.end();++sii){
literal_t empty_lit(sii->first);
ret += "\t\t"+empty_lit.to_hfta_C_code("&"+empty_lit.hfta_empty_literal_name())+";\n";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
ret += "\t\tif(pattern["+int_to_string(g)+"]){\n";
- if(gdt->is_buffer_type()){
- sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
- gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
- ret += tmpstr;
- }else{
- sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g);
- ret += tmpstr;
- }
+ sprintf(tmpstr,"\t\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
+// TODO Do strings persist long enough? its a shllow copy constructor?
+// if(gdt->is_buffer_type()){
+// sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
+// gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+// ret += tmpstr;
+// }else{
+// sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+// ret += tmpstr;
+// }
ret += "\t\t}else{\n";
literal_t empty_lit(gdt->type_indicator());
if(empty_lit.is_cpx_lit()){
ret += "\t\t}\n";
}
ret += "\t};\n";
+
+ ret += "\t// deep assignment operator\n";
+ ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+
+ this->generate_functor_name() + "_groupdef &gd){\n";
+ for(g=0;g<gb_tbl.size();g++){
+ data_type *gdt = gb_tbl.get_data_type(g);
+ if(gdt->is_buffer_type()){
+ sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n",
+ gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+ ret += tmpstr;
+ }else{
+ sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
+ }
+ }
+ ret += "\t}\n";
+
// destructor
ret += "\t~"+ generate_functor_name() + "_groupdef(){\n";
for(g=0;g<gb_tbl.size();g++){
}
}
ret += "\tbool needs_temporal_flush;\n";
+ ret += "\tbool disordered_arrival;\n";
}
"}\n\n"
;
+//---------------------------------------
+// Parameterized number of tuples output per slow flush
+ ret +=
+"int gb_flush_per_tuple(){\n"
+" return "+int_to_string(hfta_slow_flush)+";\n"
+"}\n\n";
+
+
ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n";
if(hfta_disorder < 2){
if(uses_temporal_flush){
- ret+= "\tif( !( (";
+ ret+= "\tif( ( (";
bool first_one = true;
+ string disorder_test;
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
- sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr;
- sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr;
- if(first_one){first_one = false;} else {ret += ") && (";}
- ret += generate_equality_test(lhs_op, rhs_op, gdt);
+ sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr;
+ sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr;
+ if(first_one){first_one = false;} else {ret += ") && (";}
+ ret += generate_lt_test(lhs_op, rhs_op, gdt);
+ disorder_test += generate_lt_test(rhs_op, lhs_op, gdt);
}
}
ret += ") ) ){\n";
}
}
ret += "\t\tneeds_temporal_flush=true;\n";
- ret += "\t\t}else{\n"
- "\t\t\tneeds_temporal_flush=false;\n"
- "\t\t}\n";
+ ret += "\t}else{\n"
+ "\t\tneeds_temporal_flush=false;\n"
+ "\t}\n";
+
+ ret += "\tdisordered_arrival = "+disorder_test+";\n";
+// ret += "\tif( ( ("+disorder_test+") ) ){\n";
+// ret += "\t\tdisordered_arrival=true;\n";
+// ret += "\t}else{\n";
+// ret += "\t\tdisordered_arrival=false;\n";
+// ret += "\t}\n";
+
}
}else{
ret+= "\tif(temp_tuple_received && !( (";
// For temporal status tuple we don't need to do anything else
- ret += "\tif (temp_tuple_received) return NULL;\n\n";
+ ret += "\tif (temp_tuple_received){\n";
+ ret += "\t\tdisordered_arrival = false;\n";
+ ret += "\t\treturn NULL;\n\n";
+ ret += "\t}\n";
for(w=0;w<where.size();++w){
sprintf(tmpstr,"//\t\tPredicate clause %d.\n",w);
// update an aggregate object
ret += "void update_aggregate(host_tuple &tup0, "
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval){\n";
// Variables for execution of the function.
ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure
// Unpack all remaining attributes
ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform);
for(a=0;a<aggr_tbl.size();a++){
- sprintf(tmpstr,"aggval->aggr_var%d",a);
+ sprintf(tmpstr,"aggval.aggr_var%d",a);
string varname = tmpstr;
ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema));
}
}
ret += "\t};\n";
+ ret += "bool disordered(){return disordered_arrival;}\n";
+
//---------------------------------------------------
// create output tuple
// Unpack the partial functions ref'd in the where clause,
// so I'll leave it in longhand.
ret += "host_tuple create_output_tuple("
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval, bool &failed){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval, bool &failed){\n";
ret += "\thost_tuple tup;\n";
ret += "\tfailed = false;\n";
ret += "\tgs_retval_t retval = 0;\n";
- string gbvar = "gbval->gb_var";
- string aggvar = "aggval->";
+ string gbvar = "gbval.gb_var";
+ string aggvar = "aggval.";
// Create cached temporaries for UDAF return values.
for(a=0;a<aggr_tbl.size();a++){
ret += "struct "+generate_functor_name()+"_hash_func{\n";
ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+
- "_groupdef *grp) const{\n";
+ "_groupdef &grp) const{\n";
ret += "\t\treturn( (";
for(g=0;g<gb_tbl.size();g++){
if(g>0) ret += "^";
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->use_hashfunc()){
if(gdt->is_buffer_type())
- sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+ sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
else
- sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+ sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
}else{
- sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
+ sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
}
ret += tmpstr;
}
// The comparison function
ret += "struct "+generate_functor_name()+"_equal_func{\n";
- ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+
- generate_functor_name()+"_groupdef *grp2) const{\n";
+ ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+
+ "const "+generate_functor_name()+"_groupdef &grp2) const{\n";
ret += "\t\treturn( (";
for(g=0;g<gb_tbl.size();g++){
if(g>0) ret += ") && (";
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->complex_comparison(gdt)){
- if(gdt->is_buffer_type())
- sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
- else
- sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ if(gdt->is_buffer_type())
+ sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)",
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
+ else
+ sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)",
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
}else{
- sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
+ sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g);
}
ret += tmpstr;
}
string sgah_qpn::generate_operator(int i, string params){
if(hfta_disorder < 2){
+ string op_name = "groupby_operator";
+ if(hfta_slow_flush>0)
+ op_name = "groupby_slowflush_operator";
return(
- " groupby_operator<" +
+ " "+op_name+"<" +
generate_functor_name()+","+
generate_functor_name() + "_groupdef, " +
generate_functor_name() + "_aggrdef, " +
generate_functor_name()+"_hash_func, "+
generate_functor_name()+"_equal_func "
- "> *op"+int_to_string(i)+" = new groupby_operator<"+
+ "> *op"+int_to_string(i)+" = new "+op_name+"<"+
generate_functor_name()+","+
generate_functor_name() + "_groupdef, " +
generate_functor_name() + "_aggrdef, " +
);
}
+////////////////////////////////////////////////
+/// WATCHLIST_TBL operator
+/// WATCHLIST_TBL functor
+////////////////////////////////////////////
+
+string watch_tbl_qpn::generate_functor_name(){
+ return("watch_tbl_functor_" + normalize_name(this->get_node_name()));
+}
+
+string watch_tbl_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector<bool> &needs_xform){
+
+ return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED");
+}
+
+string watch_tbl_qpn::generate_operator(int i, string params){
+ return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED");
+}
/////////////////////////////////////////////////////////
////// JOIN_EQ_HASH functor
// Deal with outer join stuff
col_id_set l_cids, r_cids;
+ col_id_set l_base_cids, r_base_cids; // l_cids and r_cids get modified
+ // to account for extra_f fields to
+ // unpack for value imputation
col_id_set::iterator ocsi;
for(ocsi=local_cids.begin();ocsi!=local_cids.end();++ocsi){
- if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
- else r_cids.insert((*ocsi));
+ if((*ocsi).tblvar_ref == 0){
+ l_cids.insert((*ocsi)); l_base_cids.insert((*ocsi));
+ }else{
+ r_cids.insert((*ocsi)); r_base_cids.insert((*ocsi));
+ }
}
for(ocsi=se_cids.begin();ocsi!=se_cids.end();++ocsi){
- if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
- else r_cids.insert((*ocsi));
+ if((*ocsi).tblvar_ref == 0){
+ l_cids.insert((*ocsi)); l_base_cids.insert((*ocsi));
+ }else{
+ r_cids.insert((*ocsi)); r_base_cids.insert((*ocsi));
+ }
}
ret += "\t}else if(tup0.data){\n";
string unpack_null = ""; col_id_set extra_cids;
- for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){
+ for(ocsi=r_base_cids.begin();ocsi!=r_base_cids.end();++ocsi){
string field = (*ocsi).field;
if(r_equiv.count(field)){
- unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+"; // r_equiv\n";
get_new_se_cids(r_equiv[field],l_cids,new_cids,NULL);
}else{
int schref = (*ocsi).schema_ref;
// NB : works for string type only
// NNB: installed fix for ipv6, more of this should be pushed
// into the literal_t code.
- unpack_null+="\tunpack_var_"+field+"_1= "+empty_lit.hfta_empty_literal_name()+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_1= "+empty_lit.hfta_empty_literal_name()+"; // empty\n";
}else{
- unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+"; // empty\n";
}
}
}
+ ret += "// l_cids\n";
ret += gen_unpack_cids(schema, l_cids, "tup", needs_xform);
+ ret += "// extra_cids\n";
ret += gen_unpack_cids(schema, extra_cids, "tup", needs_xform);
ret += unpack_null;
ret += gen_unpack_partial_fcn(schema, partial_fcns, sl_pfcns, "tup");
ret+="\t}else{\n";
- unpack_null = ""; extra_cids.clear();
- for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){
+ unpack_null = ""; extra_cids.clear(); new_cids.clear();
+ for(ocsi=l_base_cids.begin();ocsi!=l_base_cids.end();++ocsi){
string field = (*ocsi).field;
if(l_equiv.count(field)){
- unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+"; // l_equiv\n";
get_new_se_cids(l_equiv[field],r_cids,new_cids,NULL);
}else{
int schref = (*ocsi).schema_ref;
// NB : works for string type only
// NNB: installed fix for ipv6, more of this should be pushed
// into the literal_t code.
- unpack_null+="\tunpack_var_"+field+"_0= "+empty_lit.hfta_empty_literal_name()+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_0= "+empty_lit.hfta_empty_literal_name()+"; // empty\n";
}else{
- unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+"; // empty\n";
}
}
}
+ ret += "// r_cids\n";
ret += gen_unpack_cids(schema, r_cids, "tup", needs_xform);
+ ret += "// extra_cids\n";
ret += gen_unpack_cids(schema, extra_cids, "tup", needs_xform);
ret += unpack_null;
ret += gen_unpack_partial_fcn(schema, partial_fcns, sl_pfcns, "tup");
// create a temp status tuple
- ret += "int create_temp_status_tuple(const host_tuple &tup0, const host_tuple &tup1, host_tuple& result) {\n\n";
+ ret += "int create_temp_status_tuple("+this->generate_functor_name()+"_tempeqdef *lts,"+this->generate_functor_name()+"_tempeqdef *rts, host_tuple& result) {\n\n";
ret += "\tgs_retval_t retval = 0;\n";
ret += "\tgs_int32_t problem = 0;\n";
- ret += "\tif(tup0.data){\n";
-
-// Unpack all the temporal attributes references in select list
- col_id_set found_cids;
-
- for(s=0;s<select_list.size();s++){
- if (select_list[s]->se->get_data_type()->is_temporal()) {
-// Find the set of attributes accessed in this SE
- col_id_set new_cids;
- get_new_se_cids(select_list[s]->se,found_cids, new_cids, NULL);
- }
+ for(p=0;p<temporal_dt.size();p++){
+ sprintf(tmpstr,"lhs_var");
+ ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";
+ sprintf(tmpstr,"rhs_var");
+ ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n";
}
- // Deal with outer join stuff
- l_cids.clear(), r_cids.clear();
- for(ocsi=found_cids.begin();ocsi!=found_cids.end();++ocsi){
- if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
- else r_cids.insert((*ocsi));
+ ret += "\tif(lts!=NULL){\n";
+ for(p=0;p<temporal_dt.size();p++){
+ ret += "\t\tlhs_var = lts->tempeq_var"+to_string(p)+";\n";
}
- unpack_null = "";
- extra_cids.clear();
- for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){
- string field = (*ocsi).field;
- if(r_equiv.count(field)){
- unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n";
- col_id_set addnl_cids;
- get_new_se_cids(r_equiv[field],l_cids,addnl_cids,NULL);
- }else{
- int schref = (*ocsi).schema_ref;
- data_type dt(schema->get_type_name(schref,field));
- literal_t empty_lit(dt.type_indicator());
- if(empty_lit.is_cpx_lit()){
- sprintf(tmpstr,"&(unpack_var_%s_1)",field.c_str());
- unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
- }else{
- unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n";
- }
- }
+ ret += "\t}else{\n";
+ for(p=0;p<temporal_dt.size();p++){
+ ret += "\t\tlhs_var = 0;\n";
}
- ret += gen_unpack_cids(schema, l_cids, "1", needs_xform);
- ret += gen_unpack_cids(schema, extra_cids, "1", needs_xform);
- ret += unpack_null;
+ ret += "\t}\n";
- ret+="\t}else if (tup1.data) {\n";
- unpack_null = ""; extra_cids.clear();
- for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){
- string field = (*ocsi).field;
- if(l_equiv.count(field)){
- unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n";
- col_id_set addnl_cids;
- get_new_se_cids(l_equiv[field],r_cids,addnl_cids,NULL);
- }else{
- int schref = (*ocsi).schema_ref;
- data_type dt(schema->get_type_name(schref,field));
- literal_t empty_lit(dt.type_indicator());
- if(empty_lit.is_cpx_lit()){
- sprintf(tmpstr,"&(unpack_var_%s_0)",field.c_str());
- unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n";
- }else{
- unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n";
- }
- }
+ ret += "\tif(rts!=NULL){\n";
+ for(p=0;p<temporal_dt.size();p++){
+ ret += "\t\trhs_var = rts->tempeq_var"+to_string(p)+";\n";
}
- ret += gen_unpack_cids(schema, r_cids, "1", needs_xform);
- ret += gen_unpack_cids(schema, extra_cids, "1", needs_xform);
- ret += unpack_null;
- ret+="\t}\n";
+ ret += "\t}else{\n";
+ for(p=0;p<temporal_dt.size();p++){
+ ret += "\t\trhs_var = 0;\n";
+ }
+ ret += "\t}\n";
ret += gen_init_temp_status_tuple(this->get_node_name());
// Start packing.
- ret += "//\t\tPack the fields into the tuple.\n";
- ret += gen_pack_tuple(schema,select_list,this->get_node_name(), true );
+
+
+// This is checked in the query analyzer so I think its safe,
+// But a lot of older code has complex code to propagate multiple
+// timestamps
+ for(s=0;s<select_list.size();s++){
+ scalarexp_t *se = select_list[s]->se;
+ data_type *sdt = se->get_data_type();
+ if(sdt->is_temporal()){
+ string target = "\ttuple->tuple_var"+to_string(s)+" = ";
+ if(from[0]->get_property()==0 && from[1]->get_property()==0){ // INNER
+ ret += target+"(lhs_var>rhs_var ? lhs_var : rhs_var); // INNER\n";
+ }
+ if(from[0]->get_property()!=0 && from[1]->get_property()==0){ // LEFT
+ ret += target+"lhs_var; // LEFT\n";
+// ret += target+"rhs_var; // LEFT\n";
+ }
+ if(from[0]->get_property()==0 && from[1]->get_property()!=0){ // RIGHT
+ ret += target+"rhs_var; // RIGHT\n";
+// ret += target+"lhs_var; // RIGHT\n";
+ }
+ if(from[0]->get_property()!=0 && from[1]->get_property()!=0){ // OUTER
+ ret += target+"(lhs_var<rhs_var ? lhs_var : rhs_var); // OUTER\n";
+ }
+ }
+ }
ret += "\treturn 0;\n";
if(hashkey_dt[p]->complex_comparison(hashkey_dt[p])){
if(hashkey_dt[p]->is_buffer_type())
sprintf(tmpstr,"(%s(&(key1->hashkey_var%d), &(key2->hashkey_var%d))==0)",
- hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p);
+ hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p);
else
sprintf(tmpstr,"(%s((key1->hashkey_var%d), (key2->hashkey_var%d))==0)",
- hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p);
+ hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p);
}else{
sprintf(tmpstr,"key1->hashkey_var%d == key2->hashkey_var%d",p,p);
}
if(gdt->complex_comparison(gdt)){
if(gdt->is_buffer_type())
sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
else
sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
}else{
sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
}
if(gdt->complex_comparison(gdt)){
if(gdt->is_buffer_type())
sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
else
sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
}else{
sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
}
ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
}
// Constructors
- ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
- ret += "\t"+generate_functor_name() + "_groupdef("+
- this->generate_functor_name() + "_groupdef *gd){\n";
+
+ ret += "\t"+generate_functor_name() + "_groupdef(){\n";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_buffer_type()){
- sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n",
- gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
- ret += tmpstr;
- }else{
- sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g);
+ sprintf(tmpstr,"\t\t%s(&gb_var%d);\n",
+ gdt->get_hfta_buffer_init().c_str(), g );
ret += tmpstr;
}
}
ret += "\t};\n";
+
+ ret += "\t// shallow copy constructor\n";
+ ret += "\t"+generate_functor_name() + "_groupdef("+
+ this->generate_functor_name() + "_groupdef &gd){\n";
+ for(g=0;g<gb_tbl.size();g++){
+ data_type *gdt = gb_tbl.get_data_type(g);
+ sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
+ }
+ ret += "\t};\n";
+
+ ret += "\t// deep assignment operator\n";
+ ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+
+ this->generate_functor_name() + "_groupdef &gd){\n";
+ for(g=0;g<gb_tbl.size();g++){
+ data_type *gdt = gb_tbl.get_data_type(g);
+ if(gdt->is_buffer_type()){
+ sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n",
+ gdt->get_hfta_buffer_assign_copy().c_str(),g,g );
+ ret += tmpstr;
+ }else{
+ sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g);
+ ret += tmpstr;
+ }
+ }
+ ret += "\t}\n";
+
// destructor
ret += "\t~"+ generate_functor_name() + "_groupdef(){\n";
for(g=0;g<gb_tbl.size();g++){
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
- sprintf(tmpstr,"last_gb%d",g);
+ sprintf(tmpstr,"curr_gb%d",g);
ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
- sprintf(tmpstr,"last_flushed_gb%d",g);
+ sprintf(tmpstr,"last_gb%d",g);
ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n";
}
}
- ret += "\tbool needs_temporal_flush;\n";
+ ret += "\tgs_int32_t needs_temporal_flush;\n";
+ ret += "\tbool disordered_arrival;\n";
}
// The publicly exposed functions
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
literal_t gl(gdt->type_indicator());
+ sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
+ ret.append(tmpstr);
sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str());
ret.append(tmpstr);
}
}
- ret += "\tneeds_temporal_flush = false;\n";
+ ret += "\tneeds_temporal_flush = 0;\n";
}
// Init temporal attributes referenced in select list
// set flush indicator and update stored GB vars if there is any change.
if(uses_temporal_flush){
- ret+= "\tif( !( (";
+ ret+= "\tif( ( (";
bool first_one = true;
+ string disorder_test;
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
- sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr;
+ sprintf(tmpstr,"curr_gb%d",g); string lhs_op = tmpstr;
sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr;
if(first_one){first_one = false;} else {ret += ") && (";}
- ret += generate_equality_test(lhs_op, rhs_op, gdt);
+ ret += generate_lt_test(lhs_op, rhs_op, gdt);
+ disorder_test += generate_lt_test(rhs_op, lhs_op, gdt);
}
}
ret += ") ) ){\n";
+ int temporal_gb=-1;
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
if(gdt->is_buffer_type()){
- sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
+ sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&curr_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
}else{
- sprintf(tmpstr,"\t\tlast_flushed_gb%d = last_gb%d;\n",g,g);
- ret += tmpstr;
- sprintf(tmpstr,"\t\tlast_gb%d = gbval->gb_var%d;\n",g,g);
+// sprintf(tmpstr,"\t\tlast_gb%d = curr_gb%d;\n",g,g);
+// ret += tmpstr;
+// sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
+
+ ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n";
+ ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n";
+ ret += "\t\t}else{\n";
+ ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n";
+ ret += "\t\t}\n";
+ sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g);
+ temporal_gb=g;
}
ret += tmpstr;
}
}
- ret += "\t\tneeds_temporal_flush=true;\n";
- ret += "\t\t}else{\n"
- "\t\t\tneeds_temporal_flush=false;\n"
- "\t\t}\n";
+ ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n";
+ ret += "\t}else{\n"
+ "\t\tneeds_temporal_flush=0;\n"
+ "\t}\n";
+
+ ret += "\tdisordered_arrival = "+disorder_test+";\n";
+// ret += "\tif( ( ("+disorder_test+") ) ){\n";
+// ret += "\t\tdisordered_arrival=true;\n";
+// ret += "\t}else{\n";
+// ret += "\t\tdisordered_arrival=false;\n";
+// ret += "\t}\n";
}
// update an aggregate object
ret += "void update_aggregate(host_tuple &tup0, "
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval){\n";
// Variables for execution of the function.
ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure
// Unpack all remaining attributes
ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform);
for(a=0;a<aggr_tbl.size();a++){
- sprintf(tmpstr,"aggval->aggr_var%d",a);
+ sprintf(tmpstr,"aggval.aggr_var%d",a);
string varname = tmpstr;
ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema));
}
// reinitialize an aggregate object
ret += "void reinit_aggregates( "+
- generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval){\n";
+ generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval){\n";
// Variables for execution of the function.
ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure
// use of temporaries depends on the aggregate,
// generate them in generate_aggr_update
+ int temporal_gb; // track the # of the temporal gb
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
if(gdt->is_temporal()){
if(gdt->is_buffer_type()){
- sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
+ sprintf(tmpstr,"\t\t%s(&(gbval.gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g);
}else{
- sprintf(tmpstr,"\t\t gbval->gb_var%d =last_gb%d;\n",g,g);
+ sprintf(tmpstr,"\t\t gbval.gb_var%d =last_gb%d;\n",g,g);
}
ret += tmpstr;
+ temporal_gb = g;
}
}
// Unpack all remaining attributes
for(a=0;a<aggr_tbl.size();a++){
- sprintf(tmpstr,"aggval->aggr_var%d",a);
+ sprintf(tmpstr,"aggval.aggr_var%d",a);
string varname = tmpstr;
ret.append(generate_aggr_reinitialize(varname,&aggr_tbl,a, schema));
}
//---------------------------------------------------
// Flush test
- ret += "\tbool flush_needed(){\n";
+ ret += "gs_int32_t flush_needed(){\n";
if(uses_temporal_flush){
- ret += "\t\treturn needs_temporal_flush;\n";
+ ret += "\treturn needs_temporal_flush;\n";
}else{
- ret += "\t\treturn false;\n";
+ ret += "\treturn false;\n";
}
- ret += "\t};\n";
+ ret += "};\n";
+
+ ret += "bool disordered(){return disordered_arrival;}\n";
+
+//------------------------------------------------
+// time bucket management
+ ret += "void advance_last_tb(){\n";
+ ret += "\tlast_gb"+to_string(temporal_gb)+"++;\n";
+ ret += "}\n\n";
+ ret += "void reset_last_tb(){\n";
+ ret += "\tlast_gb"+to_string(temporal_gb)+" = curr_gb"+to_string(temporal_gb)+";\n";
+ ret += "}\n\n";
//---------------------------------------------------
// create output tuple
// so I'll leave it in longhand.
ret += "host_tuple create_output_tuple("
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval, bool &failed){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval, bool &failed){\n";
ret += "\thost_tuple tup;\n";
ret += "\tfailed = false;\n";
ret += "\tgs_retval_t retval = 0;\n";
- string gbvar = "gbval->gb_var";
- string aggvar = "aggval->";
+ string gbvar = "gbval.gb_var";
+ string aggvar = "aggval.";
// First, get the return values from the UDAFS
// been unpacked. delete the string udaf return values at the end.
ret += "bool cleaning_when("
- +generate_functor_name()+"_groupdef *gbval, "+
- generate_functor_name()+"_aggrdef *aggval){\n";
+ +generate_functor_name()+"_groupdef &gbval, "+
+ generate_functor_name()+"_aggrdef &aggval){\n";
ret += "\tbool retval = true;\n";
- gbvar = "gbval->gb_var";
- aggvar = "aggval->";
+ gbvar = "gbval.gb_var";
+ aggvar = "aggval.";
set<int> clw_pfcns;
if(sdt->is_temporal()){
sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
ret += tmpstr;
- sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_flushed_gb", "", schema).c_str());
+ sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_gb", "", schema).c_str());
ret += tmpstr;
ret += ";\n";
}
ret += "struct "+generate_functor_name()+"_hash_func{\n";
ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+
- "_groupdef *grp) const{\n";
+ "_groupdef &grp) const{\n";
ret += "\t\treturn(0";
for(g=0;g<gb_tbl.size();g++){
data_type *gdt = gb_tbl.get_data_type(g);
ret += "^";
if(gdt->use_hashfunc()){
if(gdt->is_buffer_type())
- sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+ sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
else
- sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
+ sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g);
}else{
- sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
+ sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g);
}
ret += tmpstr;
}
// The comparison function
ret += "struct "+generate_functor_name()+"_equal_func{\n";
- ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+
- generate_functor_name()+"_groupdef *grp2) const{\n";
+ ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+
+ "const "+generate_functor_name()+"_groupdef &grp2) const{\n";
ret += "\t\treturn( (";
string hcmpr = "";
if(first_exec){first_exec=false;}else{ hcmpr += ") && (";}
if(gdt->complex_comparison(gdt)){
if(gdt->is_buffer_type())
- sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)",
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
else
- sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)",
- gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g);
+ sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)",
+ gdt->get_hfta_equals_fcn(gdt).c_str(),g,g);
}else{
- sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g);
+ sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g);
}
hcmpr += tmpstr;
}
}
}
+void watch_join_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
+ int i;
+ vector<map<string, scalarexp_t *> *> src_vec;
+
+ for(i=0;i<q_sources.size();i++){
+ if(q_sources[i] != NULL)
+ src_vec.push_back(q_sources[i]->get_protocol_se());
+ else
+ src_vec.push_back(NULL);
+ }
+
+ for(i=0;i<select_list.size();i++){
+ protocol_map[select_list[i]->name] = resolve_protocol_se(select_list[i]->se,src_vec,NULL,Schema);
+ }
+
+ for(i=0;i<key_flds.size();i++){
+ string kfld = key_flds[i];
+ hash_src_l.push_back(resolve_protocol_se(hash_eq[kfld]->pr->get_left_se(),src_vec,NULL,Schema));
+ hash_src_r.push_back(resolve_protocol_se(hash_eq[kfld]->pr->get_right_se(),src_vec,NULL,Schema));
+ }
+}
+
+
void sgah_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
int i;
vector<map<string, scalarexp_t *> *> src_vec;
for(s=1;s<src_vec.size() && match;s++){
pse_map = src_vec[s];
scalarexp_t *match_se = (*pse_map)[fld_nm];
- if(match_se == false)
+ if(match_se == NULL)
match = false;
else
match = is_equivalent_se_base(first_se, match_se, Schema);
protocol_map[fld_nm] = NULL;
}
}
+
+void watch_tbl_qpn::create_protocol_se(vector<qp_node *> q_sources, table_list *Schema){
+ return;
+}
+