X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fftacmp%2Fstream_query.cc;h=faa1ea3ec6f6e1004d60e5a04c724f11b82e125b;hb=refs%2Fchanges%2F49%2F4649%2F2;hp=04317b670f9baa1ae34fe44c6632e02f1d312266;hpb=07495effe193ca3f73c3bf0ce417068f9ac9dcdd;p=com%2Fgs-lite.git
diff --git a/src/ftacmp/stream_query.cc b/src/ftacmp/stream_query.cc
index 04317b6..faa1ea3 100644
--- a/src/ftacmp/stream_query.cc
+++ b/src/ftacmp/stream_query.cc
@@ -1,1957 +1,1974 @@
-/* ------------------------------------------------
-Copyright 2014 AT&T Intellectual Property
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
- ------------------------------------------- */
-#include"stream_query.h"
-#include"generate_utils.h"
-#include"analyze_fta.h"
-#include
-#include
-#include
-
-static char tmpstr[500];
-
-using namespace std;
-
-
-// Create a query plan from a query node and an existing
-// query plan. Used for lfta queries; the parent query plan provides
-// the annotations.
-stream_query::stream_query(qp_node *qnode, stream_query *parent){
-	query_plan.push_back(qnode);
-	qhead = 0;
-	qtail.push_back(0);
-	attributes = qnode->get_fields();
-	parameters = qnode->get_param_tbl();
-	defines = parent->defines;
-	query_name = qnode->get_node_name();
-}
-
-// Copy the query plan.
-stream_query::stream_query(stream_query &src){
-	query_plan = src.query_plan;
-	qhead = src.qhead;
-	qtail = src.qtail;
-	attributes = src.attributes;
-	parameters = src.parameters;
-	defines = src.defines;
-	query_name = src.query_name;
-	gid = src.gid;
-}
-
-
-// Create a query plan from an analyzed parse tree.
-// Perform analyses to find the output node, input nodes, etc.
-
-stream_query::stream_query(query_summary_class *qs,table_list *Schema){
-// Generate the query plan nodes from the analyzed parse tree.
-// There is only one for now, so just assign the return value
-// of create_query_nodes to query_plan
-	error_code = 0;
-	query_plan = create_query_nodes(qs,Schema);
-	int i;
-if(query_plan.size() == 0){
-	fprintf(stderr,"INTERNAL ERROR, zero-size query plan in stream_query::stream_query\n");
-	exit(1);
-}
-	for(i=0;i<query_plan.size();++i){
-		if(query_plan[i] != NULL){
-			if(query_plan[i]->get_error_code() != 0){
-				error_code = query_plan[i]->get_error_code();
-				err_str += query_plan[i]->get_error_str();
-			}
-		}
-	}
-	qhead = query_plan.size()-1;
-	gid = -1;
-
-}
-
-
-stream_query * stream_query::add_query(query_summary_class *qs,table_list *Schema){
-// Add another query block to the query plan
-	error_code = 0;
-	vector<qp_node *> new_nodes = create_query_nodes(qs, Schema);
-	query_plan.insert(query_plan.end(),new_nodes.begin(), new_nodes.end());
-	return this;
-}
-
-stream_query * stream_query::add_query(stream_query &src){
-// Add another query block to the query plan
-	error_code = 0;
-	query_plan.insert(query_plan.end(),src.query_plan.begin(), src.query_plan.end());
-	return this;
-}
-
-
-void stream_query::generate_protocol_se(map<string, stream_query *> &sq_map, table_list *Schema){
-	int i,n;
-
-// Mapping fields to protocol fields requires schema binding.
-// First ensure all nodes are in the schema.
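-// Illustrative sketch of the root-down traversal used further below: the
-// plan is walked from qhead toward its sources, pushing each visited node
-// onto the front of a work list so sources end up before their consumers.
-//		list<int> search_q, work_list;
-//		search_q.push_back(qhead);
-//		while(!search_q.empty()){
-//			int the_q = search_q.front();
-//			search_q.pop_front(); work_list.push_front(the_q);
-//			vector<int> the_pred = query_plan[the_q]->get_predecessors();
-//			for(i=0;i<the_pred.size();++i) search_q.push_back(the_pred[i]);
-//		}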
- for(n=0;nadd_table(query_plan[n]->get_fields()); - } - } -// Now do schema binding - for(n=0;nbind_to_schema(Schema); - } - } - -// create name-to-index map - map name_to_node; - for(n=0;nget_node_name()] = n; - } - } - -// Create a list of the nodes to process, in order. -// Search from the root down. -// ASSUME tree plan. - - list search_q; - list work_list; - - search_q.push_back(qhead); - while(! search_q.empty()){ - int the_q = search_q.front(); - search_q.pop_front(); work_list.push_front(the_q); - vector the_pred = query_plan[the_q]->get_predecessors(); - for(i=0;i q_sources; - vector q_input_tbls = query_plan[the_q]->get_input_tbls(); - for(i=0;iget_schema_name(); - if(name_to_node.count(itbl_nm)>0){ - q_sources.push_back(query_plan[name_to_node[itbl_nm]]); - }else if(sq_map.count(itbl_nm)>0){ - q_sources.push_back(sq_map[itbl_nm]->get_query_head()); - }else{ - q_sources.push_back(NULL); - } - } - query_plan[the_q]->create_protocol_se(q_sources, Schema); - } - -////////////////////////////////////////////////////////// -// trust but verify - - -/* - for(i=0;iget_node_name().c_str(), - query_plan[i]->node_type().c_str()); - map *pse_map = query_plan[i]->get_protocol_se(); - map::iterator mssi; - for(mssi=pse_map->begin();mssi!=pse_map->end();++mssi){ - if((*mssi).second) - printf("\t%s : %s\n",(*mssi).first.c_str(), (*mssi).second->to_string().c_str()); - else - printf("\t%s : NULL\n",(*mssi).first.c_str()); - } - if(query_plan[i]->node_type() == "filter_join" || query_plan[i]->node_type() == "join_eq_hash_qpn"){ - vector pse_l; - vector pse_r; - if(query_plan[i]->node_type() == "filter_join"){ - pse_l = ((filter_join_qpn *)query_plan[i])->get_hash_l(); - pse_r = ((filter_join_qpn *)query_plan[i])->get_hash_r(); - } - if(query_plan[i]->node_type() == "join_eq_hash_qpn"){ - pse_l = ((join_eq_hash_qpn *)query_plan[i])->get_hash_l(); - pse_r = ((join_eq_hash_qpn *)query_plan[i])->get_hash_r(); - } - int p; - for(p=0;pto_string().c_str()); - else - printf("NULL\n"); - } - } - if(query_plan[i]->node_type() == "sgah_qpn" || query_plan[i]->node_type() == "rsgah_qpn" || query_plan[i]->node_type() == "sgahcwcb_qpn"){ - vector pseg; - if(query_plan[i]->node_type() == "sgah_qpn") - pseg = ((sgah_qpn *)query_plan[i])->get_gb_sources(); - if(query_plan[i]->node_type() == "rsgah_qpn") - pseg = ((rsgah_qpn *)query_plan[i])->get_gb_sources(); - if(query_plan[i]->node_type() == "sgahcwcb_qpn") - pseg = ((sgahcwcb_qpn *)query_plan[i])->get_gb_sources(); - int g; - for(g=0;gto_string().c_str()); - else - printf("\t\tgb %d = NULL\n",g); - } - } - } - } -*/ - -} - -bool stream_query::generate_linkage(){ - bool create_failed = false; - int n, f,s; - -// Clear any leftover linkages - for(n=0;nclear_predecessors(); - query_plan[n]->clear_successors(); - } - } - qtail.clear(); - -// create name-to-index map - map name_to_node; - for(n=0;nget_node_name()] = n; - } - } - -// Do the 2-way linkage. - for(n=0;n fm = query_plan[n]->get_input_tbls(); - for(f=0;fget_schema_name(); - if(name_to_node.count(src_tbl)>0){ - int s = name_to_node[src_tbl]; - query_plan[n]->add_predecessor(s); - query_plan[s]->add_successor(n); - } - } - } - } - -// Find the head (no successors) and the tails (at least one -// predecessor is external). -// Verify that there is only one head, -// and that all other nodes have one successor (because -// right now I can only handle trees). - - qhead = -1; // no head yet found. 
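-// Worked example of the classification below: in a three-node plan where
-// node 0 reads only an external protocol table, node 1 reads node 0, and
-// node 2 reads node 1, only node 2 has no successors, so qhead = 2; only
-// node 0 has fewer predecessors (0) than input tables (1), so qtail = {0}.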
-	for(n=0;n<query_plan.size();++n){
-		if(query_plan[n] != NULL){
-			vector<int> succ = query_plan[n]->get_successors();
-/*
-			if(succ.size() > 1){
-				fprintf(stderr,"ERROR, query node %s supplies data to multiple nodes, but currently only tree query plans are supported:\n",query_plan[n]->get_node_name().c_str());
-				for(s=0;s<succ.size();++s){
-					fprintf(stderr,"%s ",query_plan[succ[s]]->get_node_name().c_str());
-				}
-				fprintf(stderr,"\n");
-				create_failed = true;
-			}
-*/
-			if(succ.size() == 0){
-				if(qhead >= 0){
-					fprintf(stderr,"ERROR, query nodes %s and %s both have zero successors.\n",query_plan[n]->get_node_name().c_str(), query_plan[qhead]->get_node_name().c_str());
-					create_failed = true;
-				}else{
-					qhead = n;
-				}
-			}
-			if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size()){
-				qtail.push_back(n);
-			}
-		}
-	}
-
-	return create_failed;
-}
-
-// After the collection of query plan nodes is generated,
-// analyze their structure to link them up into a tree (or dag?).
-// Verify that the structure is acceptable.
-// Do some other analysis and verification tasks (?)
-// then gather summary information.
-int stream_query::generate_plan(table_list *Schema){
-
-// The first thing to do is verify that the query plan
-// nodes were successfully created.
-	bool create_failed = false;
-	int n,f,s;
-	for(n=0;n<query_plan.size();++n){
-		if(query_plan[n]->get_error_code()){
-			fprintf(stderr,"%s",query_plan[n]->get_error_str().c_str());
-			create_failed = true;
-		}
-	}
-/*
-for(n=0;n<query_plan.size();++n){
-if(query_plan[n] != NULL){
-string nstr = query_plan[n]->get_node_name();
-printf("In generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
-vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
-int nn;
-for(nn=0;nn<inv.size();++nn){
-printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
-}
-printf("\n");
-}
-}
-*/
-
-	if(create_failed) return(1);
-
-// Here, link up the query nodes, then verify that the
-// structure is acceptable (single root, no cycles, no stranded
-// nodes, etc.)
-	create_failed = generate_linkage();
-	if(create_failed) return -1;
-
-
-// Here, do optimizations such as predicate pushing,
-// join rearranging, etc.
-// Nothing to do yet.
-
-/*
-for(n=0;n<query_plan.size();++n){
-if(query_plan[n] != NULL){
-string nstr = query_plan[n]->get_node_name();
-printf("B generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
-vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
-int nn;
-for(nn=0;nn<inv.size();++nn){
-printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
-}
-printf("\n");
-}
-}
-printf("qhead=%d, qtail = ",qhead);
-int nn;
-for(nn=0;nn<qtail.size();++nn){
-printf("%d ",qtail[nn]);
-}
-printf("\n");
-*/
-
-	query_name = query_plan[qhead]->get_node_name();
-	attributes = query_plan[qhead]->get_fields();
-// TODO: The params and defines require a lot more thought.
- parameters = query_plan[qhead]->get_param_tbl(); - defines = query_plan[qhead]->get_definitions(); - - return(0); - }; - -void stream_query::add_output_operator(ospec_str *o){ - output_specs.push_back(o); -} - - -void stream_query::get_external_libs(set &libset){ - - int qn,i; - for(qn=0;qn op_libs = query_plan[qn]->external_libs(); - for(i=0;i op_libs = output_operators[qn]->external_libs(); - for(i=0;i stream_query::split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){ - vector queries; - int l,q,s; - int qp_hfta; - - hfta_returned = false; // assume until proven otherwise - - for(l=0;l qnodes = query_plan[leaf]->split_node_for_fta(Ext_fcns, Schema, qp_hfta, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx); - - - if(qnodes.size() == 0 || query_plan[leaf]->get_error_code()){ // error -//printf("error\n"); - error_code = query_plan[leaf]->get_error_code(); - err_str = query_plan[leaf]->get_error_str(); - vector null_result; - return(null_result); - } - if(qnodes.size() == 1 && qp_hfta){ // nothing happened -//printf("no change\n"); - query_plan[leaf] = qnodes[0]; - } - if(qnodes.size() == 1 && !qp_hfta){ // push to lfta -//printf("lfta only\n"); - queries.push_back(new stream_query(qnodes[0], this)); - vector succ = query_plan[leaf]->get_successors(); - for(s=0;sremove_predecessor(leaf); - } - query_plan[leaf] = NULL; // delete it? - } - if(qnodes.size() > 1){ // actual splitting occurred. - if(!qp_hfta){ // internal consistency check. - error_code = 1; - err_str = "INTERNAL ERROR: mulitple nodes returned by split_node_for_fta, but none are hfta nodes.\n"; - vector null_result; - return(null_result); - } - - for(q=0;qget_node_name().c_str()); - queries.push_back(new stream_query(qnodes[q], this)); - } -// Use new hfta node - query_plan[leaf] = qnodes[qnodes.size()-1]; -// Add in any extra hfta nodes - for(q=qnodes.size()-qp_hfta;q ifps; - for(q=0;qcount_ifp_refs(ifps); - hfta_returned = true; - } - } - - if(n_ifprefs){ - set::iterator ssi; - err_str += "ERROR, unresolved interface parameters in HFTA:\n"; - for(ssi=ifps.begin();ssi!=ifps.end();++ssi){ - err_str += (*ssi)+" "; - } - err_str += "\n"; - error_code = 3; - vector null_result; - return(null_result); - } - - - if(hfta_returned){ - if(generate_linkage()){ - fprintf(stderr,"INTERNAL ERROR, generate_linkage failed in split_query.\n"); - exit(1); - } - queries.push_back(this); - } - - return(queries); -} - - - -vector stream_query::extract_opview(table_list *Schema, vector &qnodes, opview_set &opviews, string silo_nm){ - vector subqueries; - int l,q; - - string root_name = this->get_output_tabledef()->get_tbl_name(); - - - for(l=0;l new_qnodes = query_plan[leaf]->extract_opview(Schema, qnodes, opviews, root_name, silo_nm); - - for(q=0;qget_fields()->to_string(); - - ret += "DEFINE{\n"; - ret += "\tquery_name '"+query_plan[q]->get_node_name()+"';\n"; - - map defs = query_plan[q]->get_definitions(); - map::iterator dfi; - for(dfi=defs.begin(); dfi!=defs.end(); ++dfi){ - ret += "\t"+ (*dfi).first + " '" + (*dfi).second + "';\n"; - } - ret += "}\n\n"; - - ret += "PARAM{\n"; - param_table *params = query_plan[q]->get_param_tbl(); - vector param_names = params->get_param_names(); - int p; - for(p=0;pget_data_type( param_names[p] ); - ret += "\t" + param_names[p] + " '" + dt->get_type_str() + "';\n"; - } - ret += "}\n"; - - ret += query_plan[q]->to_query_string(); - ret += "\n}\n\n"; - - return(ret); - -} - -string 
stream_query::collect_refd_ifaces(){ - string ret=""; - int q; - for(q=0;q defs = query_plan[q]->get_definitions(); - if(defs.count("_referenced_ifaces")){ - if(ret != "") ret += ","; - ret += defs["_referenced_ifaces"]; - } - } - } - - return ret; -} - - -bool stream_query::stream_input_only(table_list *Schema){ - vector input_tbls = this->get_input_tables(); - int i; - for(i=0;iget_table_ref(input_tbls[i]->get_schema_name()); - if(Schema->get_schema_type(t) == PROTOCOL_SCHEMA) return(false); - } - return(true); -} - -// Return input tables. No duplicate removal performed. -vector stream_query::get_input_tables(){ - vector retval; - -// create name-to-index map - int n; - map name_to_node; - for(n=0;nget_node_name()] = n; - } - } - - int l; - for(l=0;l tmp_v = query_plan[leaf]->get_input_tbls(); - int i; - for(i=0;iget_schema_name()) == 0) - retval.push_back(tmp_v[i]); - } - } - return(retval); -} - - -void stream_query::compute_node_format(int q, vector &nfmt, map &op_idx){ - int netcnt = 0, hostcnt = 0; - int i; - - vector itbls = query_plan[q]->get_input_tbls(); - for(i=0;iget_schema_name(); - if(op_idx.count(tname)){ - int o = op_idx[tname]; - if(nfmt[o] == UNKNOWNFORMAT) - compute_node_format(o,nfmt,op_idx); - if(nfmt[o] == NETFORMAT) netcnt++; - else hostcnt++; - }else{ - netcnt++; - } - } - if(query_plan[q]->makes_transform()){ - nfmt[q] = HOSTFORMAT; - }else{ - if(hostcnt>0){ - nfmt[q] = HOSTFORMAT; - }else{ - nfmt[q] = NETFORMAT; - } - } -//printf("query plan %d (%s) is ",q,query_plan[q]->get_node_name().c_str()); -//if(nfmt[q] == HOSTFORMAT) printf(" host format.\n"); -//else printf("net format\n"); -} - - -string stream_query::generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode){ - int schref, ov_ix, i, q, param_sz; - bool dag_graph = false; - - -// Bind the SEs in all query plan nodes to this schema, and -// Add all tables used by this query to the schema. -// Question: Will I be polluting the global schema by adding all -// query node schemas? - -// First ensure all nodes are in the schema. - int qn; - for(qn=0;qnadd_table(query_plan[qn]->get_fields()); - } - } -// Now do binding. - for(qn=0;qnbind_to_schema(Schema); - } - } - -// Is it a DAG plan? - set qsources; - int n; - for(n=0;n tmp_v = query_plan[n]->get_input_tbls(); - int i; - for(i=0;iget_schema_name()) > 0) - dag_graph = true; - qsources.insert(tmp_v[i]->get_schema_name()); - } - } - } - - - -// Collect set of tables ref'd in this HFTA - set tbl_set; - for(qn=0;qn input_tbls = query_plan[qn]->get_input_tbls(); - vector output_tbls = query_plan[qn]->get_output_tbls(); -// Convert to tblrefs, add to set of ref'd tables - int i; - for(i=0;iget_table_ref(input_tbls[i]->get_schema_name()); - int t = input_tbls[i]->get_schema_ref(); - if(t < 0){ - fprintf(stderr,"INTERNAL ERROR in generate_hfta. " - "query plan node %s references input table %s, which is not in schema.\n", - query_name.c_str(), input_tbls[i]->get_schema_name().c_str()); - exit(1); - } - tbl_set.insert(t); - } - - for(i=0;iget_table_ref(output_tbls[i]->get_schema_name()); - if(t < 0){ - fprintf(stderr,"INTERNAL ERROR in generate_hfta." 
- "query plan node %s references output table %s, which is not in schema.\n", - query_name.c_str(), output_tbls[i]->get_schema_name().c_str()); - exit(1); - } - tbl_set.insert(t); - } - } - } - -// Collect map of lftas, query nodes - map op_idx; - for(q=0;qget_node_name()] = q; - } - } - -// map of input tables must include query id and input -// source (0,1) becuase several queries might reference the same source - vector input_tbls = this->get_input_tables(); - vector input_tbl_free; - for(i=0;i lfta_idx; -//fprintf(stderr,"%d input tables, %d query nodes\n",input_tbls.size(), query_plan.size()); - for(q=0;q itbls = query_plan[q]->get_input_tbls(); - int it; - for(it=0;itget_schema_name()+"-"+int_to_string(q)+"-"+int_to_string(it); - string src_tblname = itbls[it]->get_schema_name(); - bool src_is_external = false; - for(i=0;iget_schema_name()){ - src_is_external = true; - if(input_tbl_free[i]){ - lfta_idx[tname] = i; - input_tbl_free[i] = false; -//fprintf(stderr,"Adding %s (src_tblname=%s, q=%d, it=%d) to %d.\n",tname.c_str(), src_tblname.c_str(), q, it, i); - break; - } - } - } - if(i==input_tbls.size() && src_is_external){ - fprintf(stderr,"INTERNAL ERROR in stream_query::generate_hfta, can't find free entry in input_tbls for query %d, intput %d (%s)\n",q,it,src_tblname.c_str()); - exit(1); - } - } - } - } -/* - for(i=0;iget_schema_name(); - lfta_idx[src_tblname] = i; - } -*/ - -// Compute the output formats of the operators. - vector node_fmt(query_plan.size(),UNKNOWNFORMAT); - compute_node_format(qhead, node_fmt, op_idx); - - -// Generate the schema strings for the outputs. - string schema_str; - for(i=0;imake_schema(i); - schema_str += "gs_csp_t node"+int_to_string(i)+"_schema = "+make_C_embedded_string(schema_tmpstr)+";\n"; - } - } - - attributes = query_plan[qhead]->get_fields(); - - - string schema_tmpstr = this->make_schema(); - schema_str += "gs_csp_t "+generate_schema_string_name(query_name)+" = "+make_C_embedded_string(schema_tmpstr)+";\n"; - -// Generate the collection of tuple defs. - - string tuple_defs = "\n/*\tDefine tuple structures \t*/\n\n"; - set::iterator si; - for(si=tbl_set.begin(); si!=tbl_set.end(); ++si){ - tuple_defs += generate_host_tuple_struct( Schema->get_table( (*si) )); - tuple_defs += "\n\n"; - } - -// generate the finalize tuple function - string finalize_str = generate_hfta_finalize_tuple(attributes); - -// Analyze and make the output operators - bool eat_input = false; - string src_op = query_name; - string pred_op = src_op; - int last_op = -1; - if(output_specs.size()>0) - eat_input = true; - if(n_successors>0) - eat_input = false; // must create stream output for successor HFTAs. 
- int n_filestreams = 0; - for(i=0;ioperator_type == "stream" ){ - eat_input = false; - } - if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile" ){ - last_op = i; - n_filestreams++; - } - } - int filestream_id = 0; - for(i=0;ioperator_type == "file" || output_specs[i]->operator_type == "zfile"){ - - int n_fstreams = output_specs[i]->n_partitions / n_parallel; - if(n_fstreams * n_parallel < output_specs[i]->n_partitions){ - n_fstreams++; - if(n_parallel == 1 || query_name.find("__copy1") != string::npos){ - fprintf(stderr,"WARNING, in query %s, %d streams requested for %s output, but it must be a multiple of the hfta parallelism (%d), increasing number of output streams to %d.\n",query_name.c_str(), output_specs[i]->n_partitions, output_specs[i]->operator_type.c_str(), n_parallel, n_fstreams*n_parallel); - } - } -// output_file_qpn *new_ofq = new output_file_qpn(); - string filestream_tag = ""; - if(n_filestreams>1){ - filestream_tag = "_fileoutput"+int_to_string(filestream_id); - filestream_id++; - } - output_file_qpn *new_ofq = new output_file_qpn(pred_op, src_op, filestream_tag, query_plan[qhead]->get_fields(), output_specs[i], (i==last_op ? eat_input : false) ); -// if(n_fstreams > 1){ - if(n_fstreams > 0){ - string err_str; - bool err_ret = new_ofq->set_splitting_params(n_parallel,parallel_idx,n_fstreams,output_specs[i]->partitioning_flds,err_str); - if(err_ret){ - fprintf(stderr,"%s",err_str.c_str()); - exit(1); - } - } - output_operators.push_back(new_ofq ); - pred_op = output_operators.back()->get_node_name(); - }else if(! (output_specs[i]->operator_type == "stream" || output_specs[i]->operator_type == "Stream" || output_specs[i]->operator_type == "STREAM") ){ - fprintf(stderr,"WARNING, output operator type %s (on query %s) is not supported, ignoring\n",output_specs[i]->operator_type.c_str(),query_name.c_str() ); - } - } - - - -// Generate functors for the query nodes. - - string functor_defs = "\n/*\tFunctor definitions\t*/\n\n"; - for(qn=0;qn needs_xform; - vector itbls = query_plan[qn]->get_input_tbls(); - for(i=0;iget_schema_name(); -// if(query_plan[qn]->makes_transform()){ - if(op_idx.count(tname)>0){ - if(node_fmt[ op_idx[tname] ] == NETFORMAT){ - needs_xform.push_back(true); - }else{ - needs_xform.push_back(false); - } - }else{ - needs_xform.push_back(true); - } -// }else{ -// if(op_idx.count(tname)>0){ -// if(node_fmt[qn] != node_fmt[ op_idx[tname] ]){ -// needs_xform.push_back(true); -// }else{ -// needs_xform.push_back(false); -// } -// }else{ -// if(node_fmt[qn] == HOSTFORMAT){ -// needs_xform.push_back(true); -// }else{ -// needs_xform.push_back(false); -// } -// } -// } - } - - functor_defs += query_plan[qn]->generate_functor(Schema, Ext_fcns, needs_xform); - } - } - -// Generate output operator functors - - vector needs_xform; - for(i=0;igenerate_functor(Schema, Ext_fcns, needs_xform); - - string ret = -"extern \"C\" {\n" -"#include \n" -"#include \n" -"#include \n" -"#include \n" -"#include \n" -"#include \n" -"}\n" -; - if(dag_graph) - ret += -"#define PLAN_DAG\n" -; - ret += -"#include \n" -"#include\n" -"\n" -"#include \n" -"#include \n" -"#include \n" -"#include \n" -"\n" -//"#define MAXSCHEMASZ 16384\n" -"#include \n\n" -; - -// Get include file for each of the operators. -// avoid duplicate inserts. 
- set include_fls; - for(qn=0;qnget_include_file()); - } - for(i=0;iget_include_file()); - set::iterator ssi; - for(ssi=include_fls.begin();ssi!=include_fls.end();++ssi) - ret += (*ssi); - -// Add defines for hash functions - ret += -"\n" -"#define hfta_BOOL_to_hash(x) (x)\n" -"#define hfta_USHORT_to_hash(x) (x)\n" -"#define hfta_UINT_to_hash(x) (x)\n" -"#define hfta_IP_to_hash(x) (x)\n" -"#define hfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n" -"#define hfta_INT_to_hash(x) (gs_uint32_t)(x)\n" -"#define hfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n" -"#define hfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n" -"#define hfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n" -"\n" -; - -// ret += "#define SERIOUS_LFTA \""+input_tbls[0]->get_schema_name()+"\"\n"; - ret += "#define OUTPUT_HFTA \""+query_name+"\"\n\n"; - -// HACK ALERT: I know for now that all query plans are -// single operator plans, but while SPX and SGAH can use the -// UNOP template, the merge operator must use the MULTOP template. -// WORSE HACK ALERT : merge does not translate its input, -// so don't apply finalize to the output. -// TODO: clean this up. - -// string node_type = query_plan[0]->node_type(); - - ret += schema_str; - ret += tuple_defs; - -// Need to work on the input, output xform logic. -// For now, just add it in. -// ret += finalize_str; - - if(node_fmt[qhead] == NETFORMAT){ - ret += -"void finalize_tuple(host_tuple &tup){\n" -"return;\n" -"}\n" -"\n"; - }else{ - ret += finalize_str; - } - - ret += functor_defs; - -// Parameter block management -// The proper parameter block must be transmitted to each -// external stream source. -// There is a 1-1 mapping between the param blocks returned -// by this list and the registered data sources ... -// TODO: make this more manageable, but for now -// there is no parameter block manipulation so I just -// need to have the same number. - - ret += -"int get_lfta_params(gs_int32_t sz, void * value,list& lst){\n" -" // for now every lfta receive the full copy of hfta parameters\n" -" struct param_block pb;\n"; - - set lfta_seen; - for(i=0;iget_schema_name(); - if(lfta_seen.count(src_tblname) == 0){ - lfta_seen.insert(src_tblname); - schref = input_tbls[i]->get_schema_ref(); - if(Schema->get_schema_type(schref) == OPERATOR_VIEW_SCHEMA){ - ov_ix = input_tbls[i]->get_opview_idx(); - opview_entry *opv = opviews.get_entry(ov_ix); - string op_param = "SUBQ:"; - int q; - for(q=0;qsubq_names.size();++q){ - if(q>0) op_param+=","; - op_param+=opv->subq_names[q]; - } - op_param+="\\n"; - param_sz = op_param.size()-1; - - sprintf(tmpstr,"\t\tpb.block_length = %d;\n",param_sz); ret+=tmpstr; - ret+= -" pb.data = malloc(pb.block_length);\n"; - ret+="\t\tmemcpy(pb.data,\""+op_param+"\",pb.block_length);\n" -" lst.push_back(pb);\n\n"; - }else{ - ret+= -" pb.block_length = sz;\n" -" pb.data = malloc(pb.block_length);\n" -" memcpy(pb.data, value, pb.block_length);\n" -" lst.push_back(pb);\n\n"; - } - } - } - - ret += -" return 0;\n" -"}\n" -"\n"; - - ret+= -"struct FTA* alloc_hfta (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void * value ) {\n" -"\n" -" // find the lftas\n" -" list *lfta_list = new list;\n" -"\n" -" FTAID f;\n" -" char schemabuf[MAXSCHEMASZ];\n" -" gs_schemahandle_t schema_handle;\n" -"\n"; - -// Register the input data sources. -// Register a source only once. 
- -// vector ext_reg_txt; - map input_tbl_srcid; - for(i=0;iget_schema_name(); -// Use UDOP alias when in distributed mode. -// the cluster manager will make the translation -// using infr from qtree.xml - if(distributed_mode && input_tbls[i]->get_udop_alias() != "") - src_tblname = input_tbls[i]->get_udop_alias(); - if(input_tbl_srcid.count(src_tblname) == 0){ - int srcid = input_tbl_srcid.size(); - input_tbl_srcid[src_tblname] = srcid; - string tmp_s= -"\n // find "+src_tblname+"\n" -" if (fta_find(\""+src_tblname+"\",1,&f,schemabuf,MAXSCHEMASZ)!=0) {\n" -" fprintf(stderr,\"HFTA::error:could not find LFTA \\n\");\n" -" return 0;\n" -" }\n" -" //fprintf(stderr,\"HFTA::FTA found at %u[%u]\\n\",ftamsgid,ftaindex);\n" -"\n" -" // parse the schema and get the schema handle\n" -" schema_handle = ftaschema_parse_string(schemabuf);\n" -" lfta_info* inf"+int_to_string(srcid)+" = new lfta_info();\n" -" inf"+int_to_string(srcid)+"->f = f;\n" -" inf"+int_to_string(srcid)+"->fta_name = strdup(\""+src_tblname+"\");\n" -" inf"+int_to_string(srcid)+"->schema = strdup(schemabuf);\n" -" inf"+int_to_string(srcid)+"->schema_handle = schema_handle;\n" -" lfta_list->push_back(inf"+int_to_string(srcid)+");\n\n"; -// ext_reg_txt.push_back(tmp_s); - ret += tmp_s; - } - } - - ret+="\n"; - ret += "\tgs_schemahandle_t root_schema_handle = ftaschema_parse_string("+generate_schema_string_name(query_name)+");\n"; - for(i=0;iget_node_name()+") \n"; - -// Create parameters for operator construction. - string op_params; - vector itbls = query_plan[q]->get_input_tbls(); - string tname = itbls[0]->get_schema_name(); -// string li_tname = tname +"-"+int_to_string(q)+"-0"; -// if(lfta_idx.count(li_tname)>0) - if(input_tbl_srcid.count(tname)>0){ -// ret += ext_reg_txt[lfta_idx[li_tname]]; -// op_params += "inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle"; - op_params += "inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle"; - }else if(op_idx.count(tname)>0){ - op_params += "op"+int_to_string( op_idx[tname] )+"_schema_handle"; - }else{ - fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (3) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str()); - exit(1); - } - if(itbls.size()>1){ - string tname = itbls[1]->get_schema_name(); -// string li_tname = tname +"-"+int_to_string(q)+"-1"; -// if(lfta_idx.count(li_tname)>0) - if(input_tbl_srcid.count(tname)>0){ -// ret += ext_reg_txt[lfta_idx[li_tname]]; -// op_params += ",inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle"; - op_params += ",inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle"; - }else if(op_idx.count(tname)>0){ - op_params += ",op"+int_to_string( op_idx[tname] )+"_schema_handle"; - }else{ - fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (4) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str()); - exit(1); - } - } - ret += query_plan[q]->generate_operator(q,op_params); - ret += -" operator_node* node"+int_to_string(q)+" = new operator_node(op"+int_to_string(q)+");\n"; - - n_basic_ops = q; - } - } - n_basic_ops++; -// Next for the output operators if any - for(i=0;igenerate_operator(n_basic_ops+i,"root_schema_handle"); - ret += -" operator_node* node"+int_to_string(n_basic_ops+i)+" = new operator_node(op"+int_to_string(n_basic_ops+i)+");\n"; - } - - -// Link up operators. 
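-// Illustrative sketch of the code emitted by the linking loop below, for a
-// hypothetical two-node plan in which node 1 reads the lfta registered as
-// inf0 and node 0 reads node 1:
-//		node1->set_left_lfta(inf0);
-//		node0->set_left_child_node(node1);
-// Each operator_node is wired either to an lfta_info or to the
-// operator_node that produces its input; join nodes also get a right input.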
- for(q=0;q itbls = query_plan[q]->get_input_tbls(); - string tname = itbls[0]->get_schema_name(); -// string li_tname = tname +"-"+int_to_string(q)+"-0"; -// if(lfta_idx.count(li_tname)>0) - if(input_tbl_srcid.count(tname)>0){ -// ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n"; - ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n"; - }else if(op_idx.count(tname)>0){ - ret += "\tnode"+int_to_string(q)+"->set_left_child_node(node"+int_to_string( op_idx[tname] )+");\n"; - }else{ - fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (1) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str()); - exit(1); - } - if(itbls.size()>1){ - string tname = itbls[1]->get_schema_name(); -// string li_tname = tname +"-"+int_to_string(q)+"-1"; -// if(lfta_idx.count(li_tname)>0) - if(input_tbl_srcid.count(tname)>0){ -// ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n"; - ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n"; - }else if(op_idx.count(tname)>0){ - ret += "\tnode"+int_to_string(q)+"->set_right_child_node(node"+int_to_string( op_idx[tname] )+");\n"; - }else{ - fprintf(stderr,"INTERNAL ERROR, can't identify input table %s (%s) when linking operator (2) %d (%s)\n",tname.c_str(), tname.c_str(),q,query_plan[q]->get_node_name().c_str()); - exit(1); - } - } - } - } - for(i=0;iset_left_child_node(node"+int_to_string( qhead )+");\n"; - else - ret += "\tnode"+int_to_string(n_basic_ops+i)+"->set_left_child_node(node"+int_to_string( n_basic_ops+i-1 )+");\n"; - } - - // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters - - bool fta_reusable = false; - if (query_plan[qhead]->get_val_of_def("reusable") == "yes" || - query_plan[qhead]->get_param_tbl()->size() == 0) { - fta_reusable = 1; - } - - int root_node = qhead; - if(output_operators.size()>0) - root_node = n_basic_ops+i-1; - - ret+= -"\n" -"\n" -" MULTOP_HFTA* ftap = new MULTOP_HFTA(ftaid, OUTPUT_HFTA, command, sz, value, root_schema_handle, node"+int_to_string(root_node)+", lfta_list, " + (fta_reusable ?"true":"false") + ", reusable);\n" -" if(ftap->init_failed()){ delete ftap; return 0;}\n" -" return (FTA*)ftap;\n" -"}\n" -"\n" -"\n"; - - - string comm_bufsize = "16*1024*1024"; - if(defines.count("hfta_comm_buf")>0){ - comm_bufsize = defines["hfta_comm_buf"]; - } - - ret+= -"\n" -"int main(int argc, char * argv[]) {\n" -"\n" -"\n" -" /* parse the arguments */\n" -"\n" -" gs_int32_t tip1,tip2,tip3,tip4;\n" -" endpoint gshub;\n" -" gs_sp_t instance_name;\n" -" if (argc<3) {\n" -" gslog(LOG_EMERG,\"Wrong arguments at startup\");\n" -" exit(1);\n" -" }\n" -"\n" -" if ((sscanf(argv[1],\"%u.%u.%u.%u:%hu\",&tip1,&tip2,&tip3,&tip4,&(gshub.port)) != 5)) {\n" -" gslog(LOG_EMERG,\"HUB IP NOT DEFINED\");\n" -" exit(1);\n" -" }\n" -" gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\n" -" gshub.port=htons(gshub.port);\n" -" instance_name=strdup(argv[2]);\n" -" if (set_hub(gshub)!=0) {\n" -" gslog(LOG_EMERG,\"Could not set hub\");\n" -" exit(1);\n" -" }\n" -" if (set_instance_name(instance_name)!=0) {\n" -" gslog(LOG_EMERG,\"Could not set instance name\");\n" -" exit(1);\n" -" }\n" -"\n" -"\n" -" /* initialize host library */\n" -"\n" -//" fprintf(stderr,\"Initializing gscp\\n\");\n" -" gsopenlog(argv[0]);\n" -"\n" -" if (hostlib_init(HFTA, "+comm_bufsize+", DEFAULTDEV, 0, 0)!=0) {\n" -" 
fprintf(stderr,\"%s::error:could not initialize gscp\\n\",\n" -" argv[0]);\n" -" exit(1);\n" -" }\n" -"\n" -"\n" -"\n" -" FTAID ret = fta_register(OUTPUT_HFTA, " + (fta_reusable?"1":"0") + ", DEFAULTDEV, alloc_hfta, "+generate_schema_string_name(query_name)+", -1, 0ull);\n" -" fta_start_service(-1);\n" -"\n" -" return 0;\n" -"\n" -"}\n" -"\n"; -//////////////////// - return(ret); -} - - -// Checks if the node i is compatible with interface partitioning -// (can be pushed below the merge that combines partitioned stream) -// i - index of the query node in a query plan -bool stream_query::is_partn_compatible(int index, map lfta_names, vector interface_names, vector machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) { - int i; - qp_node* node = query_plan[index]; - qp_node* child_node = NULL; - - if (node->predecessors.empty()) - return false; - - // all the node predecessors must be partition merges with the same partition definition - partn_def_t* partn_def = NULL; - for (i = 0; i < node->predecessors.size(); ++i) { - child_node = query_plan[node->predecessors[i]]; - if (child_node->node_type() != "mrg_qpn") - return false; - - // merge must have only one parent for this optimization to work - // check that all its successors are the same - for (int j = 1; j < child_node->successors.size(); ++j) { - if (child_node->successors[j] != child_node->successors[0]) - return false; - } - - partn_def_t* new_partn_def = ((mrg_qpn*)child_node)->get_partn_definition(lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result); - if (!new_partn_def) - return false; - if (!i) - partn_def = new_partn_def; - else if (new_partn_def != partn_def) - return false; - - } - - if (node->node_type() == "spx_qpn") // spx nodes are always partition compatible - return true; - else if (node->node_type() == "sgah_qpn") { - gb_table gb_tbl = ((sgah_qpn*)node)->gb_tbl; - return true; //partn_def->is_compatible(&gb_tbl); - } - else if (node->node_type() == "rsgah_qpn") { - gb_table gb_tbl = ((rsgah_qpn*)node)->gb_tbl; - return partn_def->is_compatible(&gb_tbl); - } - else if (node->node_type() == "sgahcwcb_qpn") { - gb_table gb_tbl = ((sgahcwcb_qpn*)node)->gb_tbl; - return partn_def->is_compatible(&gb_tbl); - } - else if (node->node_type() == "join_eq_hash_qpn") { - return true; - } - else - return false; -} - -// Push the operator below the merge that combines -void stream_query::pushdown_partn_operator(int index) { - qp_node* node = query_plan[index]; - int i; - char node_index[128]; - -// fprintf(stderr, "Partn pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str()); - - - // HACK ALERT: When reordering merges we screw up slack computation - // since slack should no longer be used, it is not an issue - - - // we can safely reorder nodes that have one and only one temporal atribute in select list - table_def* table_layout = node->get_fields(); - vector fields = table_layout->get_fields(); - int merge_fieldpos = -1; - - data_type* dt; - for (i = 0; i < fields.size(); ++i) { - data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list()); - if(dt.is_temporal()) { - if (merge_fieldpos != -1) // more that one temporal field found - return; - merge_fieldpos = i; - } - } - - if (merge_fieldpos == -1) // if no temporal fieldf found - return; - - std::vector mvars; // the merge-by columns. 
- - // reodring procedure is different for unary operators and joins - if (node->node_type() == "join_eq_hash_qpn") { - vector new_nodes; - - tablevar_t *left_table_name; - tablevar_t *right_table_name; - mrg_qpn* left_mrg = (mrg_qpn*)query_plan[node->predecessors[0]]; - mrg_qpn* right_mrg = (mrg_qpn*)query_plan[node->predecessors[1]]; - - // for now we will only consider plans where both child merges - // merge the same set of streams - - if (left_mrg->fm.size() != right_mrg->fm.size()) - return; - - // maping of interface names to table definitions - map iface_map; - for (i = 0; i < left_mrg->fm.size(); i++) { - left_table_name = left_mrg->fm[i]; - iface_map[left_table_name->get_machine() + left_table_name->get_interface()] = left_table_name; - } - - for (i = 0; i < right_mrg->fm.size(); i++) { - right_table_name = right_mrg->fm[i]; - - // find corresponding left tabke - if (!iface_map.count(right_table_name->get_machine() + right_table_name->get_interface())) - return; - - left_table_name = iface_map[right_table_name->get_machine() + right_table_name->get_interface()]; - - // create new join nodes - sprintf(node_index, "_%d", i); - join_eq_hash_qpn* new_node = (join_eq_hash_qpn*)node->make_copy(node_index); - - // make a copy of right_table_name - right_table_name = right_table_name->duplicate(); - left_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[0]->get_var_name()); - right_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[1]->get_var_name()); - new_node->from[0] = left_table_name; - new_node->from[1] = right_table_name; - new_nodes.push_back(new_node); - } - - // make right_mrg a new root - right_mrg->set_node_name(node->get_node_name()); - right_mrg->table_layout = table_layout; - right_mrg->merge_fieldpos = merge_fieldpos; - - for (i = 0; i < right_mrg->fm.size(); i++) { - // make newly create joins children of merge - sprintf(node_index, "_%d", i); - right_mrg->fm[i] = new tablevar_t(right_mrg->fm[i]->get_machine().c_str(), right_mrg->fm[i]->get_interface().c_str(), (node->get_node_name() + string(node_index)).c_str()); - sprintf(node_index, "_m%d", i); - right_mrg->fm[i]->set_range_var(node_index); - right_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos)); - - } - - if (left_mrg != right_mrg) - query_plan[node->predecessors[0]] = NULL; // remove left merge from the plan - - query_plan.insert(query_plan.end(), new_nodes.begin(), new_nodes.end()); - - } else { // unary operator - // get the child merge node - mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]]; - - child_mrg->set_node_name(node->get_node_name()); - child_mrg->table_layout = table_layout; - child_mrg->merge_fieldpos = merge_fieldpos; - - // create new nodes for every source stream - for (i = 0; i < child_mrg->fm.size(); i++) { - tablevar_t *table_name = child_mrg->fm[i]; - sprintf(node_index, "_%d", i); - qp_node* new_node = node->make_copy(node_index); - - if (node->node_type() == "spx_qpn") - ((spx_qpn*)new_node)->table_name = table_name; - else if (node->node_type() == "sgah_qpn") - ((sgah_qpn*)new_node)->table_name = table_name; - else if (node->node_type() == "rsgah_qpn") - ((rsgah_qpn*)new_node)->table_name = table_name; - else if (node->node_type() == "sgahcwcb_qpn") - ((sgahcwcb_qpn*)new_node)->table_name = table_name; - table_name->set_range_var("_t0"); - - child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str()); - sprintf(node_index, "_m%d", i); - 
child_mrg->fm[i]->set_range_var(node_index); - child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos)); - - // add new node to query plan - query_plan.push_back(new_node); - } - } - query_plan[index] = NULL; - generate_linkage(); -} - - -// Checks if the node i can be pushed below the merge - bool stream_query::is_pushdown_compatible(int index, map lfta_names, vector interface_names, vector machine_names) { - - int i; - qp_node* node = query_plan[index]; - qp_node* child_node = NULL; - - if (node->predecessors.size() != 1) - return false; - - // node predecessor must be merge that combine streams from multiple hosts - child_node = query_plan[node->predecessors[0]]; - if (child_node->node_type() != "mrg_qpn") - return false; - - if (!((mrg_qpn*)child_node)->is_multihost_merge()) - return false; - - // merge must have only one parent for this optimization to work - // check that all its successors are the same - for (int j = 1; j < child_node->successors.size(); ++j) { - if (child_node->successors[j] != child_node->successors[0]) - return false; - } - - // selections can always be pushed down, aggregations can always be split into selection/aggr or aggr/aggr pair - // and pushed down - if (node->node_type() == "spx_qpn") - return true; - else if (node->node_type() == "sgah_qpn") - return true; - else - return false; - - } - -// Push the operator below the merge - void stream_query::pushdown_operator(int index, ext_fcn_list *Ext_fcns, table_list *Schema) { - qp_node* node = query_plan[index]; - int i; - char node_suffix[128]; - - // we can only safely push down queries that have one and only one temporal atribute in select list - table_def* table_layout = node->get_fields(); - vector fields = table_layout->get_fields(); - int merge_fieldpos = -1; - - data_type* dt; - for (i = 0; i < fields.size(); ++i) { - data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list()); - if(dt.is_temporal()) { - if (merge_fieldpos != -1) // more that one temporal field found - return; - merge_fieldpos = i; - } - } - - if (merge_fieldpos == -1) // if no temporal field found - return; - - std::vector mvars; // the merge-by columns. 
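-	// Sketch of the rewrite performed below: a selection is simply copied per
-	// merge input, while an aggregate A over a multihost merge M is split
-	// (via split_node_for_hfta) into a low-level aggregate A_lo and a
-	// super-aggregate A_hi; each host h then gets its own merge M_h feeding a
-	// copy of A_lo, and M, now merging one stream per host, feeds A_hi:
-	//		before:	A <- M <- {S_h0_0, S_h0_1, S_h1_0, ...}
-	//		after:	A_hi <- M <- {A_lo_h0 <- M_h0 <- {S_h0_*}, ...}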
- - fprintf(stderr, "Regular pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str()); - - // get the child merge node - mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]]; - - tablevar_t *table_name = NULL; - - - if (node->node_type() == "spx_qpn") { - // get the child merge node - - // create new nodes for every source stream - for (i = 0; i < child_mrg->fm.size(); i++) { - table_name = child_mrg->fm[i]; - sprintf(node_suffix, "_%d", i); - qp_node* new_node = node->make_copy(node_suffix); - - ((spx_qpn*)new_node)->table_name = table_name; - table_name->set_range_var("_t0"); - - child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str()); - sprintf(node_suffix, "_m%d", i); - child_mrg->fm[i]->set_range_var(node_suffix); - child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos)); - - // add new node to query plan - query_plan.push_back(new_node); - } - child_mrg->table_layout = table_layout; - child_mrg->merge_fieldpos = merge_fieldpos; - - } else { // aggregation node - - vector new_nodes; - - // split aggregations into high and low-level part - vector split_nodes = ((sgah_qpn*)node)->split_node_for_hfta(Ext_fcns, Schema); - if (split_nodes.size() != 2) - return; - - sgah_qpn* super_aggr = (sgah_qpn*)split_nodes[1]; - super_aggr->table_name = ((sgah_qpn*)node)->table_name; - - // group all the sources by host - map > host_map; - for (i = 0; i < child_mrg->fm.size(); i++) { - tablevar_t *table_name = child_mrg->fm[i]; - if (host_map.count(table_name->get_machine())) - host_map[table_name->get_machine()].push_back(i); - else { - vector tables; - tables.push_back(i); - host_map[table_name->get_machine()] = tables; - } - } - - // create a new merge and low-level aggregation for each host - map >::iterator iter; - for (iter = host_map.begin(); iter != host_map.end(); iter++) { - string host_name = (*iter).first; - vector tables = (*iter).second; - - sprintf(node_suffix, "_%s", host_name.c_str()); - string suffix(node_suffix); - untaboo(suffix); - mrg_qpn *new_mrg = (mrg_qpn *)child_mrg->make_copy(suffix); - for (i = 0; i < tables.size(); ++i) { - sprintf(node_suffix, "_m%d", i); - new_mrg->fm.push_back(child_mrg->fm[tables[i]]); - new_mrg->mvars.push_back(child_mrg->mvars[i]); - new_mrg->fm[i]->set_range_var(node_suffix); - } - qp_node* new_node = split_nodes[0]->make_copy(suffix); - - if (new_node->node_type() == "spx_qpn") { - ((spx_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str()); - ((spx_qpn*)new_node)->table_name->set_range_var("_t0"); - } else { - ((sgah_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str()); - ((sgah_qpn*)new_node)->table_name->set_range_var("_t0"); - } - query_plan.push_back(new_mrg); - new_nodes.push_back(new_node); - } - - child_mrg->merge_fieldpos = merge_fieldpos; - if (split_nodes[0]->node_type() == "spx_qpn") - child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((spx_qpn*)split_nodes[0])->select_list); - else - child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((sgah_qpn*)split_nodes[0])->select_list); - - - // connect newly created nodes with parent multihost merge - for (i = 0; i < new_nodes.size(); ++i) { - if (new_nodes[i]->node_type() == "spx_qpn") - child_mrg->fm[i] = new tablevar_t(((spx_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", 
new_nodes[i]->get_node_name().c_str()); - else { - child_mrg->fm[i] = new tablevar_t(((sgah_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str()); - - } - child_mrg->mvars[i]->set_field(child_mrg->table_layout->get_field_name(merge_fieldpos)); - - sprintf(node_suffix, "_m%d", i); - child_mrg->fm[i]->set_range_var(node_suffix); - query_plan.push_back(new_nodes[i]); - } - child_mrg->fm.resize(new_nodes.size()); - child_mrg->mvars.resize(new_nodes.size()); - - // push the new high-level aggregation - query_plan.push_back(super_aggr); - - } - query_plan[index] = NULL; - generate_linkage(); - - } - -// Extract subtree rooted at node i into separate hfta -stream_query* stream_query::extract_subtree(int index) { - - vector nodes; - stream_query* new_query = new stream_query(query_plan[index], this); - - nodes.push_back(index); - for (int i = 0; i < nodes.size(); ++i) { - qp_node* node = query_plan[nodes[i]]; - if (!node) - continue; - - // add all children to nodes list - for (int j = 0; j < node->predecessors.size(); ++j) - nodes.push_back(node->predecessors[j]); - if (i) - new_query->query_plan.push_back(node); - - query_plan[nodes[i]] = NULL; - } - - return new_query; -} - -// Splits query that combines data from multiple hosts into separate hftas. -vector stream_query::split_multihost_query() { - - vector ret; - char node_suffix[128]; - int i; - - // find merges combining multiple hosts into per-host groups - int plan_size = query_plan.size(); - vector new_nodes; - - for (i = 0; i < plan_size; ++i) { - qp_node* node = query_plan[i]; - if (node && node->node_type() == "mrg_qpn") { - mrg_qpn* mrg = (mrg_qpn*)node; - if (mrg->is_multihost_merge()) { - - // group all the sources by host - map > host_map; - for (int j = 0; j < mrg->fm.size(); j++) { - tablevar_t *table_name = mrg->fm[j]; - if (host_map.count(table_name->get_machine())) - host_map[table_name->get_machine()].push_back(j); - else { - vector tables; - tables.push_back(j); - host_map[table_name->get_machine()] = tables; - } - } - - // create a new merge for each host - map >::iterator iter; - for (iter = host_map.begin(); iter != host_map.end(); iter++) { - string host_name = (*iter).first; - vector tables = (*iter).second; - - if (tables.size() == 1) - continue; - - sprintf(node_suffix, "_%s", host_name.c_str()); - string suffix(node_suffix); - untaboo(suffix); - mrg_qpn *new_mrg = (mrg_qpn *)mrg->make_copy(suffix); - for (int j = 0; j < tables.size(); ++j) { - new_mrg->fm.push_back(mrg->fm[tables[j]]); - new_mrg->mvars.push_back(mrg->mvars[j]); - sprintf(node_suffix, "m_%d", j); - new_mrg->fm[j]->set_range_var(node_suffix); - } - new_nodes.push_back(new_mrg); - } - - if (!new_nodes.empty()) { - // connect newly created merge nodes with parent multihost merge - for (int j = 0; j < new_nodes.size(); ++j) { - mrg->fm[j] = new tablevar_t(new_nodes[j]->fm[0]->get_machine().c_str(), "IFACE", new_nodes[j]->get_node_name().c_str()); - query_plan.push_back(new_nodes[j]); - } - mrg->fm.resize(new_nodes.size()); - mrg->mvars.resize(new_nodes.size()); - generate_linkage(); - } - - - // now externalize the sources - for (int j = 0; j < node->predecessors.size(); ++j) { - // Extract subtree rooted at node i into separate hfta - stream_query* q = extract_subtree(node->predecessors[j]); - if (q) { - q->generate_linkage(); - ret.push_back(q); - } - - } - generate_linkage(); - } - } - } - - return ret; -} - - -// Perform local FTA optimizations -void stream_query::optimize(vector& hfta_list, map 
lfta_names, vector interface_names, vector machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result){ - - // Topologically sort the nodes in query plan (leaf-first) - int i = 0, j = 0; - vector sorted_nodes; - - int num_nodes = query_plan.size(); - bool* leaf_flags = new bool[num_nodes]; - memset(leaf_flags, 0, num_nodes * sizeof(bool)); - - // run topological sort - bool done = false; - - // add all leafs to sorted_nodes - while (!done) { - done = true; - for (i = 0; i < num_nodes; ++i) { - if (!query_plan[i]) - continue; - - if (!leaf_flags[i] && query_plan[i]->predecessors.empty()) { - leaf_flags[i] = true; - sorted_nodes.push_back(i); - done = false; - - // remove the node from its parents predecessor lists - // since we only care about number of predecessors, it is sufficient just to remove - // one element from the parent's predecessors list - for (int j = query_plan[i]->successors.size() - 1; j >= 0; --j) - query_plan[query_plan[i]->successors[j]]->predecessors.pop_back(); - } - } - } - delete[] leaf_flags; - num_nodes = sorted_nodes.size(); - generate_linkage(); // rebuild the recently destroyed predecessor lists. - - // collect the information about interfaces nodes read from - for (i = 0; i < num_nodes; ++i) { - qp_node* node = query_plan[sorted_nodes[i]]; - vector input_tables = node->get_input_tbls(); - for (j = 0; j < input_tables.size(); ++j) { - tablevar_t * table = input_tables[j]; - if (lfta_names.count(table->schema_name)) { - int index = lfta_names[table->schema_name]; - table->set_machine(machine_names[index]); - table->set_interface(interface_names[index]); - } - } - } - - /* - - // push eligible operators down in the query plan - for (i = 0; i < num_nodes; ++i) { - if (partn_parse_result && is_partn_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result)) { - pushdown_partn_operator(sorted_nodes[i]); - } else if (is_pushdown_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names)) { - pushdown_operator(sorted_nodes[i], Ext_fcns, Schema); - } - } - - // split the query into multiple hftas if it combines the data from multiple hosts - vector hftas = split_multihost_query(); - hfta_list.insert(hfta_list.end(), hftas.begin(), hftas.end()); - */ - - num_nodes = query_plan.size(); - // also split multi-way merges into two-way merges - for (i = 0; i < num_nodes; ++i) { - qp_node* node = query_plan[i]; - if (node && node->node_type() == "mrg_qpn") { - vector split_merge = ((mrg_qpn *)node)->split_sources(); - - query_plan.insert(query_plan.end(), split_merge.begin(), split_merge.end()); - // delete query_plan[sorted_nodes[i]]; - query_plan[i] = NULL; - } - } - - generate_linkage(); -} - - - -table_def *stream_query::get_output_tabledef(){ - return( query_plan[qhead]->get_fields() ); -} - - - -////////////////////////////////////////////////////////// -//// De-siloing. 
TO BE REMOVED - -void stream_query::desilo_lftas(map &lfta_names,vector &silo_names,table_list *Schema){ - int i,t,s; - - int suffix_len = silo_names.back().size(); - - for(i=0;i itbls = query_plan[qtail[i]]->get_input_tbls(); - for(t=0;tget_schema_name(); - if(lfta_names.count(itbl_name)>0){ -//printf("Query %s input %d references lfta input %s\n",query_plan[qtail[i]]->get_node_name().c_str(),t,itbl_name.c_str()); - vector src_names; - string lfta_base = itbl_name.substr(0,itbl_name.size()-suffix_len); - for(s=0;sget_node_name()+ - "_input_"+int_to_string(t); - mrg_qpn *merge_node = new mrg_qpn(merge_node_name,src_names,Schema); - int m_op_pos = Schema->add_table(merge_node->table_layout); - itbls[t]->set_schema(merge_node_name); - itbls[t]->set_schema_ref(m_op_pos); - query_plan.push_back(merge_node); - } - } - } - - generate_linkage(); -} -//////////////////////////////////////// -/// End de-siloing - - - -// Given a collection of LFTA stream queries, -// extract their WHERE predicates -// and pass them to an analysis routine which will extract -// common elements -// -void get_common_lfta_filter(std::vector lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, vector &prefilter_preds, set &pred_ids){ - int s; - std::vector< std::vector > where_list; - -// still safe to assume that LFTA queries have a single -// query node, which is at position 0. - for(s=0;s cnf_list = lfta_list[s]->query_plan[0]->get_filter_clause(); - if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){ - gb_table *gtbl = ((sgah_qpn *)(lfta_list[s]->query_plan[0]))->get_gb_tbl(); - int c; - for(c=0;cpr,gtbl); - } - } - where_list.push_back(lfta_list[s]->query_plan[0]->get_filter_clause()); - } - - find_common_filter(where_list,Schema,Ext_fcns,prefilter_preds, pred_ids); -} - - - - - -// Given a collection of LFTA stream queries, -// extract the union of all temporal attributes referenced in select clauses -// those attributes will need to be unpacked in prefilter -// -void get_prefilter_temporal_cids(std::vector lfta_list, col_id_set &temp_cids){ - int s, sl; - vector sl_list; - gb_table *gb_tbl = NULL; - - -// still safe to assume that LFTA queries have a single -// query node, which is at position 0. - for(s=0;squery_plan[0]->node_type() == "spx_qpn"){ - spx_qpn *spx_node = (spx_qpn *)lfta_list[s]->query_plan[0]; - sl_list = spx_node->get_select_se_list(); - } - if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){ - sgah_qpn *sgah_node = (sgah_qpn *)lfta_list[s]->query_plan[0]; - sl_list = sgah_node->get_select_se_list(); - gb_tbl = sgah_node->get_gb_tbl(); - } - - for(sl=0;slget_data_type(); - if (sdt->is_temporal()) { - gather_se_col_ids(sl_list[sl],temp_cids, gb_tbl); - } - } - } -} - - +/* ------------------------------------------------ +Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ ------------------------------------------- */ +#include"stream_query.h" +#include"generate_utils.h" +#include"analyze_fta.h" +#include +#include +#include + +static char tmpstr[500]; + +using namespace std; + + +// Create a query plan from a query node and an existing +// query plan. Use for lfta queries, the parent query plan provides +// the annotations. +stream_query::stream_query(qp_node *qnode, stream_query *parent){ + query_plan.push_back(qnode); + qhead = 0; + qtail.push_back(0); + attributes = qnode->get_fields(); + parameters = qnode->get_param_tbl(); + defines = parent->defines; + query_name = qnode->get_node_name(); +} + +// Copy the query plan. +stream_query::stream_query(stream_query &src){ + query_plan = src.query_plan; + qhead = src.qhead; + qtail = src.qtail; + attributes = src.attributes; + parameters = src.parameters; + defines = src.defines; + query_name = src.query_name; + gid = src.gid; +} + + +// Create a query plan from an analyzed parse tree. +// Perform analyses to find the output node, input nodes, etc. + +stream_query::stream_query(query_summary_class *qs,table_list *Schema){ +// Generate the query plan nodes from the analyzed parse tree. +// There is only one for now, so just assign the return value +// of create_query_nodes to query_plan + error_code = 0; + query_plan = create_query_nodes(qs,Schema); + int i; +if(query_plan.size() == 0){ + fprintf(stderr,"INTERNAL ERROR, zero-size query plan in stream_query::stream_query\n"); + exit(1); +} + for(i=0;iget_error_code() != 0){ + error_code = query_plan[i]->get_error_code(); + err_str += query_plan[i]->get_error_str(); + } + } + } + qhead = query_plan.size()-1; + gid = -1; + +} + + +stream_query * stream_query::add_query(query_summary_class *qs,table_list *Schema){ +// Add another query block to the query plan + error_code = 0; + vector new_nodes = create_query_nodes(qs, Schema); + query_plan.insert(query_plan.end(),new_nodes.begin(), new_nodes.end()); + return this; +} + +stream_query * stream_query::add_query(stream_query &src){ +// Add another query block to the query plan + error_code = 0; + query_plan.insert(query_plan.end(),src.query_plan.begin(), src.query_plan.end()); + return this; +} + + +void stream_query::generate_protocol_se(map &sq_map, table_list *Schema){ + int i,n; + +// Mapping fields to protocol fields requires schema binding. +// First ensure all nodes are in the schema. + for(n=0;nadd_table(query_plan[n]->get_fields()); + } + } +// Now do schema binding + for(n=0;nbind_to_schema(Schema); + } + } + +// create name-to-index map + map name_to_node; + for(n=0;nget_node_name()] = n; + } + } + +// Create a list of the nodes to process, in order. +// Search from the root down. +// ASSUME tree plan. + + list search_q; + list work_list; + + search_q.push_back(qhead); + while(! 
search_q.empty()){ + int the_q = search_q.front(); + search_q.pop_front(); work_list.push_front(the_q); + vector the_pred = query_plan[the_q]->get_predecessors(); + for(i=0;i q_sources; + vector q_input_tbls = query_plan[the_q]->get_input_tbls(); + for(i=0;iget_schema_name(); + if(name_to_node.count(itbl_nm)>0){ + q_sources.push_back(query_plan[name_to_node[itbl_nm]]); + }else if(sq_map.count(itbl_nm)>0){ + q_sources.push_back(sq_map[itbl_nm]->get_query_head()); + }else{ + q_sources.push_back(NULL); + } + } + query_plan[the_q]->create_protocol_se(q_sources, Schema); + } + +////////////////////////////////////////////////////////// +// trust but verify + + +/* + for(i=0;iget_node_name().c_str(), + query_plan[i]->node_type().c_str()); + map *pse_map = query_plan[i]->get_protocol_se(); + map::iterator mssi; + for(mssi=pse_map->begin();mssi!=pse_map->end();++mssi){ + if((*mssi).second) + printf("\t%s : %s\n",(*mssi).first.c_str(), (*mssi).second->to_string().c_str()); + else + printf("\t%s : NULL\n",(*mssi).first.c_str()); + } + if(query_plan[i]->node_type() == "filter_join" || query_plan[i]->node_type() == "join_eq_hash_qpn"){ + vector pse_l; + vector pse_r; + if(query_plan[i]->node_type() == "filter_join"){ + pse_l = ((filter_join_qpn *)query_plan[i])->get_hash_l(); + pse_r = ((filter_join_qpn *)query_plan[i])->get_hash_r(); + } + if(query_plan[i]->node_type() == "join_eq_hash_qpn"){ + pse_l = ((join_eq_hash_qpn *)query_plan[i])->get_hash_l(); + pse_r = ((join_eq_hash_qpn *)query_plan[i])->get_hash_r(); + } + int p; + for(p=0;pto_string().c_str()); + else + printf("NULL\n"); + } + } + if(query_plan[i]->node_type() == "sgah_qpn" || query_plan[i]->node_type() == "rsgah_qpn" || query_plan[i]->node_type() == "sgahcwcb_qpn"){ + vector pseg; + if(query_plan[i]->node_type() == "sgah_qpn") + pseg = ((sgah_qpn *)query_plan[i])->get_gb_sources(); + if(query_plan[i]->node_type() == "rsgah_qpn") + pseg = ((rsgah_qpn *)query_plan[i])->get_gb_sources(); + if(query_plan[i]->node_type() == "sgahcwcb_qpn") + pseg = ((sgahcwcb_qpn *)query_plan[i])->get_gb_sources(); + int g; + for(g=0;gto_string().c_str()); + else + printf("\t\tgb %d = NULL\n",g); + } + } + } + } +*/ + +} + +bool stream_query::generate_linkage(){ + bool create_failed = false; + int n, f,s; + +// Clear any leftover linkages + for(n=0;nclear_predecessors(); + query_plan[n]->clear_successors(); + } + } + qtail.clear(); + +// create name-to-index map + map name_to_node; + for(n=0;nget_node_name()] = n; + } + } + +// Do the 2-way linkage. + for(n=0;n fm = query_plan[n]->get_input_tbls(); + for(f=0;fget_schema_name(); + if(name_to_node.count(src_tbl)>0){ + int s = name_to_node[src_tbl]; + query_plan[n]->add_predecessor(s); + query_plan[s]->add_successor(n); + } + } + } + } + +// Find the head (no successors) and the tails (at least one +// predecessor is external). +// Verify that there is only one head, +// and that all other nodes have one successor (because +// right now I can only handle trees). + + qhead = -1; // no head yet found. 
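+// Illustrative sketch (disabled; not part of the build): the scan below looks
+// for the unique head (a node with no successors) and for the tails (nodes
+// with at least one external input). A standalone model of the head scan,
+// using hypothetical plain-int adjacency lists instead of qp_node:
+#if 0
+#include <vector>
+// succs[n] holds the successor indices of node n.
+static int find_head(const std::vector<std::vector<int> > &succs){
+	int head = -1;
+	for(size_t n=0;n<succs.size();++n){
+		if(succs[n].empty()){
+			if(head >= 0) return -1;	// two sinks: not a tree, reject
+			head = (int)n;
+		}
+	}
+	return head;	// -1 when no unique head exists
+}
+#endif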
+ for(n=0;n succ = query_plan[n]->get_successors(); +/* + if(succ.size() > 1){ + fprintf(stderr,"ERROR, query node %s supplies data to multiple nodes, but currently only tree query plans are supported:\n",query_plan[n]->get_node_name().c_str()); + for(s=0;sget_node_name().c_str()); + } + fprintf(stderr,"\n"); + create_failed = true; + } +*/ + if(succ.size() == 0){ + if(qhead >= 0){ + fprintf(stderr,"ERROR, query nodes %s and %s both have zero successors.\n",query_plan[n]->get_node_name().c_str(), query_plan[qhead]->get_node_name().c_str()); + create_failed = true; + }else{ + qhead = n; + } + } +// does the query node read a source, or is it a source? + if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size() || query_plan[n]->get_input_tbls().size() == 0){ + qtail.push_back(n); + } + } + } + + return create_failed; +} + +// After the collection of query plan nodes is generated, +// analyze their structure to link them up into a tree (or dag?). +// Verify that the structure is acceptable. +// Do some other analysis and verification tasks (?) +// then gather summar information. +int stream_query::generate_plan(table_list *Schema){ + +// The first thing to do is verify that the query plan +// nodes were successfully created. + bool create_failed = false; + int n,f,s; + for(n=0;nget_error_code()){ + fprintf(stderr,"%s",query_plan[n]->get_error_str().c_str()); + create_failed = true; + } + } +/* +for(n=0;nget_node_name(); +printf("In generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str()); +vector inv = query_plan[n]->get_input_tbls(); +int nn; +for(nn=0;nnto_string().c_str(),inv[nn]->get_schema_ref()); +} +printf("\n"); +} +} +*/ + + if(create_failed) return(1); + +// Here, link up the query nodes, then verify that the +// structure is acceptable (single root, no cycles, no stranded +// nodes, etc.) + create_failed = generate_linkage(); + if(create_failed) return -1; + + +// Here, do optimizations such as predicate pushing, +// join rearranging, etc. +// Nothing to do yet. + +/* +for(n=0;nget_node_name(); +printf("B generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str()); +vector inv = query_plan[n]->get_input_tbls(); +int nn; +for(nn=0;nnto_string().c_str(),inv[nn]->get_schema_ref()); +} +printf("\n"); +} +} +printf("qhead=%d, qtail = ",qhead); +int nn; +for(nn=0;nnget_node_name(); + attributes = query_plan[qhead]->get_fields(); +// TODO: The params and defines require a lot more thought. 
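+// Illustrative sketch (disabled): in generate_linkage above, a node is pushed
+// onto qtail when some of its declared inputs have no in-plan predecessor,
+// i.e. they resolve to external sources, or when it reads no tables at all.
+// The predicate in isolation:
+#if 0
+static bool is_tail_node(int n_linked_preds, int n_input_tbls){
+	return n_linked_preds < n_input_tbls || n_input_tbls == 0;
+}
+#endif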
+ parameters = query_plan[qhead]->get_param_tbl(); + defines = query_plan[qhead]->get_definitions(); + + return(0); + }; + +void stream_query::add_output_operator(ospec_str *o){ + output_specs.push_back(o); +} + + +void stream_query::get_external_libs(set &libset){ + + int qn,i; + for(qn=0;qn op_libs = query_plan[qn]->external_libs(); + for(i=0;i op_libs = output_operators[qn]->external_libs(); + for(i=0;i stream_query::split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){ + vector queries; + int l,q,s; + int qp_hfta; + + hfta_returned = false; // assume until proven otherwise + + for(l=0;l qnodes = query_plan[leaf]->split_node_for_fta(Ext_fcns, Schema, qp_hfta, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx); + + + if(qnodes.size() == 0 || query_plan[leaf]->get_error_code()){ // error +//printf("error\n"); + error_code = query_plan[leaf]->get_error_code(); + err_str = query_plan[leaf]->get_error_str(); + vector null_result; + return(null_result); + } + if(qnodes.size() == 1 && qp_hfta){ // nothing happened +//printf("no change\n"); + query_plan[leaf] = qnodes[0]; + } + if(qnodes.size() == 1 && !qp_hfta){ // push to lfta +//printf("lfta only\n"); + queries.push_back(new stream_query(qnodes[0], this)); + vector succ = query_plan[leaf]->get_successors(); + for(s=0;sremove_predecessor(leaf); + } + query_plan[leaf] = NULL; // delete it? + } + if(qnodes.size() > 1){ // actual splitting occurred. + if(!qp_hfta){ // internal consistency check. + error_code = 1; + err_str = "INTERNAL ERROR: mulitple nodes returned by split_node_for_fta, but none are hfta nodes.\n"; + vector null_result; + return(null_result); + } + + for(q=0;qget_node_name().c_str()); + queries.push_back(new stream_query(qnodes[q], this)); + } +// Use new hfta node + query_plan[leaf] = qnodes[qnodes.size()-1]; +// Add in any extra hfta nodes + for(q=qnodes.size()-qp_hfta;q ifps; + for(q=0;qcount_ifp_refs(ifps); + hfta_returned = true; + } + } + + if(n_ifprefs){ + set::iterator ssi; + err_str += "ERROR, unresolved interface parameters in HFTA:\n"; + for(ssi=ifps.begin();ssi!=ifps.end();++ssi){ + err_str += (*ssi)+" "; + } + err_str += "\n"; + error_code = 3; + vector null_result; + return(null_result); + } + + + if(hfta_returned){ + if(generate_linkage()){ + fprintf(stderr,"INTERNAL ERROR, generate_linkage failed in split_query.\n"); + exit(1); + } + queries.push_back(this); + } + + return(queries); +} + + + +vector stream_query::extract_opview(table_list *Schema, vector &qnodes, opview_set &opviews, string silo_nm){ + vector subqueries; + int l,q; + + string root_name = this->get_output_tabledef()->get_tbl_name(); + + + for(l=0;l new_qnodes = query_plan[leaf]->extract_opview(Schema, qnodes, opviews, root_name, silo_nm); + + for(q=0;qget_fields()->to_string(); + + ret += "DEFINE{\n"; + ret += "\tquery_name '"+query_plan[q]->get_node_name()+"';\n"; + + map defs = query_plan[q]->get_definitions(); + map::iterator dfi; + for(dfi=defs.begin(); dfi!=defs.end(); ++dfi){ + ret += "\t"+ (*dfi).first + " '" + (*dfi).second + "';\n"; + } + ret += "}\n\n"; + + ret += "PARAM{\n"; + param_table *params = query_plan[q]->get_param_tbl(); + vector param_names = params->get_param_names(); + int p; + for(p=0;pget_data_type( param_names[p] ); + ret += "\t" + param_names[p] + " '" + dt->get_type_str() + "';\n"; + } + ret += "}\n"; + + ret += query_plan[q]->to_query_string(); + ret += "\n}\n\n"; + + return(ret); + +} + +string 
stream_query::collect_refd_ifaces(){ + string ret=""; + int q; + for(q=0;q defs = query_plan[q]->get_definitions(); + if(defs.count("_referenced_ifaces")){ + if(ret != "") ret += ","; + ret += defs["_referenced_ifaces"]; + } + } + } + + return ret; +} + + +bool stream_query::stream_input_only(table_list *Schema){ + vector input_tbls = this->get_input_tables(); + int i; + for(i=0;iget_table_ref(input_tbls[i]->get_schema_name()); + if(Schema->get_schema_type(t) == PROTOCOL_SCHEMA) return(false); + } + return(true); +} + +// Return input tables. No duplicate removal performed. +vector stream_query::get_input_tables(){ + vector retval; + +// create name-to-index map + int n; + map name_to_node; + for(n=0;nget_node_name()] = n; + } + } + + int l; + for(l=0;l tmp_v = query_plan[leaf]->get_input_tbls(); + int i; + for(i=0;iget_schema_name()) == 0) + retval.push_back(tmp_v[i]); + } + } + return(retval); +} + + +void stream_query::compute_node_format(int q, vector &nfmt, map &op_idx){ + int netcnt = 0, hostcnt = 0; + int i; + + vector itbls = query_plan[q]->get_input_tbls(); + for(i=0;iget_schema_name(); + if(op_idx.count(tname)){ + int o = op_idx[tname]; + if(nfmt[o] == UNKNOWNFORMAT) + compute_node_format(o,nfmt,op_idx); + if(nfmt[o] == NETFORMAT) netcnt++; + else hostcnt++; + }else{ + netcnt++; + } + } + if(query_plan[q]->makes_transform()){ + nfmt[q] = HOSTFORMAT; + }else{ + if(hostcnt>0){ + nfmt[q] = HOSTFORMAT; + }else{ + nfmt[q] = NETFORMAT; + } + } +//printf("query plan %d (%s) is ",q,query_plan[q]->get_node_name().c_str()); +//if(nfmt[q] == HOSTFORMAT) printf(" host format.\n"); +//else printf("net format\n"); +} + + +string stream_query::generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode){ + int schref, ov_ix, i, q, param_sz; + bool dag_graph = false; + + +// Bind the SEs in all query plan nodes to this schema, and +// Add all tables used by this query to the schema. +// Question: Will I be polluting the global schema by adding all +// query node schemas? + +// First ensure all nodes are in the schema. + int qn; + for(qn=0;qnadd_table(query_plan[qn]->get_fields()); + } + } +// Now do binding. + for(qn=0;qnbind_to_schema(Schema); + } + } + +// Is it a DAG plan? + set qsources; + int n; + for(n=0;n tmp_v = query_plan[n]->get_input_tbls(); + int i; + for(i=0;iget_schema_name()) > 0) + dag_graph = true; + qsources.insert(tmp_v[i]->get_schema_name()); + } + } + } + + + +// Collect set of tables ref'd in this HFTA + set tbl_set; + for(qn=0;qn input_tbls = query_plan[qn]->get_input_tbls(); + vector output_tbls = query_plan[qn]->get_output_tbls(); +// Convert to tblrefs, add to set of ref'd tables + int i; + for(i=0;iget_table_ref(input_tbls[i]->get_schema_name()); + int t = input_tbls[i]->get_schema_ref(); + if(t < 0){ + fprintf(stderr,"INTERNAL ERROR in generate_hfta. " + "query plan node %s references input table %s, which is not in schema.\n", + query_name.c_str(), input_tbls[i]->get_schema_name().c_str()); + exit(1); + } + tbl_set.insert(t); + } + + for(i=0;iget_table_ref(output_tbls[i]->get_schema_name()); + if(t < 0){ + fprintf(stderr,"INTERNAL ERROR in generate_hfta." 
+ "query plan node %s references output table %s, which is not in schema.\n", + query_name.c_str(), output_tbls[i]->get_schema_name().c_str()); + exit(1); + } + tbl_set.insert(t); + } + } + } + +// Collect map of lftas, query nodes + map op_idx; + for(q=0;qget_node_name()] = q; + } + } + +// map of input tables must include query id and input +// source (0,1) becuase several queries might reference the same source + vector input_tbls = this->get_input_tables(); + vector input_tbl_free; + for(i=0;i lfta_idx; +//fprintf(stderr,"%d input tables, %d query nodes\n",input_tbls.size(), query_plan.size()); + for(q=0;q itbls = query_plan[q]->get_input_tbls(); + int it; + for(it=0;itget_schema_name()+"-"+int_to_string(q)+"-"+int_to_string(it); + string src_tblname = itbls[it]->get_schema_name(); + bool src_is_external = false; + for(i=0;iget_schema_name()){ + src_is_external = true; + if(input_tbl_free[i]){ + lfta_idx[tname] = i; + input_tbl_free[i] = false; +//fprintf(stderr,"Adding %s (src_tblname=%s, q=%d, it=%d) to %d.\n",tname.c_str(), src_tblname.c_str(), q, it, i); + break; + } + } + } + if(i==input_tbls.size() && src_is_external){ + fprintf(stderr,"INTERNAL ERROR in stream_query::generate_hfta, can't find free entry in input_tbls for query %d, intput %d (%s)\n",q,it,src_tblname.c_str()); + exit(1); + } + } + } + } +/* + for(i=0;iget_schema_name(); + lfta_idx[src_tblname] = i; + } +*/ + +// Compute the output formats of the operators. + vector node_fmt(query_plan.size(),UNKNOWNFORMAT); + compute_node_format(qhead, node_fmt, op_idx); + + +// Generate the schema strings for the outputs. + string schema_str; + for(i=0;imake_schema(i); + schema_str += "gs_csp_t node"+int_to_string(i)+"_schema = "+make_C_embedded_string(schema_tmpstr)+";\n"; + } + } + + attributes = query_plan[qhead]->get_fields(); + + + string schema_tmpstr = this->make_schema(); + schema_str += "gs_csp_t "+generate_schema_string_name(query_name)+" = "+make_C_embedded_string(schema_tmpstr)+";\n"; + +// Generate the collection of tuple defs. + + string tuple_defs = "\n/*\tDefine tuple structures \t*/\n\n"; + set::iterator si; + for(si=tbl_set.begin(); si!=tbl_set.end(); ++si){ + tuple_defs += generate_host_tuple_struct( Schema->get_table( (*si) )); + tuple_defs += "\n\n"; + } + +// generate the finalize tuple function + string finalize_str = generate_hfta_finalize_tuple(attributes); + +// Analyze and make the output operators + bool eat_input = false; + string src_op = query_name; + string pred_op = src_op; + int last_op = -1; + if(output_specs.size()>0) + eat_input = true; + if(n_successors>0) + eat_input = false; // must create stream output for successor HFTAs. 
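+// Illustrative sketch (disabled): a condensed, hypothetical restatement of
+// the eat_input rule being computed here. The flag starts true only when
+// output operators exist, and is forced back to false whenever a downstream
+// consumer (a successor HFTA or an explicit stream output) still needs the
+// tuple stream:
+#if 0
+static bool compute_eat_input(size_t n_output_specs, int n_successors,
+				bool any_stream_spec){
+	if(n_output_specs == 0) return false;	// no output operators at all
+	if(n_successors > 0) return false;	// successor HFTAs read the stream
+	if(any_stream_spec) return false;	// an explicit "stream" output spec
+	return true;	// only file/zfile outputs: they consume the stream
+}
+#endif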
+ int n_filestreams = 0; + for(i=0;ioperator_type == "stream" ){ + eat_input = false; + } + if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile" ){ + last_op = i; + n_filestreams++; + } + } + int filestream_id = 0; + for(i=0;ioperator_type == "file" || output_specs[i]->operator_type == "zfile"){ + + int n_fstreams = output_specs[i]->n_partitions / n_parallel; + if(n_fstreams * n_parallel < output_specs[i]->n_partitions){ + n_fstreams++; + if(n_parallel == 1 || query_name.find("__copy1") != string::npos){ + fprintf(stderr,"WARNING, in query %s, %d streams requested for %s output, but it must be a multiple of the hfta parallelism (%d), increasing number of output streams to %d.\n",query_name.c_str(), output_specs[i]->n_partitions, output_specs[i]->operator_type.c_str(), n_parallel, n_fstreams*n_parallel); + } + } +// output_file_qpn *new_ofq = new output_file_qpn(); + string filestream_tag = ""; + if(n_filestreams>1){ + filestream_tag = "_fileoutput"+int_to_string(filestream_id); + filestream_id++; + } + output_file_qpn *new_ofq = new output_file_qpn(pred_op, src_op, filestream_tag, query_plan[qhead]->get_fields(), output_specs[i], (i==last_op ? eat_input : false) ); +// if(n_fstreams > 1){ + if(n_fstreams > 0){ + string err_str; + bool err_ret = new_ofq->set_splitting_params(n_parallel,parallel_idx,n_fstreams,output_specs[i]->partitioning_flds,err_str); + if(err_ret){ + fprintf(stderr,"%s",err_str.c_str()); + exit(1); + } + } + output_operators.push_back(new_ofq ); + pred_op = output_operators.back()->get_node_name(); + }else if(! (output_specs[i]->operator_type == "stream" || output_specs[i]->operator_type == "Stream" || output_specs[i]->operator_type == "STREAM") ){ + fprintf(stderr,"WARNING, output operator type %s (on query %s) is not supported, ignoring\n",output_specs[i]->operator_type.c_str(),query_name.c_str() ); + } + } + + + +// Generate functors for the query nodes. + + string functor_defs = "\n/*\tFunctor definitions\t*/\n\n"; + for(qn=0;qn needs_xform; + vector itbls = query_plan[qn]->get_input_tbls(); + for(i=0;iget_schema_name(); +// if(query_plan[qn]->makes_transform()){ + if(op_idx.count(tname)>0){ + if(node_fmt[ op_idx[tname] ] == NETFORMAT){ + needs_xform.push_back(true); + }else{ + needs_xform.push_back(false); + } + }else{ + needs_xform.push_back(true); + } +// }else{ +// if(op_idx.count(tname)>0){ +// if(node_fmt[qn] != node_fmt[ op_idx[tname] ]){ +// needs_xform.push_back(true); +// }else{ +// needs_xform.push_back(false); +// } +// }else{ +// if(node_fmt[qn] == HOSTFORMAT){ +// needs_xform.push_back(true); +// }else{ +// needs_xform.push_back(false); +// } +// } +// } + } + + functor_defs += query_plan[qn]->generate_functor(Schema, Ext_fcns, needs_xform); + } + } + +// Generate output operator functors + + vector needs_xform; + for(i=0;igenerate_functor(Schema, Ext_fcns, needs_xform); + + string ret = +"extern \"C\" {\n" +"#include \n" +"#include \n" +"#include \n" +"#include \n" +"#include \n" +"#include \n" +"}\n" +; + if(dag_graph) + ret += +"#define PLAN_DAG\n" +; + ret += +"#include \n" +"#include\n" +"\n" +"#include \n" +"#include \n" +"#include \n" +"#include \n" +"\n" +//"#define MAXSCHEMASZ 16384\n" +"#include \n\n" +; + +// Get include file for each of the operators. +// avoid duplicate inserts. 
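+// Illustrative sketch (disabled): the set used below deduplicates the include
+// files contributed by the operators; ordering is lost, which is harmless for
+// #include lines. The same pattern, standalone:
+#if 0
+#include <set>
+#include <string>
+#include <vector>
+static std::string dedup_concat(const std::vector<std::string> &incs){
+	std::set<std::string> seen(incs.begin(), incs.end());
+	std::string out;
+	for(std::set<std::string>::const_iterator it=seen.begin(); it!=seen.end(); ++it)
+		out += *it;	// each entry is a full "#include ...\n" line
+	return out;
+}
+#endif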
+ set include_fls; + for(qn=0;qnget_include_file()); + } + for(i=0;iget_include_file()); + set::iterator ssi; + for(ssi=include_fls.begin();ssi!=include_fls.end();++ssi) + ret += (*ssi); + +// Add defines for hash functions + ret += +"\n" +"#define hfta_BOOL_to_hash(x) (x)\n" +"#define hfta_USHORT_to_hash(x) (x)\n" +"#define hfta_UINT_to_hash(x) (x)\n" +"#define hfta_IP_to_hash(x) (x)\n" +"#define hfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n" +"#define hfta_INT_to_hash(x) (gs_uint32_t)(x)\n" +"#define hfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n" +"#define hfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n" +"#define hfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n" +"\n" +; + +// ret += "#define SERIOUS_LFTA \""+input_tbls[0]->get_schema_name()+"\"\n"; + ret += "#define OUTPUT_HFTA \""+query_name+"\"\n\n"; + +// HACK ALERT: I know for now that all query plans are +// single operator plans, but while SPX and SGAH can use the +// UNOP template, the merge operator must use the MULTOP template. +// WORSE HACK ALERT : merge does not translate its input, +// so don't apply finalize to the output. +// TODO: clean this up. + +// string node_type = query_plan[0]->node_type(); + + ret += schema_str; + ret += tuple_defs; + +// Need to work on the input, output xform logic. +// For now, just add it in. +// ret += finalize_str; + + if(node_fmt[qhead] == NETFORMAT){ + ret += +"void finalize_tuple(host_tuple &tup){\n" +"return;\n" +"}\n" +"\n"; + }else{ + ret += finalize_str; + } + + ret += functor_defs; + +// Parameter block management +// The proper parameter block must be transmitted to each +// external stream source. +// There is a 1-1 mapping between the param blocks returned +// by this list and the registered data sources ... +// TODO: make this more manageable, but for now +// there is no parameter block manipulation so I just +// need to have the same number. + + ret += +"int get_lfta_params(gs_int32_t sz, void * value,list& lst){\n" +" // for now every lfta receive the full copy of hfta parameters\n" +" struct param_block pb;\n"; + + set lfta_seen; + for(i=0;iget_schema_name(); + if(lfta_seen.count(src_tblname) == 0){ + lfta_seen.insert(src_tblname); + schref = input_tbls[i]->get_schema_ref(); + if(Schema->get_schema_type(schref) == OPERATOR_VIEW_SCHEMA){ + ov_ix = input_tbls[i]->get_opview_idx(); + opview_entry *opv = opviews.get_entry(ov_ix); + string op_param = "SUBQ:"; + int q; + for(q=0;qsubq_names.size();++q){ + if(q>0) op_param+=","; + op_param+=opv->subq_names[q]; + } + op_param+="\\n"; + param_sz = op_param.size()-1; + + sprintf(tmpstr,"\t\tpb.block_length = %d;\n",param_sz); ret+=tmpstr; + ret+= +" pb.data = malloc(pb.block_length);\n"; + ret+="\t\tmemcpy(pb.data,\""+op_param+"\",pb.block_length);\n" +" lst.push_back(pb);\n\n"; + }else{ + ret+= +" pb.block_length = sz;\n" +" pb.data = malloc(pb.block_length);\n" +" memcpy(pb.data, value, pb.block_length);\n" +" lst.push_back(pb);\n\n"; + } + } + } + + ret += +" return 0;\n" +"}\n" +"\n"; + + ret+= +"struct FTA* alloc_hfta (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void * value ) {\n" +"\n" +" // find the lftas\n" +" list *lfta_list = new list;\n" +"\n" +" FTAID f;\n" +" char schemabuf[MAXSCHEMASZ];\n" +" gs_schemahandle_t schema_handle;\n" +"\n"; + +// Register the input data sources. +// Register a source only once. 
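+// Illustrative sketch (disabled): "register a source only once" is done below
+// with the usual map::count() guard; each distinct source name is assigned
+// the next unused id. The idiom in isolation (hypothetical helper name):
+#if 0
+#include <map>
+#include <string>
+static int intern_source(std::map<std::string,int> &srcid,
+				const std::string &name){
+	if(srcid.count(name) == 0){
+		int id = (int)srcid.size();	// compute before inserting
+		srcid[name] = id;
+	}
+	return srcid[name];
+}
+#endif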
+ +// vector ext_reg_txt; + map input_tbl_srcid; + for(i=0;iget_schema_name(); +// Use UDOP alias when in distributed mode. +// the cluster manager will make the translation +// using infr from qtree.xml + if(distributed_mode && input_tbls[i]->get_udop_alias() != "") + src_tblname = input_tbls[i]->get_udop_alias(); + if(input_tbl_srcid.count(src_tblname) == 0){ + int srcid = input_tbl_srcid.size(); + input_tbl_srcid[src_tblname] = srcid; + string tmp_s= +"\n // find "+src_tblname+"\n" +" if (fta_find(\""+src_tblname+"\",1,&f,schemabuf,MAXSCHEMASZ)!=0) {\n" +" fprintf(stderr,\"HFTA::error:could not find LFTA \\n\");\n" +" return 0;\n" +" }\n" +" //fprintf(stderr,\"HFTA::FTA found at %u[%u]\\n\",ftamsgid,ftaindex);\n" +"\n" +" // parse the schema and get the schema handle\n" +" schema_handle = ftaschema_parse_string(schemabuf);\n" +" lfta_info* inf"+int_to_string(srcid)+" = new lfta_info();\n" +" inf"+int_to_string(srcid)+"->f = f;\n" +" inf"+int_to_string(srcid)+"->fta_name = strdup(\""+src_tblname+"\");\n" +" inf"+int_to_string(srcid)+"->schema = strdup(schemabuf);\n" +" inf"+int_to_string(srcid)+"->schema_handle = schema_handle;\n" +" lfta_list->push_back(inf"+int_to_string(srcid)+");\n\n"; +// ext_reg_txt.push_back(tmp_s); + ret += tmp_s; + } + } + + ret+="\n"; + ret += "\tgs_schemahandle_t root_schema_handle = ftaschema_parse_string("+generate_schema_string_name(query_name)+");\n"; + for(i=0;iget_node_name()+") \n"; + +// Create parameters for operator construction. + string op_params; + vector itbls = query_plan[q]->get_input_tbls(); + string tname = itbls[0]->get_schema_name(); +// string li_tname = tname +"-"+int_to_string(q)+"-0"; +// if(lfta_idx.count(li_tname)>0) + if(input_tbl_srcid.count(tname)>0){ +// ret += ext_reg_txt[lfta_idx[li_tname]]; +// op_params += "inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle"; + op_params += "inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle"; + }else if(op_idx.count(tname)>0){ + op_params += "op"+int_to_string( op_idx[tname] )+"_schema_handle"; + }else{ + fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (3) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str()); + exit(1); + } + if(itbls.size()>1){ + string tname = itbls[1]->get_schema_name(); +// string li_tname = tname +"-"+int_to_string(q)+"-1"; +// if(lfta_idx.count(li_tname)>0) + if(input_tbl_srcid.count(tname)>0){ +// ret += ext_reg_txt[lfta_idx[li_tname]]; +// op_params += ",inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle"; + op_params += ",inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle"; + }else if(op_idx.count(tname)>0){ + op_params += ",op"+int_to_string( op_idx[tname] )+"_schema_handle"; + }else{ + fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (4) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str()); + exit(1); + } + } + ret += query_plan[q]->generate_operator(q,op_params); + ret += +" operator_node* node"+int_to_string(q)+" = new operator_node(op"+int_to_string(q)+");\n"; + + n_basic_ops = q; + } + } + n_basic_ops++; +// Next for the output operators if any + for(i=0;igenerate_operator(n_basic_ops+i,"root_schema_handle"); + ret += +" operator_node* node"+int_to_string(n_basic_ops+i)+" = new operator_node(op"+int_to_string(n_basic_ops+i)+");\n"; + } + + +// Link up operators. 
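+// Illustrative sketch (disabled): the loop below resolves each input table
+// name first against external lfta sources, then against internal operators,
+// and treats anything else as an internal error. The same three-way
+// resolution as a helper (hypothetical names):
+#if 0
+#include <map>
+#include <string>
+enum src_kind { SRC_LFTA, SRC_OP, SRC_UNKNOWN };
+static src_kind resolve_input(const std::map<std::string,int> &lfta_ids,
+				const std::map<std::string,int> &op_ids,
+				const std::string &tname, int &idx){
+	std::map<std::string,int>::const_iterator it = lfta_ids.find(tname);
+	if(it != lfta_ids.end()){ idx = it->second; return SRC_LFTA; }
+	it = op_ids.find(tname);
+	if(it != op_ids.end()){ idx = it->second; return SRC_OP; }
+	return SRC_UNKNOWN;
+}
+#endif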
+ for(q=0;q itbls = query_plan[q]->get_input_tbls(); + string tname = itbls[0]->get_schema_name(); +// string li_tname = tname +"-"+int_to_string(q)+"-0"; +// if(lfta_idx.count(li_tname)>0) + if(input_tbl_srcid.count(tname)>0){ +// ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n"; + ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n"; + }else if(op_idx.count(tname)>0){ + ret += "\tnode"+int_to_string(q)+"->set_left_child_node(node"+int_to_string( op_idx[tname] )+");\n"; + }else{ + fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (1) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str()); + exit(1); + } + if(itbls.size()>1){ + string tname = itbls[1]->get_schema_name(); +// string li_tname = tname +"-"+int_to_string(q)+"-1"; +// if(lfta_idx.count(li_tname)>0) + if(input_tbl_srcid.count(tname)>0){ +// ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n"; + ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n"; + }else if(op_idx.count(tname)>0){ + ret += "\tnode"+int_to_string(q)+"->set_right_child_node(node"+int_to_string( op_idx[tname] )+");\n"; + }else{ + fprintf(stderr,"INTERNAL ERROR, can't identify input table %s (%s) when linking operator (2) %d (%s)\n",tname.c_str(), tname.c_str(),q,query_plan[q]->get_node_name().c_str()); + exit(1); + } + } + } + } + for(i=0;iset_left_child_node(node"+int_to_string( qhead )+");\n"; + else + ret += "\tnode"+int_to_string(n_basic_ops+i)+"->set_left_child_node(node"+int_to_string( n_basic_ops+i-1 )+");\n"; + } + + // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters + + bool fta_reusable = false; + if (query_plan[qhead]->get_val_of_def("reusable") == "yes" || + query_plan[qhead]->get_param_tbl()->size() == 0) { + fta_reusable = 1; + } + + int root_node = qhead; + if(output_operators.size()>0) + root_node = n_basic_ops+i-1; + + ret+= +"\n" +"\n" +" MULTOP_HFTA* ftap = new MULTOP_HFTA(ftaid, OUTPUT_HFTA, command, sz, value, root_schema_handle, node"+int_to_string(root_node)+", lfta_list, " + (fta_reusable ?"true":"false") + ", reusable);\n" +" if(ftap->init_failed()){ delete ftap; return 0;}\n" +" return (FTA*)ftap;\n" +"}\n" +"\n" +"\n"; + + + string comm_bufsize = "16*1024*1024"; + if(defines.count("hfta_comm_buf")>0){ + comm_bufsize = defines["hfta_comm_buf"]; + } + + ret+= +"\n" +"int main(int argc, char * argv[]) {\n" +"\n" +"\n" +" /* parse the arguments */\n" +"\n" +" gs_int32_t tip1,tip2,tip3,tip4;\n" +" endpoint gshub;\n" +" gs_sp_t instance_name;\n" +" if (argc<3) {\n" +" gslog(LOG_EMERG,\"Wrong arguments at startup\");\n" +" exit(1);\n" +" }\n" +"\n" +" if ((sscanf(argv[1],\"%u.%u.%u.%u:%hu\",&tip1,&tip2,&tip3,&tip4,&(gshub.port)) != 5)) {\n" +" gslog(LOG_EMERG,\"HUB IP NOT DEFINED\");\n" +" exit(1);\n" +" }\n" +" gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\n" +" gshub.port=htons(gshub.port);\n" +" instance_name=strdup(argv[2]);\n" +" if (set_hub(gshub)!=0) {\n" +" gslog(LOG_EMERG,\"Could not set hub\");\n" +" exit(1);\n" +" }\n" +" if (set_instance_name(instance_name)!=0) {\n" +" gslog(LOG_EMERG,\"Could not set instance name\");\n" +" exit(1);\n" +" }\n" +"\n" +"\n" +" /* initialize host library */\n" +"\n" +//" fprintf(stderr,\"Initializing gscp\\n\");\n" +" gsopenlog(argv[0]);\n" +"\n" +" if (hostlib_init(HFTA, "+comm_bufsize+", DEFAULTDEV, 0, 0)!=0) {\n" +" 
fprintf(stderr,\"%s::error:could not initialize gscp\\n\",\n" +" argv[0]);\n" +" exit(1);\n" +" }\n" +"\n" +"\n" +"\n" +" FTAID ret = fta_register(OUTPUT_HFTA, " + (fta_reusable?"1":"0") + ", DEFAULTDEV, alloc_hfta, "+generate_schema_string_name(query_name)+", -1, 0ull);\n" +" fta_start_service(-1);\n" +"\n" +" return 0;\n" +"\n" +"}\n" +"\n"; +//////////////////// + return(ret); +} + + +// Checks if the node i is compatible with interface partitioning +// (can be pushed below the merge that combines partitioned stream) +// i - index of the query node in a query plan +bool stream_query::is_partn_compatible(int index, map lfta_names, vector interface_names, vector machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) { + int i; + qp_node* node = query_plan[index]; + qp_node* child_node = NULL; + + if (node->predecessors.empty()) + return false; + + // all the node predecessors must be partition merges with the same partition definition + partn_def_t* partn_def = NULL; + for (i = 0; i < node->predecessors.size(); ++i) { + child_node = query_plan[node->predecessors[i]]; + if (child_node->node_type() != "mrg_qpn") + return false; + + // merge must have only one parent for this optimization to work + // check that all its successors are the same + for (int j = 1; j < child_node->successors.size(); ++j) { + if (child_node->successors[j] != child_node->successors[0]) + return false; + } + + partn_def_t* new_partn_def = ((mrg_qpn*)child_node)->get_partn_definition(lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result); + if (!new_partn_def) + return false; + if (!i) + partn_def = new_partn_def; + else if (new_partn_def != partn_def) + return false; + + } + + if (node->node_type() == "spx_qpn") // spx nodes are always partition compatible + return true; + else if (node->node_type() == "sgah_qpn") { + gb_table gb_tbl = ((sgah_qpn*)node)->gb_tbl; + return true; //partn_def->is_compatible(&gb_tbl); + } + else if (node->node_type() == "rsgah_qpn") { + gb_table gb_tbl = ((rsgah_qpn*)node)->gb_tbl; + return partn_def->is_compatible(&gb_tbl); + } + else if (node->node_type() == "sgahcwcb_qpn") { + gb_table gb_tbl = ((sgahcwcb_qpn*)node)->gb_tbl; + return partn_def->is_compatible(&gb_tbl); + } + else if (node->node_type() == "join_eq_hash_qpn") { + return true; + } + else + return false; +} + +// Push the operator below the merge that combines +void stream_query::pushdown_partn_operator(int index) { + qp_node* node = query_plan[index]; + int i; + char node_index[128]; + +// fprintf(stderr, "Partn pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str()); + + + // HACK ALERT: When reordering merges we screw up slack computation + // since slack should no longer be used, it is not an issue + + + // we can safely reorder nodes that have one and only one temporal atribute in select list + table_def* table_layout = node->get_fields(); + vector fields = table_layout->get_fields(); + int merge_fieldpos = -1; + + data_type* dt; + for (i = 0; i < fields.size(); ++i) { + data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list()); + if(dt.is_temporal()) { + if (merge_fieldpos != -1) // more that one temporal field found + return; + merge_fieldpos = i; + } + } + + if (merge_fieldpos == -1) // if no temporal fieldf found + return; + + std::vector mvars; // the merge-by columns. 
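+// Illustrative sketch (disabled): the reordering above is gated on the output
+// schema having exactly one temporal column, which then serves as the
+// merge-by field. The scan in standalone form, over hypothetical
+// (field name, is_temporal) pairs:
+#if 0
+#include <string>
+#include <utility>
+#include <vector>
+// returns the position of the single temporal field, or -1 if there is not
+// exactly one such field.
+static int single_temporal_pos(const std::vector<std::pair<std::string,bool> > &flds){
+	int pos = -1;
+	for(size_t i=0;i<flds.size();++i){
+		if(flds[i].second){
+			if(pos != -1) return -1;	// a second temporal field: give up
+			pos = (int)i;
+		}
+	}
+	return pos;
+}
+#endif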
+ + // reodring procedure is different for unary operators and joins + if (node->node_type() == "join_eq_hash_qpn") { + vector new_nodes; + + tablevar_t *left_table_name; + tablevar_t *right_table_name; + mrg_qpn* left_mrg = (mrg_qpn*)query_plan[node->predecessors[0]]; + mrg_qpn* right_mrg = (mrg_qpn*)query_plan[node->predecessors[1]]; + + // for now we will only consider plans where both child merges + // merge the same set of streams + + if (left_mrg->fm.size() != right_mrg->fm.size()) + return; + + // maping of interface names to table definitions + map iface_map; + for (i = 0; i < left_mrg->fm.size(); i++) { + left_table_name = left_mrg->fm[i]; + iface_map[left_table_name->get_machine() + left_table_name->get_interface()] = left_table_name; + } + + for (i = 0; i < right_mrg->fm.size(); i++) { + right_table_name = right_mrg->fm[i]; + + // find corresponding left tabke + if (!iface_map.count(right_table_name->get_machine() + right_table_name->get_interface())) + return; + + left_table_name = iface_map[right_table_name->get_machine() + right_table_name->get_interface()]; + + // create new join nodes + sprintf(node_index, "_%d", i); + join_eq_hash_qpn* new_node = (join_eq_hash_qpn*)node->make_copy(node_index); + + // make a copy of right_table_name + right_table_name = right_table_name->duplicate(); + left_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[0]->get_var_name()); + right_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[1]->get_var_name()); + new_node->from[0] = left_table_name; + new_node->from[1] = right_table_name; + new_nodes.push_back(new_node); + } + + // make right_mrg a new root + right_mrg->set_node_name(node->get_node_name()); + right_mrg->table_layout = table_layout; + right_mrg->merge_fieldpos = merge_fieldpos; + + for (i = 0; i < right_mrg->fm.size(); i++) { + // make newly create joins children of merge + sprintf(node_index, "_%d", i); + right_mrg->fm[i] = new tablevar_t(right_mrg->fm[i]->get_machine().c_str(), right_mrg->fm[i]->get_interface().c_str(), (node->get_node_name() + string(node_index)).c_str()); + sprintf(node_index, "_m%d", i); + right_mrg->fm[i]->set_range_var(node_index); + right_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos)); + + } + + if (left_mrg != right_mrg) + query_plan[node->predecessors[0]] = NULL; // remove left merge from the plan + + query_plan.insert(query_plan.end(), new_nodes.begin(), new_nodes.end()); + + } else { // unary operator + // get the child merge node + mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]]; + + child_mrg->set_node_name(node->get_node_name()); + child_mrg->table_layout = table_layout; + child_mrg->merge_fieldpos = merge_fieldpos; + + // create new nodes for every source stream + for (i = 0; i < child_mrg->fm.size(); i++) { + tablevar_t *table_name = child_mrg->fm[i]; + sprintf(node_index, "_%d", i); + qp_node* new_node = node->make_copy(node_index); + + if (node->node_type() == "spx_qpn") + ((spx_qpn*)new_node)->table_name = table_name; + else if (node->node_type() == "sgah_qpn") + ((sgah_qpn*)new_node)->table_name = table_name; + else if (node->node_type() == "rsgah_qpn") + ((rsgah_qpn*)new_node)->table_name = table_name; + else if (node->node_type() == "sgahcwcb_qpn") + ((sgahcwcb_qpn*)new_node)->table_name = table_name; + table_name->set_range_var("_t0"); + + child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str()); + sprintf(node_index, "_m%d", i); + 
child_mrg->fm[i]->set_range_var(node_index); + child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos)); + + // add new node to query plan + query_plan.push_back(new_node); + } + } + query_plan[index] = NULL; + generate_linkage(); +} + + +// Checks if the node i can be pushed below the merge + bool stream_query::is_pushdown_compatible(int index, map lfta_names, vector interface_names, vector machine_names) { + + int i; + qp_node* node = query_plan[index]; + qp_node* child_node = NULL; + + if (node->predecessors.size() != 1) + return false; + + // node predecessor must be merge that combine streams from multiple hosts + child_node = query_plan[node->predecessors[0]]; + if (child_node->node_type() != "mrg_qpn") + return false; + + if (!((mrg_qpn*)child_node)->is_multihost_merge()) + return false; + + // merge must have only one parent for this optimization to work + // check that all its successors are the same + for (int j = 1; j < child_node->successors.size(); ++j) { + if (child_node->successors[j] != child_node->successors[0]) + return false; + } + + // selections can always be pushed down, aggregations can always be split into selection/aggr or aggr/aggr pair + // and pushed down + if (node->node_type() == "spx_qpn") + return true; + else if (node->node_type() == "sgah_qpn") + return true; + else + return false; + + } + +// Push the operator below the merge + void stream_query::pushdown_operator(int index, ext_fcn_list *Ext_fcns, table_list *Schema) { + qp_node* node = query_plan[index]; + int i; + char node_suffix[128]; + + // we can only safely push down queries that have one and only one temporal atribute in select list + table_def* table_layout = node->get_fields(); + vector fields = table_layout->get_fields(); + int merge_fieldpos = -1; + + data_type* dt; + for (i = 0; i < fields.size(); ++i) { + data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list()); + if(dt.is_temporal()) { + if (merge_fieldpos != -1) // more that one temporal field found + return; + merge_fieldpos = i; + } + } + + if (merge_fieldpos == -1) // if no temporal field found + return; + + std::vector mvars; // the merge-by columns. 
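+// Illustrative sketch (disabled): the pushdown below groups the merge's
+// source streams by host so each host gets its own merge and low-level
+// aggregate, with a single high-level aggregate on top. The grouping step
+// alone:
+#if 0
+#include <map>
+#include <string>
+#include <vector>
+static std::map<std::string, std::vector<int> >
+group_by_host(const std::vector<std::string> &machine_of_src){
+	std::map<std::string, std::vector<int> > host_map;
+	for(size_t i=0;i<machine_of_src.size();++i)
+		host_map[machine_of_src[i]].push_back((int)i);	// source index per host
+	return host_map;
+}
+#endif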
+ + fprintf(stderr, "Regular pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str()); + + // get the child merge node + mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]]; + + tablevar_t *table_name = NULL; + + + if (node->node_type() == "spx_qpn") { + // get the child merge node + + // create new nodes for every source stream + for (i = 0; i < child_mrg->fm.size(); i++) { + table_name = child_mrg->fm[i]; + sprintf(node_suffix, "_%d", i); + qp_node* new_node = node->make_copy(node_suffix); + + ((spx_qpn*)new_node)->table_name = table_name; + table_name->set_range_var("_t0"); + + child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str()); + sprintf(node_suffix, "_m%d", i); + child_mrg->fm[i]->set_range_var(node_suffix); + child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos)); + + // add new node to query plan + query_plan.push_back(new_node); + } + child_mrg->table_layout = table_layout; + child_mrg->merge_fieldpos = merge_fieldpos; + + } else { // aggregation node + + vector new_nodes; + + // split aggregations into high and low-level part + vector split_nodes = ((sgah_qpn*)node)->split_node_for_hfta(Ext_fcns, Schema); + if (split_nodes.size() != 2) + return; + + sgah_qpn* super_aggr = (sgah_qpn*)split_nodes[1]; + super_aggr->table_name = ((sgah_qpn*)node)->table_name; + + // group all the sources by host + map > host_map; + for (i = 0; i < child_mrg->fm.size(); i++) { + tablevar_t *table_name = child_mrg->fm[i]; + if (host_map.count(table_name->get_machine())) + host_map[table_name->get_machine()].push_back(i); + else { + vector tables; + tables.push_back(i); + host_map[table_name->get_machine()] = tables; + } + } + + // create a new merge and low-level aggregation for each host + map >::iterator iter; + for (iter = host_map.begin(); iter != host_map.end(); iter++) { + string host_name = (*iter).first; + vector tables = (*iter).second; + + sprintf(node_suffix, "_%s", host_name.c_str()); + string suffix(node_suffix); + untaboo(suffix); + mrg_qpn *new_mrg = (mrg_qpn *)child_mrg->make_copy(suffix); + for (i = 0; i < tables.size(); ++i) { + sprintf(node_suffix, "_m%d", i); + new_mrg->fm.push_back(child_mrg->fm[tables[i]]); + new_mrg->mvars.push_back(child_mrg->mvars[i]); + new_mrg->fm[i]->set_range_var(node_suffix); + } + qp_node* new_node = split_nodes[0]->make_copy(suffix); + + if (new_node->node_type() == "spx_qpn") { + ((spx_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str()); + ((spx_qpn*)new_node)->table_name->set_range_var("_t0"); + } else { + ((sgah_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str()); + ((sgah_qpn*)new_node)->table_name->set_range_var("_t0"); + } + query_plan.push_back(new_mrg); + new_nodes.push_back(new_node); + } + + child_mrg->merge_fieldpos = merge_fieldpos; + if (split_nodes[0]->node_type() == "spx_qpn") + child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((spx_qpn*)split_nodes[0])->select_list); + else + child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((sgah_qpn*)split_nodes[0])->select_list); + + + // connect newly created nodes with parent multihost merge + for (i = 0; i < new_nodes.size(); ++i) { + if (new_nodes[i]->node_type() == "spx_qpn") + child_mrg->fm[i] = new tablevar_t(((spx_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", 
new_nodes[i]->get_node_name().c_str()); + else { + child_mrg->fm[i] = new tablevar_t(((sgah_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str()); + + } + child_mrg->mvars[i]->set_field(child_mrg->table_layout->get_field_name(merge_fieldpos)); + + sprintf(node_suffix, "_m%d", i); + child_mrg->fm[i]->set_range_var(node_suffix); + query_plan.push_back(new_nodes[i]); + } + child_mrg->fm.resize(new_nodes.size()); + child_mrg->mvars.resize(new_nodes.size()); + + // push the new high-level aggregation + query_plan.push_back(super_aggr); + + } + query_plan[index] = NULL; + generate_linkage(); + + } + +// Extract subtree rooted at node i into separate hfta +stream_query* stream_query::extract_subtree(int index) { + + vector nodes; + stream_query* new_query = new stream_query(query_plan[index], this); + + nodes.push_back(index); + for (int i = 0; i < nodes.size(); ++i) { + qp_node* node = query_plan[nodes[i]]; + if (!node) + continue; + + // add all children to nodes list + for (int j = 0; j < node->predecessors.size(); ++j) + nodes.push_back(node->predecessors[j]); + if (i) + new_query->query_plan.push_back(node); + + query_plan[nodes[i]] = NULL; + } + + return new_query; +} + +// Splits query that combines data from multiple hosts into separate hftas. +vector stream_query::split_multihost_query() { + + vector ret; + char node_suffix[128]; + int i; + + // find merges combining multiple hosts into per-host groups + int plan_size = query_plan.size(); + vector new_nodes; + + for (i = 0; i < plan_size; ++i) { + qp_node* node = query_plan[i]; + if (node && node->node_type() == "mrg_qpn") { + mrg_qpn* mrg = (mrg_qpn*)node; + if (mrg->is_multihost_merge()) { + + // group all the sources by host + map > host_map; + for (int j = 0; j < mrg->fm.size(); j++) { + tablevar_t *table_name = mrg->fm[j]; + if (host_map.count(table_name->get_machine())) + host_map[table_name->get_machine()].push_back(j); + else { + vector tables; + tables.push_back(j); + host_map[table_name->get_machine()] = tables; + } + } + + // create a new merge for each host + map >::iterator iter; + for (iter = host_map.begin(); iter != host_map.end(); iter++) { + string host_name = (*iter).first; + vector tables = (*iter).second; + + if (tables.size() == 1) + continue; + + sprintf(node_suffix, "_%s", host_name.c_str()); + string suffix(node_suffix); + untaboo(suffix); + mrg_qpn *new_mrg = (mrg_qpn *)mrg->make_copy(suffix); + for (int j = 0; j < tables.size(); ++j) { + new_mrg->fm.push_back(mrg->fm[tables[j]]); + new_mrg->mvars.push_back(mrg->mvars[j]); + sprintf(node_suffix, "m_%d", j); + new_mrg->fm[j]->set_range_var(node_suffix); + } + new_nodes.push_back(new_mrg); + } + + if (!new_nodes.empty()) { + // connect newly created merge nodes with parent multihost merge + for (int j = 0; j < new_nodes.size(); ++j) { + mrg->fm[j] = new tablevar_t(new_nodes[j]->fm[0]->get_machine().c_str(), "IFACE", new_nodes[j]->get_node_name().c_str()); + query_plan.push_back(new_nodes[j]); + } + mrg->fm.resize(new_nodes.size()); + mrg->mvars.resize(new_nodes.size()); + generate_linkage(); + } + + + // now externalize the sources + for (int j = 0; j < node->predecessors.size(); ++j) { + // Extract subtree rooted at node i into separate hfta + stream_query* q = extract_subtree(node->predecessors[j]); + if (q) { + q->generate_linkage(); + ret.push_back(q); + } + + } + generate_linkage(); + } + } + } + + return ret; +} + + +// Perform local FTA optimizations +void stream_query::optimize(vector& hfta_list, map 
lfta_names, vector interface_names, vector machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result){ + + // Topologically sort the nodes in query plan (leaf-first) + int i = 0, j = 0; + vector sorted_nodes; + + int num_nodes = query_plan.size(); + bool* leaf_flags = new bool[num_nodes]; + memset(leaf_flags, 0, num_nodes * sizeof(bool)); + + // run topological sort + bool done = false; + + // add all leafs to sorted_nodes + while (!done) { + done = true; + for (i = 0; i < num_nodes; ++i) { + if (!query_plan[i]) + continue; + + if (!leaf_flags[i] && query_plan[i]->predecessors.empty()) { + leaf_flags[i] = true; + sorted_nodes.push_back(i); + done = false; + + // remove the node from its parents predecessor lists + // since we only care about number of predecessors, it is sufficient just to remove + // one element from the parent's predecessors list + for (int j = query_plan[i]->successors.size() - 1; j >= 0; --j) + query_plan[query_plan[i]->successors[j]]->predecessors.pop_back(); + } + } + } + delete[] leaf_flags; + num_nodes = sorted_nodes.size(); + generate_linkage(); // rebuild the recently destroyed predecessor lists. + + // collect the information about interfaces nodes read from + for (i = 0; i < num_nodes; ++i) { + qp_node* node = query_plan[sorted_nodes[i]]; + vector input_tables = node->get_input_tbls(); + for (j = 0; j < input_tables.size(); ++j) { + tablevar_t * table = input_tables[j]; + if (lfta_names.count(table->schema_name)) { + int index = lfta_names[table->schema_name]; + table->set_machine(machine_names[index]); + table->set_interface(interface_names[index]); + } + } + } + + /* + + // push eligible operators down in the query plan + for (i = 0; i < num_nodes; ++i) { + if (partn_parse_result && is_partn_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result)) { + pushdown_partn_operator(sorted_nodes[i]); + } else if (is_pushdown_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names)) { + pushdown_operator(sorted_nodes[i], Ext_fcns, Schema); + } + } + + // split the query into multiple hftas if it combines the data from multiple hosts + vector hftas = split_multihost_query(); + hfta_list.insert(hfta_list.end(), hftas.begin(), hftas.end()); + */ + + num_nodes = query_plan.size(); + // also split multi-way merges into two-way merges + for (i = 0; i < num_nodes; ++i) { + qp_node* node = query_plan[i]; + if (node && node->node_type() == "mrg_qpn") { + vector split_merge = ((mrg_qpn *)node)->split_sources(); + + query_plan.insert(query_plan.end(), split_merge.begin(), split_merge.end()); + // delete query_plan[sorted_nodes[i]]; + query_plan[i] = NULL; + } + } + + generate_linkage(); +} + + + +table_def *stream_query::get_output_tabledef(){ + return( query_plan[qhead]->get_fields() ); +} + +vector stream_query::get_tbl_keys(vector &partial_keys){ + return query_plan[qhead]->get_tbl_keys(partial_keys); +} + + + +////////////////////////////////////////////////////////// +//// De-siloing. 
TO BE REMOVED
+
+void stream_query::desilo_lftas(map<string, int> &lfta_names,vector<string> &silo_names,table_list *Schema){
+	int i,t,s;
+
+	int suffix_len = silo_names.back().size();
+
+	for(i=0;i<qtail.size();++i){
+		vector<tablevar_t *> itbls = query_plan[qtail[i]]->get_input_tbls();
+		for(t=0;t<itbls.size();++t){
+			string itbl_name = itbls[t]->get_schema_name();
+			if(lfta_names.count(itbl_name)>0){
+//printf("Query %s input %d references lfta input %s\n",query_plan[qtail[i]]->get_node_name().c_str(),t,itbl_name.c_str());
+				vector<string> src_names;
+				string lfta_base = itbl_name.substr(0,itbl_name.size()-suffix_len);
+				for(s=0;s<silo_names.size();++s){
+					src_names.push_back(lfta_base+silo_names[s]);
+				}
+				string merge_node_name = query_plan[qtail[i]]->get_node_name()+
+					"_input_"+int_to_string(t);
+				mrg_qpn *merge_node = new mrg_qpn(merge_node_name,src_names,Schema);
+				int m_op_pos = Schema->add_table(merge_node->table_layout);
+				itbls[t]->set_schema(merge_node_name);
+				itbls[t]->set_schema_ref(m_op_pos);
+				query_plan.push_back(merge_node);
+			}
+		}
+	}
+
+	generate_linkage();
+}
+////////////////////////////////////////
+///		End de-siloing
+
+
+
+// Given a collection of LFTA stream queries,
+//	extract their WHERE predicates
+//	and pass them to an analysis routine which will extract
+//	common elements.
+//
+void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, vector<cnf_elem *> &prefilter_preds, set<unsigned int> &pred_ids){
+	int s;
+	std::vector< std::vector<cnf_elem *> > where_list;
+
+// still safe to assume that LFTA queries have a single
+// query node, which is at position 0.
+	for(s=0;s<lfta_list.size();++s){
+		vector<cnf_elem *> cnf_list = lfta_list[s]->query_plan[0]->get_filter_clause();
+		if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
+			gb_table *gtbl = ((sgah_qpn *)(lfta_list[s]->query_plan[0]))->get_gb_tbl();
+			int c;
+			for(c=0;c<cnf_list.size();++c){
+				insert_gb_def_pr(cnf_list[c]->pr,gtbl);
+			}
+		}
+		where_list.push_back(lfta_list[s]->query_plan[0]->get_filter_clause());
+	}
+
+	find_common_filter(where_list,Schema,Ext_fcns,prefilter_preds, pred_ids);
+}
+
+
+
+
+
+// Given a collection of LFTA stream queries,
+//	extract the union of all temporal attributes referenced in select clauses;
+//	those attributes will need to be unpacked in the prefilter.
+//
+void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids){
+	int s, sl;
+	vector<scalarexp_t *> sl_list;
+	gb_table *gb_tbl = NULL;
+
+
+// still safe to assume that LFTA queries have a single
+// query node, which is at position 0.
+	for(s=0;s<lfta_list.size();++s){
+		if(lfta_list[s]->query_plan[0]->node_type() == "spx_qpn"){
+			spx_qpn *spx_node = (spx_qpn *)lfta_list[s]->query_plan[0];
+			sl_list = spx_node->get_select_se_list();
+		}
+		if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
+			sgah_qpn *sgah_node = (sgah_qpn *)lfta_list[s]->query_plan[0];
+			sl_list = sgah_node->get_select_se_list();
+			gb_tbl = sgah_node->get_gb_tbl();
+		}
+
+		if(lfta_list[s]->query_plan[0]->node_type() == "filter_join"){
+			filter_join_qpn *fj_node = (filter_join_qpn *)lfta_list[s]->query_plan[0];
+			sl_list = fj_node->get_select_se_list();
+			col_id ci;	// also get the temporal var in case it is not in the select list
+			ci.load_from_colref(fj_node->temporal_var);
+			temp_cids.insert(ci);
+		}
+		if(lfta_list[s]->query_plan[0]->node_type() == "watch_join"){
+			watch_join_qpn *wj_node = (watch_join_qpn *)lfta_list[s]->query_plan[0];
+			sl_list = wj_node->get_select_se_list();
+		}
+
+		for(sl=0;sl<sl_list.size();++sl){
+			data_type *sdt = sl_list[sl]->get_data_type();
+			if (sdt->is_temporal()) {
+				gather_se_col_ids(sl_list[sl],temp_cids, gb_tbl);
+			}
+		}
+	}
+}
+
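+// Illustrative sketch (disabled): find_common_filter, called above, analyzes
+// one CNF list per LFTA and extracts elements shared across them for the
+// prefilter. Conceptually, and much simplified, with predicates reduced to
+// hypothetical canonical string keys, the shared set is an intersection:
+#if 0
+#include <set>
+#include <string>
+#include <vector>
+static std::set<std::string>
+common_preds(const std::vector< std::set<std::string> > &per_query){
+	std::set<std::string> common;
+	if(per_query.empty()) return common;
+	common = per_query[0];
+	for(size_t q=1;q<per_query.size();++q){
+		std::set<std::string> keep;
+		for(std::set<std::string>::const_iterator it=common.begin();
+				it!=common.end(); ++it)
+			if(per_query[q].count(*it)) keep.insert(*it);
+		common.swap(keep);	// keep only predicates seen in every query
+	}
+	return common;
+}
+#endif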