X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fftacmp%2Fquery_plan.cc;h=a746e44f34ec386f942095d70f95d1625d5ae4d9;hb=393a42a5b1ba6e64bd3eabf7d0ce2f197e966355;hp=c84edb97782b4b483b7ea56c01a3626fd7741461;hpb=e981e864b812c938d3df8b555b6bb98bb89273e7;p=com%2Fgs-lite.git diff --git a/src/ftacmp/query_plan.cc b/src/ftacmp/query_plan.cc index c84edb9..a746e44 100644 --- a/src/ftacmp/query_plan.cc +++ b/src/ftacmp/query_plan.cc @@ -98,16 +98,76 @@ mrg_qpn::mrg_qpn(filter_join_qpn *spx, std::string n_name, std::vector &sources, std::vector > &ifaces, ifq_t *ifdb){ + param_tbl = spx->param_tbl; + int i; + node_name = n_name; + field_entry_list *fel = new field_entry_list(); + merge_fieldpos = -1; + + disorder = 1; + + for(i=0;iselect_list.size();++i){ + data_type *dt = spx->select_list[i]->se->get_data_type()->duplicate(); + if(dt->is_temporal()){ + if(merge_fieldpos < 0){ + merge_fieldpos = i; + }else{ + fprintf(stderr,"Warning: Merge subquery %s found two temporal fields (%s, %s), using %s\n", n_name.c_str(), spx->select_list[merge_fieldpos]->name.c_str(), spx->select_list[i]->name.c_str(), spx->select_list[merge_fieldpos]->name.c_str() ); + dt->reset_temporal(); + } + } + + field_entry *fe = dt->make_field_entry(spx->select_list[i]->name); + fel->append_field(fe); + delete dt; + } + if(merge_fieldpos<0){ + fprintf(stderr,"ERROR, no temporal attribute for merge subquery %s\n",n_name.c_str()); + exit(1); + } + table_layout = new table_def( n_name.c_str(), NULL, NULL, fel, STREAM_SCHEMA); + +// NEED TO HANDLE USER_SPECIFIED SLACK + this->resolve_slack(spx->select_list[merge_fieldpos]->se, + spx->select_list[merge_fieldpos]->name, ifaces, ifdb,NULL); +// if(this->slack == NULL) +// fprintf(stderr,"Zero slack.\n"); +// else +// fprintf(stderr,"slack is %s\n",slack->to_string().c_str()); + + for(i=0;iselect_list[merge_fieldpos]->name.c_str())); + mvars[i]->set_tablevar_ref(i); + fm.push_back(new tablevar_t(sources[i].c_str())); + fm[i]->set_range_var(rvar); + } + + param_tbl = new param_table(); + std::vector param_names = spx->param_tbl->get_param_names(); + int pi; + for(pi=0;piparam_tbl->get_data_type(param_names[pi]); + param_tbl->add_param(param_names[pi],dt->duplicate(), + spx->param_tbl->handle_access(param_names[pi])); + } + definitions = spx->definitions; + +} + + + // This function translates an analyzed parse tree // into one or more query nodes (qp_node). // Currently only one node is created, but some query // fragments might create more than one query node, -// e.g. aggregation over a joim, or nested subqueries -// in the FROM clause (unless this is handles at parse tree +// e.g. aggregation over a join, or nested subqueries +// in the FROM clause (unless this is handled at parse tree // analysis time). At this stage, they will be linked // by the names in the FROM clause. -// INVARIANT : if mroe than one query node is returned, +// INVARIANT : if more than one query node is returned, // the last one represents the output of the query. vector create_query_nodes(query_summary_class *qs,table_list *Schema){ @@ -121,6 +181,16 @@ vector create_query_nodes(query_summary_class *qs,table_list *Schema) // into the qp_node constructors, // and have this code focus on building the query plan tree. +// Watchlist node + if(qs->query_type == WATCHLIST_QUERY){ + watch_tbl_qpn *watchnode = new watch_tbl_qpn(qs, Schema); + +// Done + plan_root = watchnode; + local_plan.push_back(watchnode); + } + + // MERGE node if(qs->query_type == MERGE_QUERY){ mrg_qpn *merge_node = new mrg_qpn(qs,Schema); @@ -159,7 +229,9 @@ printf("\n"); */ - } else{ + } + + if(qs->query_type == SELECT_QUERY){ // Select / Aggregation / Join if(qs->gb_tbl->size() == 0 && qs->aggr_tbl->size() == 0){ @@ -175,9 +247,15 @@ printf("\n"); plan_root = join_node; local_plan.push_back(join_node); }else{ - join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema); - plan_root = join_node; - local_plan.push_back(join_node); + if(qs->fta_tree->get_from()->get_properties() == WATCHLIST_JOIN_PROPERTY){ + watch_join_qpn *join_node = new watch_join_qpn(qs,Schema); + plan_root = join_node; + local_plan.push_back(join_node); + }else{ + join_eq_hash_qpn *join_node = new join_eq_hash_qpn(qs,Schema); + plan_root = join_node; + local_plan.push_back(join_node); + } } } }else{ @@ -2077,6 +2155,20 @@ string sgahcwcb_qpn::to_query_string(){ return(ret); } +string watch_tbl_qpn::to_query_string(){ + string ret; +// ret += "DEFINE {\n"; +// ret += "\tfilename='"+filename+";\n"; +// ret += "\trefresh_interval="+to_string(refresh_interval)+";\n}\n"; + ret += "WATCHLIST FIELDS {\n"; + std::vector fields = table_layout->get_fields(); + for(int f=0;fto_string()+"\n"; + } + ret += "}\n"; + + return ret; +} string mrg_qpn::to_query_string(){ @@ -2181,6 +2273,42 @@ string filter_join_qpn::to_query_string(){ return(ret); } +string watch_join_qpn::to_query_string(){ + + string ret = "Select "; + int s; + for(s=0;s0) ret+=", "; + ret += se_to_query_string(select_list[s]->se, NULL); + if(select_list[s]->name != "") ret += " AS "+select_list[s]->name; + } + ret += "\n"; + +// NOTE: assuming binary join. + ret += "WATCHLIST_JOIN "; + + ret += "From "; + int f; + for(f=0;f0) ret+=", "; + ret += from[f]->to_string(); + } + ret += "\n"; + + if(where.size() > 0){ + ret += "Where "; + int w; + for(w=0;w0) ret += " AND "; + ret += "(" + pred_to_query_str(where[w]->pr,NULL) + ")"; + } + ret += "\n"; + } + + return(ret); +} + + // ----------------------------------------------------------------- // Query node subclass specific processing. @@ -2390,6 +2518,18 @@ void mrg_qpn::resolve_slack(scalarexp_t *t_se, string fname, vector watch_tbl_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){ + // nothing to do, nothing to split, return copy of self. + + hfta_returned = 0; + + vector ret_vec; + + ret_vec.push_back(this); + return(ret_vec); + +} + vector mrg_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){ // nothing to do, nothing to split, return copy of self. @@ -2429,7 +2569,7 @@ vector filter_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, ta vector sel_names; vector > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx); if (ifaces.empty()) { - fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set\n"); + fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str()); exit(1); } @@ -2481,35 +2621,35 @@ vector filter_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, ta } for(p=0;ppr, NULL); + predicate_t *new_pr = dup_pr(shared_pred[p]->pr, NULL); cnf_elem *new_cnf = new cnf_elem(new_pr); analyze_cnf(new_cnf); fta_node->shared_pred.push_back(new_cnf); fta_node->where.push_back(new_cnf); } for(p=0;ppr, NULL); + predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL); cnf_elem *new_cnf = new cnf_elem(new_pr); analyze_cnf(new_cnf); fta_node->pred_t0.push_back(new_cnf); fta_node->where.push_back(new_cnf); } for(p=0;ppr, NULL); + predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL); cnf_elem *new_cnf = new cnf_elem(new_pr); analyze_cnf(new_cnf); fta_node->pred_t1.push_back(new_cnf); fta_node->where.push_back(new_cnf); } for(p=0;ppr, NULL); + predicate_t *new_pr = dup_pr(hash_eq[p]->pr, NULL); cnf_elem *new_cnf = new cnf_elem(new_pr); analyze_cnf(new_cnf); fta_node->hash_eq.push_back(new_cnf); fta_node->where.push_back(new_cnf); } for(p=0;ppr, NULL); + predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL); cnf_elem *new_cnf = new cnf_elem(new_pr); analyze_cnf(new_cnf); fta_node->postfilter.push_back(new_cnf); @@ -2543,6 +2683,155 @@ vector filter_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, ta } + + + + +vector watch_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){ + vector ret_vec; + +// First check if the query can be pushed to the FTA. + bool fta_ok = true; + int s; + for(s=0;sse,NULL, Ext_fcns); + } + int p; + for(p=0;ppr,NULL, Ext_fcns); + } + + if(!fta_ok){ + fprintf(stderr,"ERROR, watchlist join %s is fta-unsafe.\n",node_name.c_str()); + exit(1); + } + +// Can it be done in a single lfta? +// Get the set of interfaces it accesses. + int ierr; + int si; + vector sel_names; + vector > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx); + if (ifaces.empty()) { + fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str()); + exit(1); + } + + if(ifaces.size() == 1){ +// Single interface, no need to merge. + hfta_returned = 0; + ret_vec.push_back(this); + +// Treat the range vars a bit differently, the 2nd is reading from a _local_ watchlist. + from[0]->set_machine(ifaces[0].first); + from[0]->set_interface(ifaces[0].second); + from[0]->set_ifq(false); + + from[1]->set_machine(ifaces[0].first); + from[1]->set_interface("_local_"); + from[1]->set_ifq(false); + + return(ret_vec); + }else{ +// Multiple interfaces, generate the interface-specific queries plus +// the merge. + hfta_returned = 1; + + vector sel_names; + for(si=0;siset_node_name( node_name ); + else{ + string new_name = "_"+node_name+"_"+ifaces[si].first+"_"+ifaces[si].second; + untaboo(new_name); + fta_node->set_node_name(new_name); + } + sel_names.push_back(fta_node->get_node_name()); + +// Assign the table + int f; + for(f=0;ffrom.push_back(from[f]->duplicate()); + fta_node->from[f]->set_machine(ifaces[si].first); + if(f==0) + fta_node->from[f]->set_interface(ifaces[si].second); + else + fta_node->from[f]->set_interface("_local_"); + fta_node->from[f]->set_ifq(false); + } + + for(s=0;sselect_list.push_back( dup_select(select_list[s], NULL) ); + } + + for(p=0;ppr, NULL); + cnf_elem *new_cnf = new cnf_elem(new_pr); + analyze_cnf(new_cnf); + fta_node->pred_t0.push_back(new_cnf); + fta_node->where.push_back(new_cnf); + } + for(p=0;ppr, NULL); + cnf_elem *new_cnf = new cnf_elem(new_pr); + analyze_cnf(new_cnf); + fta_node->pred_t1.push_back(new_cnf); + fta_node->where.push_back(new_cnf); + } + for(p=0;ppr, NULL); + cnf_elem *new_cnf = new cnf_elem(new_pr); + analyze_cnf(new_cnf); + fta_node->hash_eq[k] = new_cnf; + fta_node->where.push_back(new_cnf); + } + for(p=0;ppr, NULL); + cnf_elem *new_cnf = new cnf_elem(new_pr); + analyze_cnf(new_cnf); + fta_node->postfilter.push_back(new_cnf); + fta_node->where.push_back(new_cnf); + } + for(p=0;ppr, NULL); + cnf_elem *new_cnf = new cnf_elem(new_pr); + analyze_cnf(new_cnf); + fta_node->postfilter.push_back(new_cnf); + fta_node->where.push_back(new_cnf); + } + fta_node->key_flds = key_flds; + +// Xfer all of the parameters. +// Use existing handle annotations. + vector param_names = param_tbl->get_param_names(); + int pi; + for(pi=0;piget_data_type(param_names[pi]); + fta_node->param_tbl->add_param(param_names[pi],dt->duplicate(), + param_tbl->handle_access(param_names[pi])); + } + fta_node->definitions = definitions; + if(fta_node->resolve_if_params(ifdb, this->err_str)){ + this->error_code = 3; + return ret_vec; + } + + ret_vec.push_back(fta_node); + } + + mrg_qpn *mrg_node = new mrg_qpn((watch_join_qpn *)ret_vec[0], + node_name, sel_names,ifaces, ifdb); + ret_vec.push_back(mrg_node); + + return(ret_vec); + } + +} + // Use to search for unresolved interface param refs in an hfta. int spx_qpn::count_ifp_refs(set &ifpnames){ @@ -2606,6 +2895,10 @@ int rsgah_qpn::count_ifp_refs(set &ifpnames){ return ret; } +int watch_tbl_qpn::count_ifp_refs(set &ifpnames){ + return 0; +} + int mrg_qpn::count_ifp_refs(set &ifpnames){ return 0; } @@ -2638,6 +2931,17 @@ int filter_join_qpn::count_ifp_refs(set &ifpnames){ return ret; } +int watch_join_qpn::count_ifp_refs(set &ifpnames){ + int ret = 0; + int i; + for(i=0;ise,ifpnames); + for(i=0;ipr,ifpnames); + return ret; +} + + // Resolve interface params to string literals int filter_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){ @@ -2654,6 +2958,20 @@ int filter_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){ return ret; } +int watch_join_qpn::resolve_if_params( ifq_t *ifdb, string &err){ + int ret = 0; + int i; + string ifname = from[0]->get_interface(); + string ifmach = from[0]->get_machine(); + for(i=0;ise,ifmach, ifname, ifdb, err) ) + ret = 1; + for(i=0;ipr,ifmach, ifname, ifdb, err)) + ret = 1; + return ret; +} + int spx_qpn::resolve_if_params( ifq_t *ifdb, string &err){ int ret = 0; @@ -2784,7 +3102,7 @@ vector spx_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list vector sel_names; vector > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx); if (ifaces.empty()) { - fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set\n"); + fprintf(stderr,"INTERNAL ERROR in spx_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str()); exit(1); } @@ -3138,7 +3456,7 @@ vector rsgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_li vector sel_names; vector > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx); if (ifaces.empty()) { - fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set\n"); + fprintf(stderr,"INTERNAL ERROR in rsgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str()); exit(1); } @@ -3556,7 +3874,7 @@ vector rsgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_li new_opl.push_back(new_se); } } - stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),false, false,aggr_tbl.has_bailout(a)); + stream_node->aggr_tbl.add_aggr(aggr_tbl.get_op(a), aggr_tbl.get_fcn_id(a), new_opl, aggr_tbl.get_storage_type(a),aggr_tbl.is_superaggr(a), aggr_tbl.is_running_aggr(a),aggr_tbl.has_bailout(a)); hse = new scalarexp_t(aggr_tbl.get_op(a).c_str(),new_opl); hse->set_data_type(Ext_fcns->get_fcn_dt(aggr_tbl.get_fcn_id(a))); hse->set_fcn_id(aggr_tbl.get_fcn_id(a)); @@ -3805,7 +4123,7 @@ vector sgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_lis vector sel_names; vector > ifaces = get_ifaces(table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx); if (ifaces.empty()) { - fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set\n"); + fprintf(stderr,"INTERNAL ERROR in sgah_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str()); exit(1); } @@ -4014,12 +4332,12 @@ vector sgah_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_lis stream_node->set_node_name( node_name ); stream_node->table_name->set_range_var(table_name->get_var_name()); -// allowed stream disorder. Default is 2, +// allowed stream disorder. Default is 1, // can override with max_lfta_disorder setting. // Also limit the hfta disorder, set to lfta disorder + 1. // can override with max_hfta_disorder. - fta_node->lfta_disorder = 2; + fta_node->lfta_disorder = 1; if(this->get_val_of_def("max_lfta_disorder") != ""){ int d = atoi(this->get_val_of_def("max_lfta_disorder").c_str() ); if(d<1){ @@ -4046,6 +4364,7 @@ printf("node %s setting lfta_disorder = %d\n",node_name.c_str(),fta_node->lfta_d } } + // First, process the group-by variables. // The fta must supply the values of all the gbvars. // If a gb is computed, the computation must be @@ -4596,6 +4915,7 @@ vector join_eq_hash_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, t child_qpn->table_name = new tablevar_t( from[f]->get_interface().c_str(), from[f]->get_schema_name().c_str(), from[f]->get_ifq()); child_qpn->table_name->set_range_var(from[f]->get_var_name()); + child_qpn->table_name->set_machine(from[f]->get_machine()); child_vec.push_back(child_qpn); select_vec.push_back(&(child_qpn->select_list)); @@ -4735,7 +5055,7 @@ vector join_eq_hash_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, t spx_qpn *c_node = child_vec[f]; vector > ifaces = get_ifaces(c_node->table_name, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx); if (ifaces.empty()) { - fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set\n"); + fprintf(stderr,"INTERNAL ERROR in join_eq_hash_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str()); exit(1); } @@ -5002,6 +5322,21 @@ vector filter_join_qpn::extract_opview(table_list *Schema, vecto return(ret); } +vector watch_join_qpn::extract_opview(table_list *Schema, vector &qnodes, opview_set &opviews, string rootnm, string silo_name){ + vector ret; + int retval = process_opview(from[0],0,node_name, + Schema,qnodes,opviews,ret, rootnm, silo_name); + if(retval) exit(1); + return(ret); +} + + + +vector watch_tbl_qpn::extract_opview(table_list *Schema, vector &qnodes, opview_set &opviews, string rootnm, string silo_name){ + vector ret; + return ret; // nothing to process +} + ////////////////////////////////////////////////////////////////// @@ -5017,6 +5352,10 @@ table_def *mrg_qpn::get_fields(){ return(table_layout); } +table_def *watch_tbl_qpn::get_fields(){ + return(table_layout); +} + table_def *spx_qpn::get_fields(){ return(create_attributes(node_name, select_list)); @@ -5038,6 +5377,9 @@ table_def *filter_join_qpn::get_fields(){ return(create_attributes(node_name, select_list)); } +table_def *watch_join_qpn::get_fields(){ + return(create_attributes(node_name, select_list)); +} table_def *join_eq_hash_qpn::get_fields(){ int i, h, s, t; @@ -5139,6 +5481,57 @@ table_def *join_eq_hash_qpn::get_fields(){ } +//----------------------------------------------------------------- +// get output "keys" +// This is a guess about the set of fields which are a key +// Use as metadata output, e.g. in qtree.xml + + + +// refs to GB attribtues are keys, if a SE is not a GB colref +// but refers to a GB colref (outside of an aggregation) +// then set partial_keys to true +vector sgah_qpn::get_tbl_keys(vector &partial_keys){ + vector keys; + + set gref_set; + for(int i=0; ise->is_gb()){ + keys.push_back(select_list[s]->name); + }else{ + if(contains_gb_se(select_list[s]->se, gref_set)){ + partial_keys.push_back(select_list[s]->name); + } + } + } + return keys; +} + +vector rsgah_qpn::get_tbl_keys(vector &partial_keys){ + vector keys; + + set gref_set; + for(int i=0; ise->is_gb()){ + keys.push_back(select_list[s]->name); + }else{ + if(contains_gb_se(select_list[s]->se, gref_set)){ + partial_keys.push_back(select_list[s]->name); + } + } + } + return keys; +} + + + + //----------------------------------------------------------------- // get output tables @@ -5151,6 +5544,11 @@ table_def *join_eq_hash_qpn::get_fields(){ return(fm); } + vector watch_tbl_qpn::get_input_tbls(){ + vector ret; + return(ret); + } + vector mrg_qpn::get_input_tbls(){ return(fm); } @@ -5183,6 +5581,10 @@ table_def *join_eq_hash_qpn::get_fields(){ return(from); } + vector watch_join_qpn::get_input_tbls(){ + return(from); + } + //----------------------------------------------------------------- // get output tables @@ -5194,6 +5596,11 @@ table_def *join_eq_hash_qpn::get_fields(){ return(retval); } + vector watch_tbl_qpn::get_output_tbls(){ + vector retval(1,new tablevar_t(node_name.c_str())); + return(retval); + } + vector mrg_qpn::get_output_tbls(){ vector retval(1,new tablevar_t(node_name.c_str())); return(retval); @@ -5230,6 +5637,12 @@ table_def *join_eq_hash_qpn::get_fields(){ } + vector watch_join_qpn::get_output_tbls(){ + vector retval(1,new tablevar_t(node_name.c_str())); + return(retval); + } + + //----------------------------------------------------------------- // Bind to schema @@ -5327,6 +5740,30 @@ col_id_set filter_join_qpn::get_colrefs(bool ext_fcns_only,table_list *Schema){ return tmp_cset; } +col_id_set watch_join_qpn::get_colrefs(bool ext_fcns_only,table_list *Schema){ + col_id_set retval, tmp_cset; + int p; + for(p=0;ppr, tmp_cset, NULL); + } + int s; + for(s=0;sse, tmp_cset, NULL); + } + col_id_set::iterator cisi; + if(ext_fcns_only){ + for(cisi=tmp_cset.begin();cisi!=tmp_cset.end();++cisi){ + field_entry *fe = Schema->get_field((*cisi).schema_ref, (*cisi).field); + if(fe->get_unpack_fcns().size()>0) + retval.insert((*cisi)); + } + return retval; + } + + return tmp_cset; +} + + // Associate colrefs in SEs with this schema. @@ -5386,6 +5823,35 @@ void filter_join_qpn::bind_to_schema(table_list *Schema){ } +void watch_join_qpn::bind_to_schema(table_list *Schema){ +// Bind the tablevars in the From clause to the Schema +// (it might have changed from analysis time) + int f; + for(f=0;fget_schema_name(); + int tbl_ref = Schema->get_table_ref(snm); + if(tbl_ref >= 0) + from[f]->set_schema_ref(tbl_ref); + } + +// Bind all SEs to this schema + tablevar_list_t fm(from); + + int p; + for(p=0;ppr, &fm, Schema); + } + int s; + for(s=0;sse, &fm, Schema); + } + +// Collect set of tuples referenced in this HFTA +// input, internal, or output. + +} + + @@ -5573,6 +6039,10 @@ void sgahcwcb_qpn::bind_to_schema(table_list *Schema){ //----------------------------------------------------------------- // get_cplx_lit_tbl +cplx_lit_table *watch_tbl_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ + return(new cplx_lit_table()); +} + cplx_lit_table *mrg_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ return(new cplx_lit_table()); } @@ -5692,7 +6162,21 @@ cplx_lit_table *sgahcwcb_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ return(complex_literals); } -cplx_lit_table *join_eq_hash_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ +cplx_lit_table *join_eq_hash_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ + int i; + cplx_lit_table *complex_literals = new cplx_lit_table(); + + for(i=0;ise, Ext_fcns, complex_literals); + } + for(i=0;ipr,Ext_fcns, complex_literals); + } + + return(complex_literals); +} + +cplx_lit_table *filter_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ int i; cplx_lit_table *complex_literals = new cplx_lit_table(); @@ -5706,7 +6190,7 @@ cplx_lit_table *join_eq_hash_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ return(complex_literals); } -cplx_lit_table *filter_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ +cplx_lit_table *watch_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ int i; cplx_lit_table *complex_literals = new cplx_lit_table(); @@ -5723,9 +6207,15 @@ cplx_lit_table *filter_join_qpn::get_cplx_lit_tbl(ext_fcn_list *Ext_fcns){ + //----------------------------------------------------------------- // get_handle_param_tbl +vector watch_tbl_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){ + vector retval; + return(retval); +} + vector mrg_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){ vector retval; return(retval); @@ -5877,6 +6367,22 @@ vector filter_join_qpn::get_handle_param_tbl(ext_fcn_l return(retval); } +vector watch_join_qpn::get_handle_param_tbl(ext_fcn_list *Ext_fcns){ + int i; + vector retval; + + for(i=0;ise, Ext_fcns, retval); + } + for(i=0;ipr,Ext_fcns, retval); + } + + return(retval); +} + + + /////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////// /// Functions for operator output rates estimations @@ -5909,6 +6415,12 @@ double sgahcwcb_qpn::get_rate_estimate() { return SGAHCWCB_SELECTIVITY * DEFAULT_INTERFACE_RATE; } +double watch_tbl_qpn::get_rate_estimate() { + + // dummy method for now + return DEFAULT_INTERFACE_RATE; +} + double mrg_qpn::get_rate_estimate() { // dummy method for now @@ -6312,7 +6824,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){ ret.append("( "); if(ldt->complex_comparison(ldt) ){ - ret.append( ldt->get_hfta_comparison_fcn(ldt) ); + ret.append( ldt->get_hfta_equals_fcn(ldt) ); ret.append("( "); if(ldt->is_buffer_type() ) ret.append("&"); @@ -6344,6 +6856,7 @@ static string generate_predicate_code(predicate_t *pr,table_list *schema){ ret.append("( "); if(ldt->complex_comparison(rdt) ){ +// TODO can use get_hfta_equals_fcn if op is "=" ? ret.append(ldt->get_hfta_comparison_fcn(rdt)); ret.append("("); if(ldt->is_buffer_type() ) @@ -6413,7 +6926,7 @@ static string generate_predicate_code_fm_aggr(predicate_t *pr, string gbvar, str ret.append("( "); if(ldt->complex_comparison(ldt) ){ - ret.append( ldt->get_hfta_comparison_fcn(ldt) ); + ret.append( ldt->get_hfta_equals_fcn(ldt) ); ret.append("( "); if(ldt->is_buffer_type() ) ret.append("&"); @@ -6445,6 +6958,7 @@ static string generate_predicate_code_fm_aggr(predicate_t *pr, string gbvar, str ret.append("( "); if(ldt->complex_comparison(rdt) ){ +// TODO can use get_hfta_equals_fcn if op is "=" ? ret.append(ldt->get_hfta_comparison_fcn(rdt)); ret.append("("); if(ldt->is_buffer_type() ) @@ -6503,7 +7017,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type * string ret; if(dt->complex_comparison(dt) ){ - ret.append(dt->get_hfta_comparison_fcn(dt)); + ret.append(dt->get_hfta_equals_fcn(dt)); ret.append("("); if(dt->is_buffer_type() ) ret.append("&"); @@ -6522,7 +7036,7 @@ static string generate_equality_test(string &lhs_op, string &rhs_op, data_type * return(ret); } -static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ +static string generate_lt_test(string &lhs_op, string &rhs_op, data_type *dt){ string ret; if(dt->complex_comparison(dt) ){ @@ -6535,16 +7049,39 @@ static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt) if(dt->is_buffer_type() ) ret.append("&"); ret.append(rhs_op ); - ret.append(") == 0"); + ret.append(") == 1"); }else{ ret.append(lhs_op ); - ret.append(" == "); + ret.append(" < "); ret.append(rhs_op ); } return(ret); } +//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){ +// string ret; +// +// if(dt->complex_comparison(dt) ){ +// ret.append(dt->get_hfta_equals_fcn(dt)); +// ret.append("("); +// if(dt->is_buffer_type() ) +// ret.append("&"); +// ret.append(lhs_op); +// ret.append(", "); +// if(dt->is_buffer_type() ) +// ret.append("&"); +// ret.append(rhs_op ); +// ret.append(") == 0"); +// }else{ +// ret.append(lhs_op ); +// ret.append(" == "); +// ret.append(rhs_op ); +// } +// +// return(ret); +//} + // Here I assume that only MIN and MAX aggregates can be computed // over BUFFER data types. @@ -7319,9 +7856,9 @@ static string gen_unpack_cids(table_list *schema, col_id_set &new_cids, string o unpack_fcn = dt.get_hfta_unpack_fcn_noxf(); } if(dt.is_buffer_type()){ - sprintf(tmpstr,"\t unpack_var_%s_%d = %s(tup%d.data, tup%d.tuple_size, unpack_offset_%s_%d, &problem);\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, tblref, field.c_str(), tblref); + sprintf(tmpstr,"\tunpack_var_%s_%d = %s(tup%d.data, tup%d.tuple_size, unpack_offset_%s_%d, &problem); // unpack_cid\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, tblref, field.c_str(), tblref); }else{ - sprintf(tmpstr,"\t unpack_var_%s_%d = %s_nocheck(tup%d.data, unpack_offset_%s_%d);\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, field.c_str(), tblref); + sprintf(tmpstr,"\tunpack_var_%s_%d = %s_nocheck(tup%d.data, unpack_offset_%s_%d); // unpack_cid\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, field.c_str(), tblref); } ret += tmpstr; if(dt.is_buffer_type()){ @@ -8347,6 +8884,18 @@ string sgah_qpn::generate_functor_name(){ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector &needs_xform){ int a,g,w,s; +// Regular or slow flush? + hfta_slow_flush = 0; + if(this->get_val_of_def("hfta_slow_flush") != ""){ + int d = atoi(this->get_val_of_def("hfta_slow_flush").c_str() ); + if(d<0){ + fprintf(stderr,"Warning, hfta_slow_flush in node %s is %d, must be at least 0, setting to 0.\n",node_name.c_str(), d); + hfta_slow_flush = 0; + }else{ + hfta_slow_flush = d; + } + } + // Initialize generate utility globals segen_gb_tbl = &(gb_tbl); @@ -8368,36 +8917,57 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n"; } // empty strucutred literals - map::iterator sii; - for(sii=structured_types.begin();sii!=structured_types.end();++sii){ - data_type dt(sii->second); - literal_t empty_lit(sii->first); - ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n"; - } +// map::iterator sii; +// for(sii=structured_types.begin();sii!=structured_types.end();++sii){ +// data_type dt(sii->second); +// literal_t empty_lit(sii->first); +// ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n"; +// } // Constructors if(structured_types.size()==0){ ret += "\t"+generate_functor_name() + "_groupdef(){};\n"; }else{ - ret += "\t"+generate_functor_name() + "_groupdef(){}\n"; + ret += "\t"+generate_functor_name() + "_groupdef(){\n"; + for(g=0;gis_buffer_type()){ + sprintf(tmpstr,"\t\t%s(&gb_var%d);\n", + gdt->get_hfta_buffer_init().c_str(), g ); + ret += tmpstr; + } + } + ret += "\t};\n"; } + ret += "\t// shallow copy constructors\n"; ret += "\t"+generate_functor_name() + "_groupdef("+ - this->generate_functor_name() + "_groupdef *gd){\n"; + "const " + this->generate_functor_name() + "_groupdef &gd){\n"; for(g=0;gis_buffer_type()){ - sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", - gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); - ret += tmpstr; - }else{ - sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g); - ret += tmpstr; - } + sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g); + ret += tmpstr; +// TODO : do strings perisist after the call? its a shllow copy +// if(gdt->is_buffer_type()){ +// sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", +// gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); +// ret += tmpstr; +// }else{ +// sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g); +// ret += tmpstr; +// } } ret += "\t}\n"; ret += "\t"+generate_functor_name() + "_groupdef("+ - this->generate_functor_name() + "_groupdef *gd, bool *pattern){\n"; + "const " + this->generate_functor_name() + "_groupdef &gd, bool *pattern){\n"; +// -- For patterns, need empty strucutred literals + map::iterator sii; + for(sii=structured_types.begin();sii!=structured_types.end();++sii){ + data_type dt(sii->second); + literal_t empty_lit(sii->first); + ret += "\t"+dt.make_host_cvar(empty_lit.hfta_empty_literal_name())+";\n"; + } + for(sii=structured_types.begin();sii!=structured_types.end();++sii){ literal_t empty_lit(sii->first); ret += "\t\t"+empty_lit.to_hfta_C_code("&"+empty_lit.hfta_empty_literal_name())+";\n"; @@ -8405,14 +8975,17 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve for(g=0;gis_buffer_type()){ - sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", - gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); - ret += tmpstr; - }else{ - sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g); - ret += tmpstr; - } + sprintf(tmpstr,"\t\t\tgb_var%d = gd.gb_var%d;\n",g,g); + ret += tmpstr; +// TODO Do strings persist long enough? its a shllow copy constructor? +// if(gdt->is_buffer_type()){ +// sprintf(tmpstr,"\t\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", +// gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); +// ret += tmpstr; +// }else{ +// sprintf(tmpstr,"\t\t\tgb_var%d = gd->gb_var%d;\n",g,g); +// ret += tmpstr; +// } ret += "\t\t}else{\n"; literal_t empty_lit(gdt->type_indicator()); if(empty_lit.is_cpx_lit()){ @@ -8423,6 +8996,23 @@ string sgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, ve ret += "\t\t}\n"; } ret += "\t};\n"; + + ret += "\t// deep assignment operator\n"; + ret += "\t"+generate_functor_name() + "_groupdef& operator=(const "+ + this->generate_functor_name() + "_groupdef &gd){\n"; + for(g=0;gis_buffer_type()){ + sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n", + gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); + ret += tmpstr; + }else{ + sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g); + ret += tmpstr; + } + } + ret += "\t}\n"; + // destructor ret += "\t~"+ generate_functor_name() + "_groupdef(){\n"; for(g=0;gis_temporal()){ - sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr; - sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr; - if(first_one){first_one = false;} else {ret += ") && (";} - ret += generate_equality_test(lhs_op, rhs_op, gdt); + sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr; + sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr; + if(first_one){first_one = false;} else {ret += ") && (";} + ret += generate_lt_test(lhs_op, rhs_op, gdt); + disorder_test += generate_lt_test(rhs_op, lhs_op, gdt); } } ret += ") ) ){\n"; @@ -8830,9 +9431,17 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; } } ret += "\t\tneeds_temporal_flush=true;\n"; - ret += "\t\t}else{\n" - "\t\t\tneeds_temporal_flush=false;\n" - "\t\t}\n"; + ret += "\t}else{\n" + "\t\tneeds_temporal_flush=false;\n" + "\t}\n"; + + ret += "\tdisordered_arrival = "+disorder_test+";\n"; +// ret += "\tif( ( ("+disorder_test+") ) ){\n"; +// ret += "\t\tdisordered_arrival=true;\n"; +// ret += "\t}else{\n"; +// ret += "\t\tdisordered_arrival=false;\n"; +// ret += "\t}\n"; + } }else{ ret+= "\tif(temp_tuple_received && !( ("; @@ -8876,7 +9485,10 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; // For temporal status tuple we don't need to do anything else - ret += "\tif (temp_tuple_received) return NULL;\n\n"; + ret += "\tif (temp_tuple_received){\n"; + ret += "\t\tdisordered_arrival = false;\n"; + ret += "\t\treturn NULL;\n\n"; + ret += "\t}\n"; for(w=0;waggr_var%d",a); + sprintf(tmpstr,"aggval.aggr_var%d",a); string varname = tmpstr; ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema)); } @@ -9012,6 +9624,8 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; } ret += "\t};\n"; + ret += "bool disordered(){return disordered_arrival;}\n"; + //--------------------------------------------------- // create output tuple // Unpack the partial functions ref'd in the where clause, @@ -9022,15 +9636,15 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; // so I'll leave it in longhand. ret += "host_tuple create_output_tuple(" - +generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval, bool &failed){\n"; + +generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval, bool &failed){\n"; ret += "\thost_tuple tup;\n"; ret += "\tfailed = false;\n"; ret += "\tgs_retval_t retval = 0;\n"; - string gbvar = "gbval->gb_var"; - string aggvar = "aggval->"; + string gbvar = "gbval.gb_var"; + string aggvar = "aggval."; // Create cached temporaries for UDAF return values. for(a=0;a0) ret += "^"; data_type *gdt = gb_tbl.get_data_type(g); if(gdt->use_hashfunc()){ if(gdt->is_buffer_type()) - sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); + sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); else - sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); + sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); }else{ - sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g); + sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g); } ret += tmpstr; } @@ -9268,22 +9882,22 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; // The comparison function ret += "struct "+generate_functor_name()+"_equal_func{\n"; - ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+ - generate_functor_name()+"_groupdef *grp2) const{\n"; + ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+ + "const "+generate_functor_name()+"_groupdef &grp2) const{\n"; ret += "\t\treturn( ("; for(g=0;g0) ret += ") && ("; data_type *gdt = gb_tbl.get_data_type(g); if(gdt->complex_comparison(gdt)){ - if(gdt->is_buffer_type()) - sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); - else - sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + if(gdt->is_buffer_type()) + sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)", + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); + else + sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)", + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); }else{ - sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g); + sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g); } ret += tmpstr; } @@ -9298,14 +9912,17 @@ ret += "// hfta_disorder = "+int_to_string(hfta_disorder)+"\n"; string sgah_qpn::generate_operator(int i, string params){ if(hfta_disorder < 2){ + string op_name = "groupby_operator"; + if(hfta_slow_flush>0) + op_name = "groupby_slowflush_operator"; return( - " groupby_operator<" + + " "+op_name+"<" + generate_functor_name()+","+ generate_functor_name() + "_groupdef, " + generate_functor_name() + "_aggrdef, " + generate_functor_name()+"_hash_func, "+ generate_functor_name()+"_equal_func " - "> *op"+int_to_string(i)+" = new groupby_operator<"+ + "> *op"+int_to_string(i)+" = new "+op_name+"<"+ generate_functor_name()+","+ generate_functor_name() + "_groupdef, " + generate_functor_name() + "_aggrdef, " + @@ -9825,6 +10442,23 @@ string mrg_qpn::generate_operator(int i, string params){ ); } +//////////////////////////////////////////////// +/// WATCHLIST_TBL operator +/// WATCHLIST_TBL functor +//////////////////////////////////////////// + +string watch_tbl_qpn::generate_functor_name(){ + return("watch_tbl_functor_" + normalize_name(this->get_node_name())); +} + +string watch_tbl_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, vector &needs_xform){ + + return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED"); +} + +string watch_tbl_qpn::generate_operator(int i, string params){ + return("ERROR_WATCH_TBL_FUNCTOR_NOT_YET_IMPLEMENTED"); +} ///////////////////////////////////////////////////////// ////// JOIN_EQ_HASH functor @@ -10446,22 +11080,31 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_ // Deal with outer join stuff col_id_set l_cids, r_cids; + col_id_set l_base_cids, r_base_cids; // l_cids and r_cids get modified + // to account for extra_f fields to + // unpack for value imputation col_id_set::iterator ocsi; for(ocsi=local_cids.begin();ocsi!=local_cids.end();++ocsi){ - if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi)); - else r_cids.insert((*ocsi)); + if((*ocsi).tblvar_ref == 0){ + l_cids.insert((*ocsi)); l_base_cids.insert((*ocsi)); + }else{ + r_cids.insert((*ocsi)); r_base_cids.insert((*ocsi)); + } } for(ocsi=se_cids.begin();ocsi!=se_cids.end();++ocsi){ - if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi)); - else r_cids.insert((*ocsi)); + if((*ocsi).tblvar_ref == 0){ + l_cids.insert((*ocsi)); l_base_cids.insert((*ocsi)); + }else{ + r_cids.insert((*ocsi)); r_base_cids.insert((*ocsi)); + } } ret += "\t}else if(tup0.data){\n"; string unpack_null = ""; col_id_set extra_cids; - for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){ + for(ocsi=r_base_cids.begin();ocsi!=r_base_cids.end();++ocsi){ string field = (*ocsi).field; if(r_equiv.count(field)){ - unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n"; + unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+"; // r_equiv\n"; get_new_se_cids(r_equiv[field],l_cids,new_cids,NULL); }else{ int schref = (*ocsi).schema_ref; @@ -10473,23 +11116,25 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_ // NB : works for string type only // NNB: installed fix for ipv6, more of this should be pushed // into the literal_t code. - unpack_null+="\tunpack_var_"+field+"_1= "+empty_lit.hfta_empty_literal_name()+";\n"; + unpack_null+="\t\tunpack_var_"+field+"_1= "+empty_lit.hfta_empty_literal_name()+"; // empty\n"; }else{ - unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n"; + unpack_null+="\t\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+"; // empty\n"; } } } + ret += "// l_cids\n"; ret += gen_unpack_cids(schema, l_cids, "tup", needs_xform); + ret += "// extra_cids\n"; ret += gen_unpack_cids(schema, extra_cids, "tup", needs_xform); ret += unpack_null; ret += gen_unpack_partial_fcn(schema, partial_fcns, sl_pfcns, "tup"); ret+="\t}else{\n"; - unpack_null = ""; extra_cids.clear(); - for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){ + unpack_null = ""; extra_cids.clear(); new_cids.clear(); + for(ocsi=l_base_cids.begin();ocsi!=l_base_cids.end();++ocsi){ string field = (*ocsi).field; if(l_equiv.count(field)){ - unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n"; + unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+"; // l_equiv\n"; get_new_se_cids(l_equiv[field],r_cids,new_cids,NULL); }else{ int schref = (*ocsi).schema_ref; @@ -10501,13 +11146,15 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_ // NB : works for string type only // NNB: installed fix for ipv6, more of this should be pushed // into the literal_t code. - unpack_null+="\tunpack_var_"+field+"_0= "+empty_lit.hfta_empty_literal_name()+";\n"; + unpack_null+="\t\tunpack_var_"+field+"_0= "+empty_lit.hfta_empty_literal_name()+"; // empty\n"; }else{ - unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n"; + unpack_null+="\t\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+"; // empty\n"; } } } + ret += "// r_cids\n"; ret += gen_unpack_cids(schema, r_cids, "tup", needs_xform); + ret += "// extra_cids\n"; ret += gen_unpack_cids(schema, extra_cids, "tup", needs_xform); ret += unpack_null; ret += gen_unpack_partial_fcn(schema, partial_fcns, sl_pfcns, "tup"); @@ -10583,84 +11230,67 @@ string join_eq_hash_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_ // create a temp status tuple - ret += "int create_temp_status_tuple(const host_tuple &tup0, const host_tuple &tup1, host_tuple& result) {\n\n"; + ret += "int create_temp_status_tuple("+this->generate_functor_name()+"_tempeqdef *lts,"+this->generate_functor_name()+"_tempeqdef *rts, host_tuple& result) {\n\n"; ret += "\tgs_retval_t retval = 0;\n"; ret += "\tgs_int32_t problem = 0;\n"; - ret += "\tif(tup0.data){\n"; - -// Unpack all the temporal attributes references in select list - col_id_set found_cids; - - for(s=0;sse->get_data_type()->is_temporal()) { -// Find the set of attributes accessed in this SE - col_id_set new_cids; - get_new_se_cids(select_list[s]->se,found_cids, new_cids, NULL); - } + for(p=0;pmake_host_cvar(tmpstr)+";\n"; + sprintf(tmpstr,"rhs_var"); + ret+="\t"+temporal_dt[p]->make_host_cvar(tmpstr)+";\n"; } - // Deal with outer join stuff - l_cids.clear(), r_cids.clear(); - for(ocsi=found_cids.begin();ocsi!=found_cids.end();++ocsi){ - if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi)); - else r_cids.insert((*ocsi)); + ret += "\tif(lts!=NULL){\n"; + for(p=0;pget_type_name(schref,field)); - literal_t empty_lit(dt.type_indicator()); - if(empty_lit.is_cpx_lit()){ - sprintf(tmpstr,"&(unpack_var_%s_1)",field.c_str()); - unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n"; - }else{ - unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n"; - } - } + ret += "\t}else{\n"; + for(p=0;pget_type_name(schref,field)); - literal_t empty_lit(dt.type_indicator()); - if(empty_lit.is_cpx_lit()){ - sprintf(tmpstr,"&(unpack_var_%s_0)",field.c_str()); - unpack_null += "\t"+empty_lit.to_hfta_C_code(tmpstr)+";\n"; - }else{ - unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n"; - } - } + ret += "\tif(rts!=NULL){\n"; + for(p=0;pget_node_name()); // Start packing. - ret += "//\t\tPack the fields into the tuple.\n"; - ret += gen_pack_tuple(schema,select_list,this->get_node_name(), true ); + + +// This is checked in the query analyzer so I think its safe, +// But a lot of older code has complex code to propagate multiple +// timestamps + for(s=0;sse; + data_type *sdt = se->get_data_type(); + if(sdt->is_temporal()){ + string target = "\ttuple->tuple_var"+to_string(s)+" = "; + if(from[0]->get_property()==0 && from[1]->get_property()==0){ // INNER + ret += target+"(lhs_var>rhs_var ? lhs_var : rhs_var); // INNER\n"; + } + if(from[0]->get_property()!=0 && from[1]->get_property()==0){ // LEFT + ret += target+"lhs_var; // LEFT\n"; +// ret += target+"rhs_var; // LEFT\n"; + } + if(from[0]->get_property()==0 && from[1]->get_property()!=0){ // RIGHT + ret += target+"rhs_var; // RIGHT\n"; +// ret += target+"lhs_var; // RIGHT\n"; + } + if(from[0]->get_property()!=0 && from[1]->get_property()!=0){ // OUTER + ret += target+"(lhs_varcomplex_comparison(hashkey_dt[p])){ if(hashkey_dt[p]->is_buffer_type()) sprintf(tmpstr,"(%s(&(key1->hashkey_var%d), &(key2->hashkey_var%d))==0)", - hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p); + hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p); else sprintf(tmpstr,"(%s((key1->hashkey_var%d), (key2->hashkey_var%d))==0)", - hashkey_dt[p]->get_hfta_comparison_fcn(hashkey_dt[p]).c_str(),p,p); + hashkey_dt[p]->get_hfta_equals_fcn(hashkey_dt[p]).c_str(),p,p); }else{ sprintf(tmpstr,"key1->hashkey_var%d == key2->hashkey_var%d",p,p); } @@ -12057,10 +12687,10 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; if(gdt->complex_comparison(gdt)){ if(gdt->is_buffer_type()) sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); else sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); }else{ sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g); } @@ -12087,10 +12717,10 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; if(gdt->complex_comparison(gdt)){ if(gdt->is_buffer_type()) sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); else sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); }else{ sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g); } @@ -12161,21 +12791,44 @@ string rsgah_qpn::generate_functor(table_list *schema, ext_fcn_list *Ext_fcns, v ret+="\t"+this->gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n"; } // Constructors - ret += "\t"+generate_functor_name() + "_groupdef(){};\n"; - ret += "\t"+generate_functor_name() + "_groupdef("+ - this->generate_functor_name() + "_groupdef *gd){\n"; + + ret += "\t"+generate_functor_name() + "_groupdef(){\n"; for(g=0;gis_buffer_type()){ - sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd->gb_var%d));\n", - gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); - ret += tmpstr; - }else{ - sprintf(tmpstr,"\t\tgb_var%d = gd->gb_var%d;\n",g,g); + sprintf(tmpstr,"\t\t%s(&gb_var%d);\n", + gdt->get_hfta_buffer_init().c_str(), g ); ret += tmpstr; } } ret += "\t};\n"; + + ret += "\t// shallow copy constructor\n"; + ret += "\t"+generate_functor_name() + "_groupdef("+ + this->generate_functor_name() + "_groupdef &gd){\n"; + for(g=0;ggenerate_functor_name() + "_groupdef &gd){\n"; + for(g=0;gis_buffer_type()){ + sprintf(tmpstr,"\t\t%s(&gb_var%d, &(gd.gb_var%d));\n", + gdt->get_hfta_buffer_assign_copy().c_str(),g,g ); + ret += tmpstr; + }else{ + sprintf(tmpstr,"\t\tgb_var%d = gd.gb_var%d;\n",g,g); + ret += tmpstr; + } + } + ret += "\t}\n"; + // destructor ret += "\t~"+ generate_functor_name() + "_groupdef(){\n"; for(g=0;gis_temporal()){ - sprintf(tmpstr,"last_gb%d",g); + sprintf(tmpstr,"curr_gb%d",g); ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n"; - sprintf(tmpstr,"last_flushed_gb%d",g); + sprintf(tmpstr,"last_gb%d",g); ret+="\t"+gb_tbl.get_data_type(g)->make_host_cvar(tmpstr)+";\n"; } } - ret += "\tbool needs_temporal_flush;\n"; + ret += "\tgs_int32_t needs_temporal_flush;\n"; + ret += "\tbool disordered_arrival;\n"; } // The publicly exposed functions @@ -12386,11 +13040,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; data_type *gdt = gb_tbl.get_data_type(g); if(gdt->is_temporal()){ literal_t gl(gdt->type_indicator()); + sprintf(tmpstr,"\tcurr_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str()); + ret.append(tmpstr); sprintf(tmpstr,"\tlast_gb%d = %s;\n",g, gl.to_hfta_C_code("").c_str()); ret.append(tmpstr); } } - ret += "\tneeds_temporal_flush = false;\n"; + ret += "\tneeds_temporal_flush = 0;\n"; } // Init temporal attributes referenced in select list @@ -12504,36 +13160,54 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // set flush indicator and update stored GB vars if there is any change. if(uses_temporal_flush){ - ret+= "\tif( !( ("; + ret+= "\tif( ( ("; bool first_one = true; + string disorder_test; for(g=0;gis_temporal()){ - sprintf(tmpstr,"last_gb%d",g); string lhs_op = tmpstr; + sprintf(tmpstr,"curr_gb%d",g); string lhs_op = tmpstr; sprintf(tmpstr,"gbval->gb_var%d",g); string rhs_op = tmpstr; if(first_one){first_one = false;} else {ret += ") && (";} - ret += generate_equality_test(lhs_op, rhs_op, gdt); + ret += generate_lt_test(lhs_op, rhs_op, gdt); + disorder_test += generate_lt_test(rhs_op, lhs_op, gdt); } } ret += ") ) ){\n"; + int temporal_gb=-1; for(g=0;gis_temporal()){ if(gdt->is_buffer_type()){ - sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g); + sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&curr_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g); }else{ - sprintf(tmpstr,"\t\tlast_flushed_gb%d = last_gb%d;\n",g,g); - ret += tmpstr; - sprintf(tmpstr,"\t\tlast_gb%d = gbval->gb_var%d;\n",g,g); +// sprintf(tmpstr,"\t\tlast_gb%d = curr_gb%d;\n",g,g); +// ret += tmpstr; +// sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g); + + ret += "\t\tif(curr_gb"+to_string(g)+"==0){\n"; + ret += "\t\t\tlast_gb"+to_string(g)+" = gbval->gb_var"+to_string(g)+";\n"; + ret += "\t\t}else{\n"; + ret += "\t\t\tlast_gb"+to_string(g)+" = curr_gb"+to_string(g)+";\n"; + ret += "\t\t}\n"; + sprintf(tmpstr,"\t\tcurr_gb%d = gbval->gb_var%d;\n",g,g); + temporal_gb=g; } ret += tmpstr; } } - ret += "\t\tneeds_temporal_flush=true;\n"; - ret += "\t\t}else{\n" - "\t\t\tneeds_temporal_flush=false;\n" - "\t\t}\n"; + ret += "\t\tneeds_temporal_flush = curr_gb"+to_string (temporal_gb)+" - last_gb"+to_string(temporal_gb)+";\n"; + ret += "\t}else{\n" + "\t\tneeds_temporal_flush=0;\n" + "\t}\n"; + + ret += "\tdisordered_arrival = "+disorder_test+";\n"; +// ret += "\tif( ( ("+disorder_test+") ) ){\n"; +// ret += "\t\tdisordered_arrival=true;\n"; +// ret += "\t}else{\n"; +// ret += "\t\tdisordered_arrival=false;\n"; +// ret += "\t}\n"; } @@ -12642,8 +13316,8 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // update an aggregate object ret += "void update_aggregate(host_tuple &tup0, " - +generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval){\n"; + +generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval){\n"; // Variables for execution of the function. ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure @@ -12654,7 +13328,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // Unpack all remaining attributes ret += gen_remaining_colrefs(schema, cid_set, found_cids, "", needs_xform); for(a=0;aaggr_var%d",a); + sprintf(tmpstr,"aggval.aggr_var%d",a); string varname = tmpstr; ret.append(generate_aggr_update(varname,&aggr_tbl,a, schema)); } @@ -12666,29 +13340,31 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // reinitialize an aggregate object ret += "void reinit_aggregates( "+ - generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval){\n"; + generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval){\n"; // Variables for execution of the function. ret += "\tgs_int32_t problem = 0;\n"; // return unpack failure // use of temporaries depends on the aggregate, // generate them in generate_aggr_update + int temporal_gb; // track the # of the temporal gb for(g=0;gis_temporal()){ if(gdt->is_buffer_type()){ - sprintf(tmpstr,"\t\t%s(&(gbval->gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g); + sprintf(tmpstr,"\t\t%s(&(gbval.gb_var%d),&last_gb%d);\n",gdt->get_hfta_buffer_replace().c_str(),g,g); }else{ - sprintf(tmpstr,"\t\t gbval->gb_var%d =last_gb%d;\n",g,g); + sprintf(tmpstr,"\t\t gbval.gb_var%d =last_gb%d;\n",g,g); } ret += tmpstr; + temporal_gb = g; } } // Unpack all remaining attributes for(a=0;aaggr_var%d",a); + sprintf(tmpstr,"aggval.aggr_var%d",a); string varname = tmpstr; ret.append(generate_aggr_reinitialize(varname,&aggr_tbl,a, schema)); } @@ -12703,13 +13379,24 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; //--------------------------------------------------- // Flush test - ret += "\tbool flush_needed(){\n"; + ret += "gs_int32_t flush_needed(){\n"; if(uses_temporal_flush){ - ret += "\t\treturn needs_temporal_flush;\n"; + ret += "\treturn needs_temporal_flush;\n"; }else{ - ret += "\t\treturn false;\n"; + ret += "\treturn false;\n"; } - ret += "\t};\n"; + ret += "};\n"; + + ret += "bool disordered(){return disordered_arrival;}\n"; + +//------------------------------------------------ +// time bucket management + ret += "void advance_last_tb(){\n"; + ret += "\tlast_gb"+to_string(temporal_gb)+"++;\n"; + ret += "}\n\n"; + ret += "void reset_last_tb(){\n"; + ret += "\tlast_gb"+to_string(temporal_gb)+" = curr_gb"+to_string(temporal_gb)+";\n"; + ret += "}\n\n"; //--------------------------------------------------- // create output tuple @@ -12721,15 +13408,15 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // so I'll leave it in longhand. ret += "host_tuple create_output_tuple(" - +generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval, bool &failed){\n"; + +generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval, bool &failed){\n"; ret += "\thost_tuple tup;\n"; ret += "\tfailed = false;\n"; ret += "\tgs_retval_t retval = 0;\n"; - string gbvar = "gbval->gb_var"; - string aggvar = "aggval->"; + string gbvar = "gbval.gb_var"; + string aggvar = "aggval."; // First, get the return values from the UDAFS @@ -12873,14 +13560,14 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // been unpacked. delete the string udaf return values at the end. ret += "bool cleaning_when(" - +generate_functor_name()+"_groupdef *gbval, "+ - generate_functor_name()+"_aggrdef *aggval){\n"; + +generate_functor_name()+"_groupdef &gbval, "+ + generate_functor_name()+"_aggrdef &aggval){\n"; ret += "\tbool retval = true;\n"; - gbvar = "gbval->gb_var"; - aggvar = "aggval->"; + gbvar = "gbval.gb_var"; + aggvar = "aggval."; set clw_pfcns; @@ -12942,7 +13629,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; if(sdt->is_temporal()){ sprintf(tmpstr,"\ttuple->tuple_var%d = ",s); ret += tmpstr; - sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_flushed_gb", "", schema).c_str()); + sprintf(tmpstr,"(flush_finished) ? %s : %s ", generate_se_code(select_list[s]->se,schema).c_str(), generate_se_code_fm_aggr(select_list[s]->se,"last_gb", "", schema).c_str()); ret += tmpstr; ret += ";\n"; } @@ -12957,7 +13644,7 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; ret += "struct "+generate_functor_name()+"_hash_func{\n"; ret += "\tgs_uint32_t operator()(const "+generate_functor_name()+ - "_groupdef *grp) const{\n"; + "_groupdef &grp) const{\n"; ret += "\t\treturn(0"; for(g=0;guse_hashfunc()){ if(gdt->is_buffer_type()) - sprintf(tmpstr,"(%s*%s(&(grp->gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); + sprintf(tmpstr,"(%s*%s(&(grp.gb_var%d)))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); else - sprintf(tmpstr,"(%s*%s(grp->gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); + sprintf(tmpstr,"(%s*%s(grp.gb_var%d))",hash_nums[g%NRANDS].c_str(),gdt->get_hfta_hashfunc().c_str(),g); }else{ - sprintf(tmpstr,"(%s*grp->gb_var%d)",hash_nums[g%NRANDS].c_str(),g); + sprintf(tmpstr,"(%s*grp.gb_var%d)",hash_nums[g%NRANDS].c_str(),g); } ret += tmpstr; } @@ -12982,8 +13669,8 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; // The comparison function ret += "struct "+generate_functor_name()+"_equal_func{\n"; - ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef *grp1, "+ - generate_functor_name()+"_groupdef *grp2) const{\n"; + ret += "\tbool operator()(const "+generate_functor_name()+"_groupdef &grp1, "+ + "const "+generate_functor_name()+"_groupdef &grp2) const{\n"; ret += "\t\treturn( ("; string hcmpr = ""; @@ -12994,13 +13681,13 @@ aggr_table_entry *ate = aggr_tbl.agr_tbl[a]; if(first_exec){first_exec=false;}else{ hcmpr += ") && (";} if(gdt->complex_comparison(gdt)){ if(gdt->is_buffer_type()) - sprintf(tmpstr,"(%s(&(grp1->gb_var%d), &(grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + sprintf(tmpstr,"(%s(&(grp1.gb_var%d), &(grp2.gb_var%d))==0)", + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); else - sprintf(tmpstr,"(%s((grp1->gb_var%d), (grp2->gb_var%d))==0)", - gdt->get_hfta_comparison_fcn(gdt).c_str(),g,g); + sprintf(tmpstr,"(%s((grp1.gb_var%d), (grp2.gb_var%d))==0)", + gdt->get_hfta_equals_fcn(gdt).c_str(),g,g); }else{ - sprintf(tmpstr,"grp1->gb_var%d == grp2->gb_var%d",g,g); + sprintf(tmpstr,"grp1.gb_var%d == grp2.gb_var%d",g,g); } hcmpr += tmpstr; } @@ -13470,6 +14157,29 @@ void filter_join_qpn::create_protocol_se(vector q_sources, table_list } } +void watch_join_qpn::create_protocol_se(vector q_sources, table_list *Schema){ + int i; + vector *> src_vec; + + for(i=0;iget_protocol_se()); + else + src_vec.push_back(NULL); + } + + for(i=0;iname] = resolve_protocol_se(select_list[i]->se,src_vec,NULL,Schema); + } + + for(i=0;ipr->get_left_se(),src_vec,NULL,Schema)); + hash_src_r.push_back(resolve_protocol_se(hash_eq[kfld]->pr->get_right_se(),src_vec,NULL,Schema)); + } +} + + void sgah_qpn::create_protocol_se(vector q_sources, table_list *Schema){ int i; vector *> src_vec; @@ -13558,7 +14268,7 @@ void mrg_qpn::create_protocol_se(vector q_sources, table_list *Schema for(s=1;s q_sources, table_list *Schema protocol_map[fld_nm] = NULL; } } + +void watch_tbl_qpn::create_protocol_se(vector q_sources, table_list *Schema){ + return; +} +