+
+
+
+
+vector<qp_node *> watch_join_qpn::split_node_for_fta(ext_fcn_list *Ext_fcns, table_list *Schema, int &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+ vector<qp_node *> ret_vec;
+
+// First check if the query can be pushed to the FTA.
+ bool fta_ok = true;
+ int s;
+ for(s=0;s<select_list.size();s++){
+ fta_ok &= check_fta_forbidden_se(select_list[s]->se,NULL, Ext_fcns);
+ }
+ int p;
+ for(p=0;p<where.size();p++){
+ fta_ok &= check_fta_forbidden_pr(where[p]->pr,NULL, Ext_fcns);
+ }
+
+ if(!fta_ok){
+ fprintf(stderr,"ERROR, watchlist join %s is fta-unsafe.\n",node_name.c_str());
+ exit(1);
+ }
+
+// Can it be done in a single lfta?
+// Get the set of interfaces it accesses.
+ int ierr;
+ int si;
+ vector<string> sel_names;
+ vector<pair<string,string> > ifaces = get_ifaces(from[0], ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
+ if (ifaces.empty()) {
+ fprintf(stderr,"INTERNAL ERROR in filter_join_qpn::split_node_for_fta - empty interface set, node is %s\n", node_name.c_str());
+ exit(1);
+ }
+
+ if(ifaces.size() == 1){
+// Single interface, no need to merge.
+ hfta_returned = 0;
+ ret_vec.push_back(this);
+
+// Treat the range vars a bit differently, the 2nd is reading from a _local_ watchlist.
+ from[0]->set_machine(ifaces[0].first);
+ from[0]->set_interface(ifaces[0].second);
+ from[0]->set_ifq(false);
+
+ from[1]->set_machine(ifaces[0].first);
+ from[1]->set_interface("_local_");
+ from[1]->set_ifq(false);
+
+ return(ret_vec);
+ }else{
+// Multiple interfaces, generate the interface-specific queries plus
+// the merge.
+ hfta_returned = 1;
+
+ vector<string> sel_names;
+ for(si=0;si<ifaces.size();++si){
+ watch_join_qpn *fta_node = new watch_join_qpn();
+
+// Name the fta
+ if(ifaces.size()==1)
+ fta_node->set_node_name( node_name );
+ else{
+ string new_name = "_"+node_name+"_"+ifaces[si].first+"_"+ifaces[si].second;
+ untaboo(new_name);
+ fta_node->set_node_name(new_name);
+ }
+ sel_names.push_back(fta_node->get_node_name());
+
+// Assign the table
+ int f;
+ for(f=0;f<from.size();f++){
+ fta_node->from.push_back(from[f]->duplicate());
+ fta_node->from[f]->set_machine(ifaces[si].first);
+ if(f==0)
+ fta_node->from[f]->set_interface(ifaces[si].second);
+ else
+ fta_node->from[f]->set_interface("_local_");
+ fta_node->from[f]->set_ifq(false);
+ }
+
+ for(s=0;s<select_list.size();s++){
+ fta_node->select_list.push_back( dup_select(select_list[s], NULL) );
+ }
+
+ for(p=0;p<pred_t0.size();p++){
+ predicate_t *new_pr = dup_pr(pred_t0[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->pred_t0.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<pred_t1.size();p++){
+ predicate_t *new_pr = dup_pr(pred_t1[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->pred_t1.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<key_flds.size();p++){ // we've checked that all keys are covered
+ string k = key_flds[p];
+ predicate_t *new_pr = dup_pr(hash_eq[k]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->hash_eq[k] = new_cnf;
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<join_filter.size();p++){
+ predicate_t *new_pr = dup_pr(join_filter[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->postfilter.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ for(p=0;p<postfilter.size();p++){
+ predicate_t *new_pr = dup_pr(postfilter[p]->pr, NULL);
+ cnf_elem *new_cnf = new cnf_elem(new_pr);
+ analyze_cnf(new_cnf);
+ fta_node->postfilter.push_back(new_cnf);
+ fta_node->where.push_back(new_cnf);
+ }
+ fta_node->key_flds = key_flds;
+
+// Xfer all of the parameters.
+// Use existing handle annotations.
+ vector<string> param_names = param_tbl->get_param_names();
+ int pi;
+ for(pi=0;pi<param_names.size();pi++){
+ data_type *dt = param_tbl->get_data_type(param_names[pi]);
+ fta_node->param_tbl->add_param(param_names[pi],dt->duplicate(),
+ param_tbl->handle_access(param_names[pi]));
+ }
+ fta_node->definitions = definitions;
+ if(fta_node->resolve_if_params(ifdb, this->err_str)){
+ this->error_code = 3;
+ return ret_vec;
+ }
+
+ ret_vec.push_back(fta_node);
+ }
+
+ mrg_qpn *mrg_node = new mrg_qpn((watch_join_qpn *)ret_vec[0],
+ node_name, sel_names,ifaces, ifdb);
+ ret_vec.push_back(mrg_node);
+
+ return(ret_vec);
+ }
+
+}
+