-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
-\r
- http://www.apache.org/licenses/LICENSE-2.0\r
-\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-#include "stream_query.h"
-#include "generate_utils.h"
-#include "analyze_fta.h"
-#include <algorithm>
-#include <utility>
-#include <list>\r
-\r
-static char tmpstr[500];\r
-\r
-using namespace std;\r
-\r
-\r
-// Create a query plan from a query node and an existing
-// query plan. Used for LFTA queries; the parent query plan
-// provides the annotations (e.g., the defines).
-stream_query::stream_query(qp_node *qnode, stream_query *parent){\r
- query_plan.push_back(qnode);\r
- qhead = 0;\r
- qtail.push_back(0);\r
- attributes = qnode->get_fields();\r
- parameters = qnode->get_param_tbl();\r
- defines = parent->defines;\r
- query_name = qnode->get_node_name();\r
-}\r
-\r
-// Copy the query plan.\r
-stream_query::stream_query(stream_query &src){\r
- query_plan = src.query_plan;\r
- qhead = src.qhead;\r
- qtail = src.qtail;\r
- attributes = src.attributes;\r
- parameters = src.parameters;\r
- defines = src.defines;\r
- query_name = src.query_name;\r
- gid = src.gid;\r
-}\r
-\r
-\r
-// Create a query plan from an analyzed parse tree.\r
-// Perform analyses to find the output node, input nodes, etc.\r
-\r
-stream_query::stream_query(query_summary_class *qs,table_list *Schema){\r
-// Generate the query plan nodes from the analyzed parse tree.\r
-// There is only one for now, so just assign the return value\r
-// of create_query_nodes to query_plan\r
- error_code = 0;\r
- query_plan = create_query_nodes(qs,Schema);\r
- int i;\r
-	if(query_plan.size() == 0){
-		fprintf(stderr,"INTERNAL ERROR, zero-size query plan in stream_query::stream_query\n");
-		exit(1);
-	}
- for(i=0;i<query_plan.size();++i){\r
- if(query_plan[i]){\r
- if(query_plan[i]->get_error_code() != 0){\r
- error_code = query_plan[i]->get_error_code();\r
- err_str += query_plan[i]->get_error_str();\r
- }\r
- }\r
- }\r
- qhead = query_plan.size()-1;\r
- gid = -1;\r
-\r
-}\r
-\r
-\r
-stream_query * stream_query::add_query(query_summary_class *qs,table_list *Schema){\r
-// Add another query block to the query plan\r
- error_code = 0;\r
- vector<qp_node *> new_nodes = create_query_nodes(qs, Schema);\r
- query_plan.insert(query_plan.end(),new_nodes.begin(), new_nodes.end());\r
- return this;\r
-}\r
-\r
-stream_query * stream_query::add_query(stream_query &src){\r
-// Add another query block to the query plan\r
- error_code = 0;\r
- query_plan.insert(query_plan.end(),src.query_plan.begin(), src.query_plan.end());\r
- return this;\r
-}\r
-\r
-\r
-void stream_query::generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema){\r
- int i,n;\r
-\r
-// Mapping fields to protocol fields requires schema binding.\r
-// First ensure all nodes are in the schema.\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n] != NULL){\r
- Schema->add_table(query_plan[n]->get_fields());\r
- }\r
- }\r
-// Now do schema binding\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n] != NULL){\r
- query_plan[n]->bind_to_schema(Schema);\r
- }\r
- }\r
-\r
-// create name-to-index map\r
- map<string, int> name_to_node;\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n]){\r
- name_to_node[query_plan[n]->get_node_name()] = n;\r
- }\r
- }\r
-\r
-// Create a list of the nodes to process, in order.\r
-// Search from the root down.\r
-// ASSUME tree plan.\r
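-// Illustrative example (comment only): if node 2 is the root, reading
-// from node 1, which reads from node 0, the BFS below visits 2, 1, 0
-// via search_q while pushing each visited node onto the front of
-// work_list, leaving work_list = {0, 1, 2}. Sources are therefore
-// processed before the nodes that consume them.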
-\r
- list<int> search_q;\r
- list<int> work_list;\r
-\r
- search_q.push_back(qhead);\r
- while(! search_q.empty()){\r
- int the_q = search_q.front();\r
- search_q.pop_front(); work_list.push_front(the_q);\r
- vector<int> the_pred = query_plan[the_q]->get_predecessors();\r
- for(i=0;i<the_pred.size();i++)\r
- search_q.push_back(the_pred[i]);\r
- }\r
-\r
-// Scan through the work list, from the front,
-// gather the qp_nodes referenced, and call the
-// protocol SE generator. Pass NULL for
-// sources not found (presumably user-defined operators).
-\r
- while(! work_list.empty()){\r
- int the_q = work_list.front();\r
- work_list.pop_front();\r
- vector<qp_node *> q_sources;\r
- vector<tablevar_t *> q_input_tbls = query_plan[the_q]->get_input_tbls();\r
- for(i=0;i<q_input_tbls.size();++i){\r
- string itbl_nm = q_input_tbls[i]->get_schema_name();\r
- if(name_to_node.count(itbl_nm)>0){\r
- q_sources.push_back(query_plan[name_to_node[itbl_nm]]);\r
- }else if(sq_map.count(itbl_nm)>0){\r
- q_sources.push_back(sq_map[itbl_nm]->get_query_head());\r
- }else{\r
- q_sources.push_back(NULL);\r
- }\r
- }\r
- query_plan[the_q]->create_protocol_se(q_sources, Schema);\r
- }\r
-\r
-//////////////////////////////////////////////////////////\r
-// trust but verify\r
-\r
-\r
-/*\r
- for(i=0;i<query_plan.size();++i){\r
- if(query_plan[i]){\r
- printf("query node %s, type=%s:\n",query_plan[i]->get_node_name().c_str(),\r
- query_plan[i]->node_type().c_str());\r
- map<std::string, scalarexp_t *> *pse_map = query_plan[i]->get_protocol_se();\r
- map<std::string, scalarexp_t *>::iterator mssi;\r
- for(mssi=pse_map->begin();mssi!=pse_map->end();++mssi){\r
- if((*mssi).second)\r
- printf("\t%s : %s\n",(*mssi).first.c_str(), (*mssi).second->to_string().c_str());\r
- else\r
- printf("\t%s : NULL\n",(*mssi).first.c_str());\r
- }\r
- if(query_plan[i]->node_type() == "filter_join" || query_plan[i]->node_type() == "join_eq_hash_qpn"){\r
- vector<scalarexp_t *> pse_l;\r
- vector<scalarexp_t *> pse_r;\r
- if(query_plan[i]->node_type() == "filter_join"){\r
- pse_l = ((filter_join_qpn *)query_plan[i])->get_hash_l();\r
- pse_r = ((filter_join_qpn *)query_plan[i])->get_hash_r();\r
- }\r
- if(query_plan[i]->node_type() == "join_eq_hash_qpn"){\r
- pse_l = ((join_eq_hash_qpn *)query_plan[i])->get_hash_l();\r
- pse_r = ((join_eq_hash_qpn *)query_plan[i])->get_hash_r();\r
- }\r
- int p;\r
- for(p=0;p<pse_l.size();++p){\r
- if(pse_l[p] != NULL)\r
- printf("\t\t%s = ",pse_l[p]->to_string().c_str());\r
- else\r
- printf("\t\tNULL = ");\r
- if(pse_r[p] != NULL)\r
- printf("%s\n",pse_r[p]->to_string().c_str());\r
- else\r
- printf("NULL\n");\r
- }\r
- }\r
- if(query_plan[i]->node_type() == "sgah_qpn" || query_plan[i]->node_type() == "rsgah_qpn" || query_plan[i]->node_type() == "sgahcwcb_qpn"){\r
- vector<scalarexp_t *> pseg;\r
- if(query_plan[i]->node_type() == "sgah_qpn")\r
- pseg = ((sgah_qpn *)query_plan[i])->get_gb_sources();\r
- if(query_plan[i]->node_type() == "rsgah_qpn")\r
- pseg = ((rsgah_qpn *)query_plan[i])->get_gb_sources();\r
- if(query_plan[i]->node_type() == "sgahcwcb_qpn")\r
- pseg = ((sgahcwcb_qpn *)query_plan[i])->get_gb_sources();\r
- int g;\r
- for(g=0;g<pseg.size();g++){\r
- if(pseg[g] != NULL)\r
- printf("\t\tgb %d = %s\n",g,pseg[g]->to_string().c_str());\r
- else\r
- printf("\t\tgb %d = NULL\n",g);\r
- }\r
- }\r
- }\r
- }\r
-*/\r
-\r
-}\r
-\r
-bool stream_query::generate_linkage(){\r
- bool create_failed = false;\r
- int n, f,s;\r
-\r
-// Clear any leftover linkages\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n]){\r
- query_plan[n]->clear_predecessors();\r
- query_plan[n]->clear_successors();\r
- }\r
- }\r
- qtail.clear();\r
-\r
-// create name-to-index map\r
- map<string, int> name_to_node;\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n]){\r
- name_to_node[query_plan[n]->get_node_name()] = n;\r
- }\r
- }\r
-\r
-// Do the 2-way linkage.\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n]){\r
- vector<tablevar_t *> fm = query_plan[n]->get_input_tbls();\r
- for(f=0;f<fm.size();++f){\r
- string src_tbl = fm[f]->get_schema_name();\r
- if(name_to_node.count(src_tbl)>0){\r
- int s = name_to_node[src_tbl];\r
- query_plan[n]->add_predecessor(s);\r
- query_plan[s]->add_successor(n);\r
- }\r
- }\r
- }\r
- }\r
-\r
-// Find the head (no successors) and the tails (at least one\r
-// predecessor is external).\r
-// Verify that there is only one head,\r
-// and that all other nodes have one successor (because\r
-// right now I can only handle trees).\r
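-// (Hypothetical example: if node A reads from the external source PKT
-// and node B reads from A, then B is the head, having no successors,
-// and A is a tail, because its predecessor PKT is not a plan node and
-// so n_predecessors() < get_input_tbls().size().)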
-\r
- qhead = -1; // no head yet found.\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n]){\r
- vector<int> succ = query_plan[n]->get_successors();\r
-/*\r
- if(succ.size() > 1){\r
- fprintf(stderr,"ERROR, query node %s supplies data to multiple nodes, but currently only tree query plans are supported:\n",query_plan[n]->get_node_name().c_str());\r
- for(s=0;s<succ.size();++s){\r
- fprintf(stderr,"%s ",query_plan[succ[s]]->get_node_name().c_str());\r
- }\r
- fprintf(stderr,"\n");\r
- create_failed = true;\r
- }\r
-*/\r
- if(succ.size() == 0){\r
- if(qhead >= 0){\r
- fprintf(stderr,"ERROR, query nodes %s and %s both have zero successors.\n",query_plan[n]->get_node_name().c_str(), query_plan[qhead]->get_node_name().c_str());\r
- create_failed = true;\r
- }else{\r
- qhead = n;\r
- }\r
- }\r
- if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size()){\r
- qtail.push_back(n);\r
- }\r
- }\r
- }\r
-\r
- return create_failed;\r
-}\r
-\r
-// After the collection of query plan nodes is generated,\r
-// analyze their structure to link them up into a tree (or dag?).\r
-// Verify that the structure is acceptable.\r
-// Do some other analysis and verification tasks (?)\r
-// then gather summary information.
-int stream_query::generate_plan(table_list *Schema){\r
-\r
-// The first thing to do is verify that the query plan\r
-// nodes were successfully created.\r
- bool create_failed = false;\r
- int n,f,s;\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n]!=NULL && query_plan[n]->get_error_code()){\r
- fprintf(stderr,"%s",query_plan[n]->get_error_str().c_str());\r
- create_failed = true;\r
- }\r
- }\r
-/*\r
-for(n=0;n<query_plan.size();++n){\r
-if(query_plan[n] != NULL){\r
-string nstr = query_plan[n]->get_node_name();\r
-printf("In generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());\r
-vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();\r
-int nn;\r
-for(nn=0;nn<inv.size();nn++){\r
-printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());\r
-}\r
-printf("\n");\r
-}\r
-}\r
-*/\r
-\r
- if(create_failed) return(1);\r
-\r
-// Here, link up the query nodes, then verify that the\r
-// structure is acceptable (single root, no cycles, no stranded\r
-// nodes, etc.)\r
- create_failed = generate_linkage();\r
- if(create_failed) return -1;\r
-\r
-\r
-// Here, do optimizations such as predicate pushing,\r
-// join rearranging, etc.\r
-// Nothing to do yet.\r
-\r
-/*\r
-for(n=0;n<query_plan.size();++n){\r
-if(query_plan[n] != NULL){\r
-string nstr = query_plan[n]->get_node_name();\r
-printf("B generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());\r
-vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();\r
-int nn;\r
-for(nn=0;nn<inv.size();nn++){\r
-printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());\r
-}\r
-printf("\n");\r
-}\r
-}\r
-printf("qhead=%d, qtail = ",qhead);\r
-int nn;\r
-for(nn=0;nn<qtail.size();++nn)\r
-printf("%d ",qtail[nn]);\r
-printf("\n");\r
-*/\r
-\r
-// Collect query summaries. The query is represented by its head node.
- query_name = query_plan[qhead]->get_node_name();\r
- attributes = query_plan[qhead]->get_fields();\r
-// TODO: The params and defines require a lot more thought.\r
- parameters = query_plan[qhead]->get_param_tbl();\r
- defines = query_plan[qhead]->get_definitions();\r
-\r
- return(0);\r
-}
-\r
-void stream_query::add_output_operator(ospec_str *o){\r
- output_specs.push_back(o);\r
-}\r
-\r
-\r
-void stream_query::get_external_libs(set<string> &libset){\r
-\r
- int qn,i;\r
- for(qn=0;qn<query_plan.size();++qn){\r
- if(query_plan[qn] != NULL){\r
- vector<string> op_libs = query_plan[qn]->external_libs();\r
- for(i=0;i<op_libs.size();++i){\r
- libset.insert(op_libs[i]);\r
- }\r
- }\r
- }\r
-\r
- for(qn=0;qn<output_operators.size();++qn){\r
- if(output_operators[qn] != NULL){\r
- vector<string> op_libs = output_operators[qn]->external_libs();\r
- for(i=0;i<op_libs.size();++i){\r
- libset.insert(op_libs[i]);\r
- }\r
- }\r
- }\r
-}\r
-\r
-\r
-\r
-// Split into LFTA, HFTA components.\r
-// Split this query into LFTA and HFTA queries.\r
-// Four possible outcomes:\r
-// 1) the query reads from a protocol, but does not need to\r
-// split (can be evaluated as an LFTA).\r
-// The lfta query is the only element in the return vector,\r
-// and hfta_returned is false.\r
-// 2) the query reads from no protocol, and therefore cannot be split.\r
-//	The hfta query is the only element in the return vector,
-// and hfta_returned is true.\r
-// 3) reads from at least one protocol, but cannot be split: failure.
-// return vector is empty, the error conditions are written\r
-// in err_str and error_code\r
-// 4) The query splits into an hfta query and one or more LFTA queries.\r
-// the return vector has two or more elements, and hfta_returned\r
-// is true. The last element is the HFTA.\r
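-// Illustrative sketch (hypothetical query, comment only): an aggregation
-// such as
-//	SELECT tb, count(*) FROM PKT GROUP BY time/60 as tb
-// reading from protocol PKT typically lands in outcome 4: it is split
-// into LFTA sub-aggregates (partial counts near the interface) plus an
-// HFTA super-aggregate that merges and sums the partials. The LFTA
-// queries come first in the return vector and the HFTA is last.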
-\r
-vector<stream_query *> stream_query::split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){\r
- vector<stream_query *> queries;\r
- int l,q,s;\r
- int qp_hfta;\r
-\r
- hfta_returned = false; // assume until proven otherwise\r
-\r
- for(l=0;l<qtail.size();++l){\r
- int leaf = qtail[l];\r
-\r
-\r
- vector<qp_node *> qnodes = query_plan[leaf]->split_node_for_fta(Ext_fcns, Schema, qp_hfta, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);\r
-\r
-\r
- if(qnodes.size() == 0 || query_plan[leaf]->get_error_code()){ // error\r
-//printf("error\n");\r
- error_code = query_plan[leaf]->get_error_code();\r
- err_str = query_plan[leaf]->get_error_str();\r
- vector<stream_query *> null_result;\r
- return(null_result);\r
- }\r
- if(qnodes.size() == 1 && qp_hfta){ // nothing happened\r
-//printf("no change\n");\r
- query_plan[leaf] = qnodes[0];\r
- }\r
- if(qnodes.size() == 1 && !qp_hfta){ // push to lfta\r
-//printf("lfta only\n");\r
- queries.push_back(new stream_query(qnodes[0], this));\r
- vector<int> succ = query_plan[leaf]->get_successors();\r
- for(s=0;s<succ.size();++s){\r
- query_plan[succ[s]]->remove_predecessor(leaf);\r
- }\r
- query_plan[leaf] = NULL; // delete it?\r
- }\r
- if(qnodes.size() > 1){ // actual splitting occurred.\r
- if(!qp_hfta){ // internal consistency check.\r
- error_code = 1;\r
-			err_str = "INTERNAL ERROR: multiple nodes returned by split_node_for_fta, but none are hfta nodes.\n";
- vector<stream_query *> null_result;\r
- return(null_result);\r
- }\r
-\r
- for(q=0;q<qnodes.size()-qp_hfta;++q){ // process lfta nodes\r
-//printf("creating lfta %d (%s)\n",q,qnodes[q]->get_node_name().c_str());\r
- queries.push_back(new stream_query(qnodes[q], this));\r
- }\r
-// Use new hfta node\r
- query_plan[leaf] = qnodes[qnodes.size()-1];\r
-// Add in any extra hfta nodes\r
- for(q=qnodes.size()-qp_hfta;q<qnodes.size()-1;++q)\r
- query_plan.push_back(qnodes[q]);\r
- }\r
- }\r
-\r
- int n_ifprefs = 0;\r
- set<string> ifps;\r
- for(q=0;q<query_plan.size();++q){\r
- if(query_plan[q] != NULL){\r
- n_ifprefs += query_plan[q]->count_ifp_refs(ifps);\r
- hfta_returned = true;\r
- }\r
- }\r
-\r
- if(n_ifprefs){\r
- set<string>::iterator ssi;\r
- err_str += "ERROR, unresolved interface parameters in HFTA:\n";\r
- for(ssi=ifps.begin();ssi!=ifps.end();++ssi){\r
- err_str += (*ssi)+" ";\r
- }\r
- err_str += "\n";\r
- error_code = 3;\r
- vector<stream_query *> null_result;\r
- return(null_result);\r
- }\r
-\r
-\r
- if(hfta_returned){\r
- if(generate_linkage()){\r
- fprintf(stderr,"INTERNAL ERROR, generate_linkage failed in split_query.\n");\r
- exit(1);\r
- }\r
- queries.push_back(this);\r
- }\r
-\r
- return(queries);\r
-}\r
-\r
-\r
-\r
-vector<table_exp_t *> stream_query::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string silo_nm){\r
- vector<table_exp_t *> subqueries;\r
- int l,q;\r
-\r
- string root_name = this->get_output_tabledef()->get_tbl_name();\r
-\r
-\r
- for(l=0;l<qtail.size();++l){\r
- int leaf = qtail[l];\r
- vector<table_exp_t *> new_qnodes = query_plan[leaf]->extract_opview(Schema, qnodes, opviews, root_name, silo_nm);\r
-\r
- for(q=0;q<new_qnodes.size();++q){ // process lfta nodes\r
- subqueries.push_back( new_qnodes[q]);\r
- }\r
- }\r
-\r
- return(subqueries);\r
-}\r
-\r
-\r
-\r
-\r
-string stream_query::make_schema(){\r
- return make_schema(qhead);\r
-}\r
-\r
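-// Sketch of the text produced by make_schema(q) below (shape only;
-// the field, define, and param contents come from the query plan node):
-//
-//	FTA{
-//	  <field definitions from get_fields()>
-//	  DEFINE{ query_name '<node name>'; <other defines> }
-//	  PARAM{ <param name> '<type string>'; ... }
-//	  <query text from to_query_string()>
-//	}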
-string stream_query::make_schema(int q){\r
- string ret="FTA{\n\n";\r
-\r
- ret += query_plan[q]->get_fields()->to_string();\r
-\r
- ret += "DEFINE{\n";\r
- ret += "\tquery_name '"+query_plan[q]->get_node_name()+"';\n";\r
-\r
- map<string, string> defs = query_plan[q]->get_definitions();\r
- map<string, string>::iterator dfi;\r
- for(dfi=defs.begin(); dfi!=defs.end(); ++dfi){\r
- ret += "\t"+ (*dfi).first + " '" + (*dfi).second + "';\n";\r
- }\r
- ret += "}\n\n";\r
-\r
- ret += "PARAM{\n";\r
- param_table *params = query_plan[q]->get_param_tbl();\r
- vector<string> param_names = params->get_param_names();\r
- int p;\r
- for(p=0;p<param_names.size();p++){\r
- data_type *dt = params->get_data_type( param_names[p] );\r
- ret += "\t" + param_names[p] + " '" + dt->get_type_str() + "';\n";\r
- }\r
- ret += "}\n";\r
-\r
- ret += query_plan[q]->to_query_string();\r
- ret += "\n}\n\n";\r
-\r
- return(ret);\r
-\r
-}\r
-\r
-string stream_query::collect_refd_ifaces(){\r
- string ret="";\r
- int q;\r
- for(q=0;q<query_plan.size();++q){\r
- if(query_plan[q]){\r
- map<string, string> defs = query_plan[q]->get_definitions();\r
- if(defs.count("_referenced_ifaces")){\r
- if(ret != "") ret += ",";\r
- ret += defs["_referenced_ifaces"];\r
- }\r
- }\r
- }\r
-\r
- return ret;\r
-}\r
-\r
-\r
-bool stream_query::stream_input_only(table_list *Schema){\r
- vector<tablevar_t *> input_tbls = this->get_input_tables();\r
- int i;\r
- for(i=0;i<input_tbls.size();++i){\r
- int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());\r
- if(Schema->get_schema_type(t) == PROTOCOL_SCHEMA) return(false);\r
- }\r
- return(true);\r
-}\r
-\r
-// Return input tables. No duplicate removal performed.\r
-vector<tablevar_t *> stream_query::get_input_tables(){\r
- vector<tablevar_t *> retval;\r
-\r
-// create name-to-index map\r
- int n;\r
- map<string, int> name_to_node;\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n]){\r
- name_to_node[query_plan[n]->get_node_name()] = n;\r
- }\r
- }\r
-\r
- int l;\r
- for(l=0;l<qtail.size();++l){\r
- int leaf = qtail[l];\r
- vector<tablevar_t *> tmp_v = query_plan[leaf]->get_input_tbls();\r
- int i;\r
- for(i=0;i<tmp_v.size();++i){\r
- if(name_to_node.count(tmp_v[i]->get_schema_name()) == 0)\r
- retval.push_back(tmp_v[i]);\r
- }\r
- }\r
- return(retval);\r
-}\r
-\r
-\r
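-// Compute the output tuple format (HOSTFORMAT vs. NETFORMAT) of query
-// node q, recursing into inputs whose format is still UNKNOWNFORMAT.
-// A node emits HOSTFORMAT if it transforms its input (makes_transform())
-// or if any input already arrives in host format; otherwise it emits
-// NETFORMAT. Inputs that are not plan operators (e.g., lftas) are
-// assumed to deliver net-format tuples.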
-void stream_query::compute_node_format(int q, vector<int> &nfmt, map<string, int> &op_idx){\r
- int netcnt = 0, hostcnt = 0;\r
- int i;\r
-\r
- vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();\r
- for(i=0;i<itbls.size();++i){\r
- string tname = itbls[i]->get_schema_name();\r
- if(op_idx.count(tname)){\r
- int o = op_idx[tname];\r
- if(nfmt[o] == UNKNOWNFORMAT)\r
- compute_node_format(o,nfmt,op_idx);\r
- if(nfmt[o] == NETFORMAT) netcnt++;\r
- else hostcnt++;\r
- }else{\r
- netcnt++;\r
- }\r
- }\r
- if(query_plan[q]->makes_transform()){\r
- nfmt[q] = HOSTFORMAT;\r
- }else{\r
- if(hostcnt>0){\r
- nfmt[q] = HOSTFORMAT;\r
- }else{\r
- nfmt[q] = NETFORMAT;\r
- }\r
- }\r
-//printf("query plan %d (%s) is ",q,query_plan[q]->get_node_name().c_str());\r
-//if(nfmt[q] == HOSTFORMAT) printf(" host format.\n");\r
-//else printf("net format\n");\r
-}\r
-\r
-\r
-string stream_query::generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode){\r
- int schref, ov_ix, i, q, param_sz;\r
- bool dag_graph = false;\r
-\r
-\r
-// Bind the SEs in all query plan nodes to this schema, and\r
-// Add all tables used by this query to the schema.\r
-// Question: Will I be polluting the global schema by adding all\r
-// query node schemas?\r
-\r
-// First ensure all nodes are in the schema.\r
- int qn;\r
- for(qn=0;qn<query_plan.size();++qn){\r
- if(query_plan[qn] != NULL){\r
- Schema->add_table(query_plan[qn]->get_fields());\r
- }\r
- }\r
-// Now do binding.\r
- for(qn=0;qn<query_plan.size();++qn){\r
- if(query_plan[qn] != NULL){\r
- query_plan[qn]->bind_to_schema(Schema);\r
- }\r
- }\r
-\r
-// Is it a DAG plan?\r
- set<string> qsources;\r
- int n;\r
- for(n=0;n<query_plan.size();++n){\r
- if(query_plan[n]){\r
- vector<tablevar_t *> tmp_v = query_plan[n]->get_input_tbls();\r
- int i;\r
- for(i=0;i<tmp_v.size();++i){\r
- if(qsources.count(tmp_v[i]->get_schema_name()) > 0)\r
- dag_graph = true;\r
- qsources.insert(tmp_v[i]->get_schema_name());\r
- }\r
- }\r
- }\r
-\r
-\r
-\r
-// Collect set of tables ref'd in this HFTA\r
- set<int> tbl_set;\r
- for(qn=0;qn<query_plan.size();++qn){\r
- if(query_plan[qn]){\r
-// get names of the tables\r
- vector<tablevar_t *> input_tbls = query_plan[qn]->get_input_tbls();\r
- vector<tablevar_t *> output_tbls = query_plan[qn]->get_output_tbls();\r
-// Convert to tblrefs, add to set of ref'd tables\r
- int i;\r
- for(i=0;i<input_tbls.size();i++){\r
-// int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());\r
- int t = input_tbls[i]->get_schema_ref();\r
- if(t < 0){\r
- fprintf(stderr,"INTERNAL ERROR in generate_hfta. "\r
- "query plan node %s references input table %s, which is not in schema.\n",\r
- query_name.c_str(), input_tbls[i]->get_schema_name().c_str());\r
- exit(1);\r
- }\r
- tbl_set.insert(t);\r
- }\r
-\r
- for(i=0;i<output_tbls.size();i++){\r
- int t = Schema->get_table_ref(output_tbls[i]->get_schema_name());\r
- if(t < 0){\r
-				fprintf(stderr,"INTERNAL ERROR in generate_hfta. "
- "query plan node %s references output table %s, which is not in schema.\n",\r
- query_name.c_str(), output_tbls[i]->get_schema_name().c_str());\r
- exit(1);\r
- }\r
- tbl_set.insert(t);\r
- }\r
- }\r
- }\r
-\r
-// Collect map of lftas, query nodes\r
- map<string, int> op_idx;\r
- for(q=0;q<query_plan.size();q++){\r
- if(query_plan[q]){\r
- op_idx[query_plan[q]->get_node_name()] = q;\r
- }\r
- }\r
-\r
-// map of input tables must include query id and input
-// source (0,1) because several queries might reference the same source
- vector<tablevar_t *> input_tbls = this->get_input_tables();\r
- vector<bool> input_tbl_free;\r
- for(i=0;i<input_tbls.size();++i){\r
- input_tbl_free.push_back(true);\r
- }\r
- map<string, int> lfta_idx;\r
-//fprintf(stderr,"%d input tables, %d query nodes\n",input_tbls.size(), query_plan.size());\r
- for(q=0;q<query_plan.size();q++){\r
- if(query_plan[q]){\r
- vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();\r
- int it;\r
- for(it=0;it<itbls.size();it++){\r
- string tname = itbls[it]->get_schema_name()+"-"+int_to_string(q)+"-"+int_to_string(it);\r
- string src_tblname = itbls[it]->get_schema_name();\r
- bool src_is_external = false;\r
- for(i=0;i<input_tbls.size();++i){\r
- if(src_tblname == input_tbls[i]->get_schema_name()){\r
- src_is_external = true;\r
- if(input_tbl_free[i]){\r
- lfta_idx[tname] = i;\r
- input_tbl_free[i] = false;\r
-//fprintf(stderr,"Adding %s (src_tblname=%s, q=%d, it=%d) to %d.\n",tname.c_str(), src_tblname.c_str(), q, it, i);\r
- break;\r
- }\r
- }\r
- }\r
- if(i==input_tbls.size() && src_is_external){\r
-				fprintf(stderr,"INTERNAL ERROR in stream_query::generate_hfta, can't find free entry in input_tbls for query %d, input %d (%s)\n",q,it,src_tblname.c_str());
- exit(1);\r
- }\r
- }\r
- }\r
- }\r
-/*\r
- for(i=0;i<input_tbls.size();++i){\r
- string src_tblname = input_tbls[i]->get_schema_name();\r
- lfta_idx[src_tblname] = i;\r
- }\r
-*/\r
-\r
-// Compute the output formats of the operators.\r
- vector<int> node_fmt(query_plan.size(),UNKNOWNFORMAT);\r
- compute_node_format(qhead, node_fmt, op_idx);\r
-\r
-\r
-// Generate the schema strings for the outputs.\r
- string schema_str;\r
- for(i=0;i<query_plan.size();++i){\r
- if(i != qhead && query_plan[i]){\r
- string schema_tmpstr = this->make_schema(i);\r
- schema_str += "gs_csp_t node"+int_to_string(i)+"_schema = "+make_C_embedded_string(schema_tmpstr)+";\n";\r
- }\r
- }\r
-\r
- attributes = query_plan[qhead]->get_fields();\r
-\r
-\r
- string schema_tmpstr = this->make_schema();\r
- schema_str += "gs_csp_t "+generate_schema_string_name(query_name)+" = "+make_C_embedded_string(schema_tmpstr)+";\n";\r
-\r
-// Generate the collection of tuple defs.\r
-\r
- string tuple_defs = "\n/*\tDefine tuple structures \t*/\n\n";\r
- set<int>::iterator si;\r
- for(si=tbl_set.begin(); si!=tbl_set.end(); ++si){\r
- tuple_defs += generate_host_tuple_struct( Schema->get_table( (*si) ));\r
- tuple_defs += "\n\n";\r
- }\r
-\r
-// generate the finalize tuple function\r
- string finalize_str = generate_hfta_finalize_tuple(attributes);\r
-\r
-// Analyze and make the output operators\r
- bool eat_input = false;\r
- string src_op = query_name;\r
- string pred_op = src_op;\r
- int last_op = -1;\r
- if(output_specs.size()>0)\r
- eat_input = true;\r
- if(n_successors>0)\r
- eat_input = false; // must create stream output for successor HFTAs.\r
- int n_filestreams = 0;\r
- for(i=0;i<output_specs.size();++i){\r
- if(output_specs[i]->operator_type == "stream" ){\r
- eat_input = false;\r
- }\r
- if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile" ){\r
- last_op = i;\r
- n_filestreams++;\r
- }\r
- }\r
- int filestream_id = 0;\r
- for(i=0;i<output_specs.size();++i){\r
- if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile"){\r
-\r
- int n_fstreams = output_specs[i]->n_partitions / n_parallel;\r
- if(n_fstreams * n_parallel < output_specs[i]->n_partitions){\r
- n_fstreams++;\r
- if(n_parallel == 1 || query_name.find("__copy1") != string::npos){\r
- fprintf(stderr,"WARNING, in query %s, %d streams requested for %s output, but it must be a multiple of the hfta parallelism (%d), increasing number of output streams to %d.\n",query_name.c_str(), output_specs[i]->n_partitions, output_specs[i]->operator_type.c_str(), n_parallel, n_fstreams*n_parallel);\r
- }\r
- }\r
-// output_file_qpn *new_ofq = new output_file_qpn();\r
- string filestream_tag = "";\r
- if(n_filestreams>1){\r
- filestream_tag = "_fileoutput"+int_to_string(filestream_id);\r
- filestream_id++;\r
- }\r
- output_file_qpn *new_ofq = new output_file_qpn(pred_op, src_op, filestream_tag, query_plan[qhead]->get_fields(), output_specs[i], (i==last_op ? eat_input : false) );\r
-// if(n_fstreams > 1){\r
- if(n_fstreams > 0){\r
- string err_str;\r
- bool err_ret = new_ofq->set_splitting_params(n_parallel,parallel_idx,n_fstreams,output_specs[i]->partitioning_flds,err_str);\r
- if(err_ret){\r
- fprintf(stderr,"%s",err_str.c_str());\r
- exit(1);\r
- }\r
- }\r
- output_operators.push_back(new_ofq );\r
- pred_op = output_operators.back()->get_node_name();\r
- }else if(! (output_specs[i]->operator_type == "stream" || output_specs[i]->operator_type == "Stream" || output_specs[i]->operator_type == "STREAM") ){\r
- fprintf(stderr,"WARNING, output operator type %s (on query %s) is not supported, ignoring\n",output_specs[i]->operator_type.c_str(),query_name.c_str() );\r
- }\r
- }\r
-\r
-\r
-\r
-// Generate functors for the query nodes.\r
-\r
- string functor_defs = "\n/*\tFunctor definitions\t*/\n\n";\r
- for(qn=0;qn<query_plan.size();++qn){\r
- if(query_plan[qn]!=NULL){\r
-// Compute whether the input needs a ntoh xform.\r
- vector<bool> needs_xform;\r
- vector<tablevar_t *> itbls = query_plan[qn]->get_input_tbls();\r
- for(i=0;i<itbls.size();++i){\r
- string tname = itbls[i]->get_schema_name();\r
-// if(query_plan[qn]->makes_transform()){\r
- if(op_idx.count(tname)>0){\r
- if(node_fmt[ op_idx[tname] ] == NETFORMAT){\r
- needs_xform.push_back(true);\r
- }else{\r
- needs_xform.push_back(false);\r
- }\r
- }else{\r
- needs_xform.push_back(true);\r
- }\r
-// }else{\r
-// if(op_idx.count(tname)>0){\r
-// if(node_fmt[qn] != node_fmt[ op_idx[tname] ]){\r
-// needs_xform.push_back(true);\r
-// }else{\r
-// needs_xform.push_back(false);\r
-// }\r
-// }else{\r
-// if(node_fmt[qn] == HOSTFORMAT){\r
-// needs_xform.push_back(true);\r
-// }else{\r
-// needs_xform.push_back(false);\r
-// }\r
-// }\r
-// }\r
- }\r
-\r
- functor_defs += query_plan[qn]->generate_functor(Schema, Ext_fcns, needs_xform);\r
- }\r
- }\r
-\r
-// Generate output operator functors\r
-\r
- vector<bool> needs_xform;\r
- for(i=0;i<output_operators.size();++i)\r
- functor_defs += output_operators[i]->generate_functor(Schema, Ext_fcns, needs_xform);\r
-\r
- string ret =\r
-"extern \"C\" {\n"\r
-"#include <lapp.h>\n"\r
-"#include <fta.h>\n"\r
-"#include <gshub.h>\n"\r
-"#include <stdlib.h>\n"\r
-"#include <stdio.h>\n"\r
-"#include <limits.h>\n"\r
-"}\n"\r
-;\r
- if(dag_graph)\r
- ret +=\r
-"#define PLAN_DAG\n"\r
-;\r
- ret +=\r
-"#include <schemaparser.h>\n"\r
-"#include<hfta_runtime_library.h>\n"\r
-"\n"\r
-"#include <host_tuple.h>\n"\r
-"#include <hfta.h>\n"\r
-"#include <hfta_udaf.h>\n"\r
-"#include <hfta_sfun.h>\n"\r
-"\n"\r
-//"#define MAXSCHEMASZ 16384\n"\r
-"#include <stdio.h>\n\n"\r
-;\r
-\r
-// Get the include file for each of the operators;
-// avoid duplicate inserts.
- set<string> include_fls;\r
- for(qn=0;qn<query_plan.size();++qn){\r
- if(query_plan[qn] != NULL)\r
- include_fls.insert(query_plan[qn]->get_include_file());\r
- }\r
- for(i=0;i<output_operators.size();++i)\r
- include_fls.insert(output_operators[i]->get_include_file());\r
- set<string>::iterator ssi;\r
- for(ssi=include_fls.begin();ssi!=include_fls.end();++ssi)\r
- ret += (*ssi);\r
-\r
-// Add defines for hash functions\r
- ret +=\r
-"\n"\r
-"#define hfta_BOOL_to_hash(x) (x)\n"\r
-"#define hfta_USHORT_to_hash(x) (x)\n"\r
-"#define hfta_UINT_to_hash(x) (x)\n"\r
-"#define hfta_IP_to_hash(x) (x)\n"\r
-"#define hfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"\r
-"#define hfta_INT_to_hash(x) (gs_uint32_t)(x)\n"\r
-"#define hfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"\r
-"#define hfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"\r
-"#define hfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"\r
-"\n"\r
-;\r
-\r
-// ret += "#define SERIOUS_LFTA \""+input_tbls[0]->get_schema_name()+"\"\n";\r
- ret += "#define OUTPUT_HFTA \""+query_name+"\"\n\n";\r
-\r
-// HACK ALERT: I know for now that all query plans are\r
-// single operator plans, but while SPX and SGAH can use the\r
-// UNOP template, the merge operator must use the MULTOP template.\r
-// WORSE HACK ALERT: merge does not translate its input,
-// so don't apply finalize to the output.\r
-// TODO: clean this up.\r
-\r
-// string node_type = query_plan[0]->node_type();\r
-\r
- ret += schema_str;\r
- ret += tuple_defs;\r
-\r
-// Need to work on the input, output xform logic.\r
-// For now, just add it in.\r
-// ret += finalize_str;\r
-\r
- if(node_fmt[qhead] == NETFORMAT){\r
- ret +=\r
-"void finalize_tuple(host_tuple &tup){\n"\r
-"return;\n"\r
-"}\n"\r
-"\n";\r
- }else{\r
- ret += finalize_str;\r
- }\r
-\r
- ret += functor_defs;\r
-\r
-// Parameter block management\r
-// The proper parameter block must be transmitted to each\r
-// external stream source.\r
-// There is a 1-1 mapping between the param blocks returned\r
-// by this list and the registered data sources ...\r
-// TODO: make this more manageable, but for now\r
-// there is no parameter block manipulation so I just\r
-// need to have the same number.\r
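-// (For example, if this HFTA reads from two lftas and one operator
-// view, get_lfta_params() pushes three param_blocks in registration
-// order: a copy of the caller's parameter block for each lfta, and a
-// "SUBQ:..." string block for the view; the example counts are
-// illustrative only.)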
-\r
- ret +=\r
-"int get_lfta_params(gs_int32_t sz, void * value,list<param_block>& lst){\n"\r
-" // for now every lfta receives the full copy of hfta parameters\n"
-" struct param_block pb;\n";\r
-\r
- set<string> lfta_seen;\r
- for(i=0;i<input_tbls.size();++i){\r
- string src_tblname = input_tbls[i]->get_schema_name();\r
- if(lfta_seen.count(src_tblname) == 0){\r
- lfta_seen.insert(src_tblname);\r
- schref = input_tbls[i]->get_schema_ref();\r
- if(Schema->get_schema_type(schref) == OPERATOR_VIEW_SCHEMA){\r
- ov_ix = input_tbls[i]->get_opview_idx();\r
- opview_entry *opv = opviews.get_entry(ov_ix);\r
- string op_param = "SUBQ:";\r
- int q;\r
- for(q=0;q<opv->subq_names.size();++q){\r
- if(q>0) op_param+=",";\r
- op_param+=opv->subq_names[q];\r
- }\r
- op_param+="\\n";\r
- param_sz = op_param.size()-1;\r
-\r
- sprintf(tmpstr,"\t\tpb.block_length = %d;\n",param_sz); ret+=tmpstr;\r
- ret+=\r
-" pb.data = malloc(pb.block_length);\n";\r
- ret+="\t\tmemcpy(pb.data,\""+op_param+"\",pb.block_length);\n"\r
-" lst.push_back(pb);\n\n";\r
- }else{\r
- ret+=\r
-" pb.block_length = sz;\n"\r
-" pb.data = malloc(pb.block_length);\n"\r
-" memcpy(pb.data, value, pb.block_length);\n"\r
-" lst.push_back(pb);\n\n";\r
- }\r
- }\r
- }\r
-\r
- ret +=\r
-" return 0;\n"\r
-"}\n"\r
-"\n";\r
-\r
- ret+=\r
-"struct FTA* alloc_hfta (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void * value ) {\n"\r
-"\n"\r
-" // find the lftas\n"\r
-" list<lfta_info*> *lfta_list = new list<lfta_info*>;\n"\r
-"\n"\r
-" FTAID f;\n"\r
-" char schemabuf[MAXSCHEMASZ];\n"\r
-" gs_schemahandle_t schema_handle;\n"\r
-"\n";\r
-\r
-// Register the input data sources.\r
-// Register a source only once.\r
-\r
-// vector<string> ext_reg_txt;\r
- map<string, int> input_tbl_srcid;\r
- for(i=0;i<input_tbls.size();++i){\r
- string src_tblname = input_tbls[i]->get_schema_name();\r
-// Use UDOP alias when in distributed mode.\r
-// the cluster manager will make the translation\r
-// using info from qtree.xml
- if(distributed_mode && input_tbls[i]->get_udop_alias() != "")\r
- src_tblname = input_tbls[i]->get_udop_alias();\r
- if(input_tbl_srcid.count(src_tblname) == 0){\r
- int srcid = input_tbl_srcid.size();\r
- input_tbl_srcid[src_tblname] = srcid;\r
- string tmp_s=\r
-"\n // find "+src_tblname+"\n"\r
-" if (fta_find(\""+src_tblname+"\",1,&f,schemabuf,MAXSCHEMASZ)!=0) {\n"\r
-" fprintf(stderr,\"HFTA::error:could not find LFTA \\n\");\n"\r
-" return 0;\n"\r
-" }\n"\r
-" //fprintf(stderr,\"HFTA::FTA found at %u[%u]\\n\",ftamsgid,ftaindex);\n"\r
-"\n"\r
-" // parse the schema and get the schema handle\n"\r
-" schema_handle = ftaschema_parse_string(schemabuf);\n"\r
-" lfta_info* inf"+int_to_string(srcid)+" = new lfta_info();\n"\r
-" inf"+int_to_string(srcid)+"->f = f;\n"\r
-" inf"+int_to_string(srcid)+"->fta_name = strdup(\""+src_tblname+"\");\n"\r
-" inf"+int_to_string(srcid)+"->schema = strdup(schemabuf);\n"\r
-" inf"+int_to_string(srcid)+"->schema_handle = schema_handle;\n"\r
-" lfta_list->push_back(inf"+int_to_string(srcid)+");\n\n";\r
-// ext_reg_txt.push_back(tmp_s);\r
- ret += tmp_s;\r
- }\r
- }\r
-\r
- ret+="\n";\r
- ret += "\tgs_schemahandle_t root_schema_handle = ftaschema_parse_string("+generate_schema_string_name(query_name)+");\n";\r
- for(i=0;i<query_plan.size();++i){\r
- if(i != qhead && query_plan[i]){\r
- ret += "\tgs_schemahandle_t op"+int_to_string(i)+"_schema_handle = ftaschema_parse_string(node"+int_to_string(i)+"_schema);\n";\r
- }\r
- }\r
- ret+="\n";\r
-\r
-// Create the operators.\r
- int n_basic_ops;\r
- for(q=0;q<query_plan.size();q++){\r
- if(query_plan[q]){\r
- ret+=\r
-"\n"\r
-" // create an instance of operator "+int_to_string(q)+" ("+query_plan[q]->get_node_name()+") \n";\r
-\r
-// Create parameters for operator construction.\r
- string op_params;\r
- vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();\r
- string tname = itbls[0]->get_schema_name();\r
-// string li_tname = tname +"-"+int_to_string(q)+"-0";\r
-// if(lfta_idx.count(li_tname)>0)\r
- if(input_tbl_srcid.count(tname)>0){\r
-// ret += ext_reg_txt[lfta_idx[li_tname]];\r
-// op_params += "inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";\r
- op_params += "inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";\r
- }else if(op_idx.count(tname)>0){\r
- op_params += "op"+int_to_string( op_idx[tname] )+"_schema_handle";\r
- }else{\r
- fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (3) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());\r
- exit(1);\r
- }\r
- if(itbls.size()>1){\r
- string tname = itbls[1]->get_schema_name();\r
-// string li_tname = tname +"-"+int_to_string(q)+"-1";\r
-// if(lfta_idx.count(li_tname)>0)\r
- if(input_tbl_srcid.count(tname)>0){\r
-// ret += ext_reg_txt[lfta_idx[li_tname]];\r
-// op_params += ",inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";\r
- op_params += ",inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";\r
- }else if(op_idx.count(tname)>0){\r
- op_params += ",op"+int_to_string( op_idx[tname] )+"_schema_handle";\r
- }else{\r
- fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (4) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());\r
- exit(1);\r
- }\r
- }\r
- ret += query_plan[q]->generate_operator(q,op_params);\r
- ret +=\r
-" operator_node* node"+int_to_string(q)+" = new operator_node(op"+int_to_string(q)+");\n";\r
-\r
- n_basic_ops = q;\r
- }\r
- }\r
- n_basic_ops++;\r
-// Next, create the output operators, if any
- for(i=0;i<output_operators.size();++i){\r
- ret += output_operators[i]->generate_operator(n_basic_ops+i,"root_schema_handle");\r
- ret +=\r
-" operator_node* node"+int_to_string(n_basic_ops+i)+" = new operator_node(op"+int_to_string(n_basic_ops+i)+");\n";\r
- }\r
-\r
-\r
-// Link up operators.\r
- for(q=0;q<query_plan.size();++q){\r
- if(query_plan[q]){\r
-// NOTE: this code assumes that the operator has at most
-// two inputs. But the template code also makes\r
-// this assumption. Both will need to be changed.\r
- vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();\r
- string tname = itbls[0]->get_schema_name();\r
-// string li_tname = tname +"-"+int_to_string(q)+"-0";\r
-// if(lfta_idx.count(li_tname)>0)\r
- if(input_tbl_srcid.count(tname)>0){\r
-// ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";\r
- ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";\r
- }else if(op_idx.count(tname)>0){\r
- ret += "\tnode"+int_to_string(q)+"->set_left_child_node(node"+int_to_string( op_idx[tname] )+");\n";\r
- }else{\r
- fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (1) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());\r
- exit(1);\r
- }\r
- if(itbls.size()>1){\r
- string tname = itbls[1]->get_schema_name();\r
-// string li_tname = tname +"-"+int_to_string(q)+"-1";\r
-// if(lfta_idx.count(li_tname)>0)\r
- if(input_tbl_srcid.count(tname)>0){\r
-// ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";\r
- ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";\r
- }else if(op_idx.count(tname)>0){\r
- ret += "\tnode"+int_to_string(q)+"->set_right_child_node(node"+int_to_string( op_idx[tname] )+");\n";\r
- }else{\r
- fprintf(stderr,"INTERNAL ERROR, can't identify input table %s (%s) when linking operator (2) %d (%s)\n",tname.c_str(), tname.c_str(),q,query_plan[q]->get_node_name().c_str());\r
- exit(1);\r
- }\r
- }\r
- }\r
- }\r
- for(i=0;i<output_operators.size();++i){\r
- if(i==0)\r
- ret += "\tnode"+int_to_string(n_basic_ops)+"->set_left_child_node(node"+int_to_string( qhead )+");\n";\r
- else\r
- ret += "\tnode"+int_to_string(n_basic_ops+i)+"->set_left_child_node(node"+int_to_string( n_basic_ops+i-1 )+");\n";\r
- }\r
-\r
- // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters\r
-\r
- bool fta_reusable = false;\r
- if (query_plan[qhead]->get_val_of_def("reusable") == "yes" ||\r
- query_plan[qhead]->get_param_tbl()->size() == 0) {\r
-		fta_reusable = true;
- }\r
-\r
-	int root_node = qhead;
-	if(output_operators.size()>0)
-		root_node = n_basic_ops + output_operators.size() - 1;	// the last output operator
-\r
- ret+=\r
-"\n"\r
-"\n"\r
-" MULTOP_HFTA* ftap = new MULTOP_HFTA(ftaid, OUTPUT_HFTA, command, sz, value, root_schema_handle, node"+int_to_string(root_node)+", lfta_list, " + (fta_reusable ?"true":"false") + ", reusable);\n"\r
-" if(ftap->init_failed()){ delete ftap; return 0;}\n"\r
-" return (FTA*)ftap;\n"\r
-"}\n"\r
-"\n"\r
-"\n";\r
-\r
-\r
- string comm_bufsize = "16*1024*1024";\r
- if(defines.count("hfta_comm_buf")>0){\r
- comm_bufsize = defines["hfta_comm_buf"];\r
- }\r
-\r
- ret+=\r
-"\n"\r
-"int main(int argc, char * argv[]) {\n"\r
-"\n"\r
-"\n"\r
-" /* parse the arguments */\n"\r
-"\n"\r
-" gs_int32_t tip1,tip2,tip3,tip4;\n"\r
-" endpoint gshub;\n"\r
-" gs_sp_t instance_name;\n"\r
-" if (argc<3) {\n"\r
-" gslog(LOG_EMERG,\"Wrong arguments at startup\");\n"\r
-" exit(1);\n"\r
-" }\n"\r
-"\n"\r
-" if ((sscanf(argv[1],\"%u.%u.%u.%u:%hu\",&tip1,&tip2,&tip3,&tip4,&(gshub.port)) != 5)) {\n"\r
-" gslog(LOG_EMERG,\"HUB IP NOT DEFINED\");\n"\r
-" exit(1);\n"\r
-" }\n"\r
-" gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\n"\r
-" gshub.port=htons(gshub.port);\n"\r
-" instance_name=strdup(argv[2]);\n"\r
-" if (set_hub(gshub)!=0) {\n"\r
-" gslog(LOG_EMERG,\"Could not set hub\");\n"\r
-" exit(1);\n"\r
-" }\n"\r
-" if (set_instance_name(instance_name)!=0) {\n"\r
-" gslog(LOG_EMERG,\"Could not set instance name\");\n"\r
-" exit(1);\n"\r
-" }\n"\r
-"\n"\r
-"\n"\r
-" /* initialize host library */\n"\r
-"\n"\r
-//" fprintf(stderr,\"Initializing gscp\\n\");\n"\r
-" gsopenlog(argv[0]);\n"\r
-"\n"\r
-" if (hostlib_init(HFTA, "+comm_bufsize+", DEFAULTDEV, 0, 0)!=0) {\n"\r
-" fprintf(stderr,\"%s::error:could not initialize gscp\\n\",\n"\r
-" argv[0]);\n"\r
-" exit(1);\n"\r
-" }\n"\r
-"\n"\r
-"\n"\r
-"\n"\r
-" FTAID ret = fta_register(OUTPUT_HFTA, " + (fta_reusable?"1":"0") + ", DEFAULTDEV, alloc_hfta, "+generate_schema_string_name(query_name)+", -1, 0ull);\n"\r
-" fta_start_service(-1);\n"\r
-"\n"\r
-" return 0;\n"\r
-"\n"\r
-"}\n"\r
-"\n";\r
-////////////////////\r
- return(ret);\r
-}\r
-\r
-\r
-// Checks whether the node at position index is compatible with interface
-// partitioning (i.e., can be pushed below the merge that combines the
-// partitioned streams)
-// index - index of the query node in the query plan
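-// (Hypothetical example: an aggregation whose only predecessor is a
-// merge combining per-interface partitions of one stream, with a
-// partition definition compatible with its group-by columns, can be
-// replicated per partition and evaluated below the merge.)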
-bool stream_query::is_partn_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {\r
- int i;\r
- qp_node* node = query_plan[index];\r
- qp_node* child_node = NULL;\r
-\r
- if (node->predecessors.empty())\r
- return false;\r
-\r
- // all the node predecessors must be partition merges with the same partition definition\r
- partn_def_t* partn_def = NULL;\r
- for (i = 0; i < node->predecessors.size(); ++i) {\r
- child_node = query_plan[node->predecessors[i]];\r
- if (child_node->node_type() != "mrg_qpn")\r
- return false;\r
-\r
- // merge must have only one parent for this optimization to work\r
- // check that all its successors are the same\r
- for (int j = 1; j < child_node->successors.size(); ++j) {\r
- if (child_node->successors[j] != child_node->successors[0])\r
- return false;\r
- }\r
-\r
- partn_def_t* new_partn_def = ((mrg_qpn*)child_node)->get_partn_definition(lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result);\r
- if (!new_partn_def)\r
- return false;\r
- if (!i)\r
- partn_def = new_partn_def;\r
- else if (new_partn_def != partn_def)\r
- return false;\r
-\r
- }\r
-\r
- if (node->node_type() == "spx_qpn") // spx nodes are always partition compatible\r
- return true;\r
- else if (node->node_type() == "sgah_qpn") {\r
- gb_table gb_tbl = ((sgah_qpn*)node)->gb_tbl;\r
- return true; //partn_def->is_compatible(&gb_tbl);\r
- }\r
- else if (node->node_type() == "rsgah_qpn") {\r
- gb_table gb_tbl = ((rsgah_qpn*)node)->gb_tbl;\r
- return partn_def->is_compatible(&gb_tbl);\r
- }\r
- else if (node->node_type() == "sgahcwcb_qpn") {\r
- gb_table gb_tbl = ((sgahcwcb_qpn*)node)->gb_tbl;\r
- return partn_def->is_compatible(&gb_tbl);\r
- }\r
- else if (node->node_type() == "join_eq_hash_qpn") {\r
- return true;\r
- }\r
- else\r
- return false;\r
-}\r
-\r
-// Push the operator below the merge that combines the partitioned streams
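-// Sketch of the rewrite (hypothetical unary case, comment only):
-//	before:  src_0 ... src_N -> merge M -> op X
-//	after:   src_i -> X_i (a copy of X with suffix "_i") for each i,
-//	         and M, renamed to X's node name, merges X_0 ... X_N.
-// The merge keys on the single temporal attribute of X's output,
-// which is why exactly one temporal select field is required below.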
-void stream_query::pushdown_partn_operator(int index) {\r
- qp_node* node = query_plan[index];\r
- int i;\r
- char node_index[128];\r
-\r
-// fprintf(stderr, "Partn pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());\r
-\r
-\r
- // HACK ALERT: When reordering merges we screw up slack computation\r
- // since slack should no longer be used, it is not an issue\r
-\r
-\r
-	// we can safely reorder nodes that have one and only one temporal attribute in the select list
- table_def* table_layout = node->get_fields();\r
- vector<field_entry*> fields = table_layout->get_fields();\r
- int merge_fieldpos = -1;\r
-\r
- for (i = 0; i < fields.size(); ++i) {\r
- data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());\r
- if(dt.is_temporal()) {\r
-		if (merge_fieldpos != -1) // more than one temporal field found
- return;\r
- merge_fieldpos = i;\r
- }\r
- }\r
-\r
-	if (merge_fieldpos == -1) // no temporal field found
- return;\r
-\r
-\r
-	// the reordering procedure is different for unary operators and joins
- if (node->node_type() == "join_eq_hash_qpn") {\r
- vector<qp_node*> new_nodes;\r
-\r
- tablevar_t *left_table_name;\r
- tablevar_t *right_table_name;\r
- mrg_qpn* left_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];\r
- mrg_qpn* right_mrg = (mrg_qpn*)query_plan[node->predecessors[1]];\r
-\r
- // for now we will only consider plans where both child merges\r
- // merge the same set of streams\r
-\r
- if (left_mrg->fm.size() != right_mrg->fm.size())\r
- return;\r
-\r
-		// mapping of interface names to table definitions
- map<string, tablevar_t*> iface_map;\r
- for (i = 0; i < left_mrg->fm.size(); i++) {\r
- left_table_name = left_mrg->fm[i];\r
- iface_map[left_table_name->get_machine() + left_table_name->get_interface()] = left_table_name;\r
- }\r
-\r
- for (i = 0; i < right_mrg->fm.size(); i++) {\r
- right_table_name = right_mrg->fm[i];\r
-\r
-			// find the corresponding left table
- if (!iface_map.count(right_table_name->get_machine() + right_table_name->get_interface()))\r
- return;\r
-\r
- left_table_name = iface_map[right_table_name->get_machine() + right_table_name->get_interface()];\r
-\r
- // create new join nodes\r
- sprintf(node_index, "_%d", i);\r
- join_eq_hash_qpn* new_node = (join_eq_hash_qpn*)node->make_copy(node_index);\r
-\r
- // make a copy of right_table_name\r
- right_table_name = right_table_name->duplicate();\r
- left_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[0]->get_var_name());\r
- right_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[1]->get_var_name());\r
- new_node->from[0] = left_table_name;\r
- new_node->from[1] = right_table_name;\r
- new_nodes.push_back(new_node);\r
- }\r
-\r
- // make right_mrg a new root\r
- right_mrg->set_node_name(node->get_node_name());\r
- right_mrg->table_layout = table_layout;\r
- right_mrg->merge_fieldpos = merge_fieldpos;\r
-\r
- for (i = 0; i < right_mrg->fm.size(); i++) {\r
-			// make the newly created joins children of the merge
- sprintf(node_index, "_%d", i);\r
- right_mrg->fm[i] = new tablevar_t(right_mrg->fm[i]->get_machine().c_str(), right_mrg->fm[i]->get_interface().c_str(), (node->get_node_name() + string(node_index)).c_str());\r
- sprintf(node_index, "_m%d", i);\r
- right_mrg->fm[i]->set_range_var(node_index);\r
- right_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));\r
-\r
- }\r
-\r
- if (left_mrg != right_mrg)\r
- query_plan[node->predecessors[0]] = NULL; // remove left merge from the plan\r
-\r
- query_plan.insert(query_plan.end(), new_nodes.begin(), new_nodes.end());\r
-\r
- } else { // unary operator\r
- // get the child merge node\r
- mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];\r
-\r
- child_mrg->set_node_name(node->get_node_name());\r
- child_mrg->table_layout = table_layout;\r
- child_mrg->merge_fieldpos = merge_fieldpos;\r
-\r
- // create new nodes for every source stream\r
- for (i = 0; i < child_mrg->fm.size(); i++) {\r
- tablevar_t *table_name = child_mrg->fm[i];\r
- sprintf(node_index, "_%d", i);\r
- qp_node* new_node = node->make_copy(node_index);\r
-\r
- if (node->node_type() == "spx_qpn")\r
- ((spx_qpn*)new_node)->table_name = table_name;\r
- else if (node->node_type() == "sgah_qpn")\r
- ((sgah_qpn*)new_node)->table_name = table_name;\r
- else if (node->node_type() == "rsgah_qpn")\r
- ((rsgah_qpn*)new_node)->table_name = table_name;\r
- else if (node->node_type() == "sgahcwcb_qpn")\r
- ((sgahcwcb_qpn*)new_node)->table_name = table_name;\r
- table_name->set_range_var("_t0");\r
-\r
- child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());\r
- sprintf(node_index, "_m%d", i);\r
- child_mrg->fm[i]->set_range_var(node_index);\r
- child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));\r
-\r
- // add new node to query plan\r
- query_plan.push_back(new_node);\r
- }\r
- }\r
- query_plan[index] = NULL;\r
- generate_linkage();\r
-}\r
-\r
-\r
-// Checks whether the node at position index can be pushed below the merge
-bool stream_query::is_pushdown_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names) {
-\r
- int i;\r
- qp_node* node = query_plan[index];\r
- qp_node* child_node = NULL;\r
-\r
- if (node->predecessors.size() != 1)\r
- return false;\r
-\r
-	// the node's predecessor must be a merge that combines streams from multiple hosts
- child_node = query_plan[node->predecessors[0]];\r
- if (child_node->node_type() != "mrg_qpn")\r
- return false;\r
-\r
- if (!((mrg_qpn*)child_node)->is_multihost_merge())\r
- return false;\r
-\r
- // merge must have only one parent for this optimization to work\r
- // check that all its successors are the same\r
- for (int j = 1; j < child_node->successors.size(); ++j) {\r
- if (child_node->successors[j] != child_node->successors[0])\r
- return false;\r
- }\r
-\r
-	// selections can always be pushed down; aggregations can always be
-	// split into a selection/aggr or aggr/aggr pair and pushed down
- if (node->node_type() == "spx_qpn")\r
- return true;\r
- else if (node->node_type() == "sgah_qpn")\r
- return true;\r
- else\r
- return false;\r
-\r
- }\r
-\r
-// Push the operator below the merge\r
- void stream_query::pushdown_operator(int index, ext_fcn_list *Ext_fcns, table_list *Schema) {\r
- qp_node* node = query_plan[index];\r
- int i;\r
- char node_suffix[128];\r
-\r
-	// we can only safely push down queries that have one and only one temporal attribute in the select list
- table_def* table_layout = node->get_fields();\r
- vector<field_entry*> fields = table_layout->get_fields();\r
- int merge_fieldpos = -1;\r
-\r
- for (i = 0; i < fields.size(); ++i) {\r
- data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());\r
- if(dt.is_temporal()) {\r
-			if (merge_fieldpos != -1) // more than one temporal field found
- return;\r
- merge_fieldpos = i;\r
- }\r
- }\r
-\r
- if (merge_fieldpos == -1) // if no temporal field found\r
- return;\r
-\r
-\r
- fprintf(stderr, "Regular pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());\r
-\r
- // get the child merge node\r
- mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];\r
-\r
- tablevar_t *table_name = NULL;\r
-\r
-\r
- if (node->node_type() == "spx_qpn") {\r
-\r
- // create new nodes for every source stream\r
- for (i = 0; i < child_mrg->fm.size(); i++) {\r
- table_name = child_mrg->fm[i];\r
- sprintf(node_suffix, "_%d", i);\r
- qp_node* new_node = node->make_copy(node_suffix);\r
-\r
- ((spx_qpn*)new_node)->table_name = table_name;\r
- table_name->set_range_var("_t0");\r
-\r
- child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());\r
- sprintf(node_suffix, "_m%d", i);\r
- child_mrg->fm[i]->set_range_var(node_suffix);\r
- child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));\r
-\r
- // add new node to query plan\r
- query_plan.push_back(new_node);\r
- }\r
- child_mrg->table_layout = table_layout;\r
- child_mrg->merge_fieldpos = merge_fieldpos;\r
-\r
- } else { // aggregation node\r
-\r
- vector<qp_node*> new_nodes;\r
-\r
-		// split the aggregation into high-level and low-level parts
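-		// (e.g., hypothetically, a count(*) aggregate splits into per-host
-		// sub-aggregates computing partial counts plus one super-aggregate
-		// (split_nodes[1]) that sums the partials; counts and sums
-		// recombine this way, which is what makes the aggr/aggr split safe)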
- vector<qp_node *> split_nodes = ((sgah_qpn*)node)->split_node_for_hfta(Ext_fcns, Schema);\r
- if (split_nodes.size() != 2)\r
- return;\r
-\r
- sgah_qpn* super_aggr = (sgah_qpn*)split_nodes[1];\r
- super_aggr->table_name = ((sgah_qpn*)node)->table_name;\r
-\r
- // group all the sources by host\r
- map<string, vector<int> > host_map;\r
- for (i = 0; i < child_mrg->fm.size(); i++) {\r
- tablevar_t *table_name = child_mrg->fm[i];\r
- if (host_map.count(table_name->get_machine()))\r
- host_map[table_name->get_machine()].push_back(i);\r
- else {\r
- vector<int> tables;\r
- tables.push_back(i);\r
- host_map[table_name->get_machine()] = tables;\r
- }\r
- }\r
-\r
- // create a new merge and low-level aggregation for each host\r
- map<string, vector<int> >::iterator iter;\r
- for (iter = host_map.begin(); iter != host_map.end(); iter++) {\r
- string host_name = (*iter).first;\r
- vector<int> tables = (*iter).second;\r
-\r
- sprintf(node_suffix, "_%s", host_name.c_str());\r
- string suffix(node_suffix);\r
- untaboo(suffix);\r
- mrg_qpn *new_mrg = (mrg_qpn *)child_mrg->make_copy(suffix);\r
- for (i = 0; i < tables.size(); ++i) {\r
- sprintf(node_suffix, "_m%d", i);\r
- new_mrg->fm.push_back(child_mrg->fm[tables[i]]);\r
- new_mrg->mvars.push_back(child_mrg->mvars[i]);\r
- new_mrg->fm[i]->set_range_var(node_suffix);\r
- }\r
- qp_node* new_node = split_nodes[0]->make_copy(suffix);\r
-\r
- if (new_node->node_type() == "spx_qpn") {\r
- ((spx_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());\r
- ((spx_qpn*)new_node)->table_name->set_range_var("_t0");\r
- } else {\r
- ((sgah_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());\r
- ((sgah_qpn*)new_node)->table_name->set_range_var("_t0");\r
- }\r
- query_plan.push_back(new_mrg);\r
- new_nodes.push_back(new_node);\r
- }\r
-\r
- child_mrg->merge_fieldpos = merge_fieldpos;\r
- if (split_nodes[0]->node_type() == "spx_qpn")\r
- child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((spx_qpn*)split_nodes[0])->select_list);\r
- else\r
- child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((sgah_qpn*)split_nodes[0])->select_list);\r
-\r
-\r
- // connect newly created nodes with parent multihost merge\r
- for (i = 0; i < new_nodes.size(); ++i) {\r
- if (new_nodes[i]->node_type() == "spx_qpn")\r
- child_mrg->fm[i] = new tablevar_t(((spx_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());\r
- else {\r
- child_mrg->fm[i] = new tablevar_t(((sgah_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());\r
-\r
- }\r
- child_mrg->mvars[i]->set_field(child_mrg->table_layout->get_field_name(merge_fieldpos));\r
-\r
- sprintf(node_suffix, "_m%d", i);\r
- child_mrg->fm[i]->set_range_var(node_suffix);\r
- query_plan.push_back(new_nodes[i]);\r
- }\r
- child_mrg->fm.resize(new_nodes.size());\r
- child_mrg->mvars.resize(new_nodes.size());\r
-\r
- // push the new high-level aggregation\r
- query_plan.push_back(super_aggr);\r
-\r
- }\r
- query_plan[index] = NULL;\r
- generate_linkage();\r
-\r
- }\r
-\r
-// Extract the subtree rooted at query node index into a separate hfta
-stream_query* stream_query::extract_subtree(int index) {\r
-\r
- vector<int> nodes;\r
- stream_query* new_query = new stream_query(query_plan[index], this);\r
-\r
- nodes.push_back(index);\r
- for (int i = 0; i < nodes.size(); ++i) {\r
- qp_node* node = query_plan[nodes[i]];\r
- if (!node)\r
- continue;\r
-\r
- // add all children to nodes list\r
- for (int j = 0; j < node->predecessors.size(); ++j)\r
- nodes.push_back(node->predecessors[j]);\r
- if (i)\r
- new_query->query_plan.push_back(node);\r
-\r
- query_plan[nodes[i]] = NULL;\r
- }\r
-\r
- return new_query;\r
-}\r
-\r
-// Splits query that combines data from multiple hosts into separate hftas.\r
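-// (Hypothetical example: a merge reading host1.if0, host1.if1, host2.if0,
-// and host2.if1 is rewritten so that a new per-host merge combines each
-// host's two sources, the top merge then reads one input per host, and
-// each per-host subtree is extracted into its own hfta.)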
-vector<stream_query*> stream_query::split_multihost_query() {\r
-\r
- vector<stream_query*> ret;\r
- char node_suffix[128];\r
- int i;\r
-\r
-	// find merges that combine data from multiple hosts and group their sources by host\r
- int plan_size = query_plan.size();\r
- vector<mrg_qpn*> new_nodes;\r
-\r
- for (i = 0; i < plan_size; ++i) {\r
- qp_node* node = query_plan[i];\r
- if (node && node->node_type() == "mrg_qpn") {\r
- mrg_qpn* mrg = (mrg_qpn*)node;\r
- if (mrg->is_multihost_merge()) {\r
-\r
- // group all the sources by host\r
- map<string, vector<int> > host_map;\r
- for (int j = 0; j < mrg->fm.size(); j++) {\r
- tablevar_t *table_name = mrg->fm[j];\r
- if (host_map.count(table_name->get_machine()))\r
- host_map[table_name->get_machine()].push_back(j);\r
- else {\r
- vector<int> tables;\r
- tables.push_back(j);\r
- host_map[table_name->get_machine()] = tables;\r
- }\r
- }\r
-\r
- // create a new merge for each host\r
- map<string, vector<int> >::iterator iter;\r
- for (iter = host_map.begin(); iter != host_map.end(); iter++) {\r
- string host_name = (*iter).first;\r
- vector<int> tables = (*iter).second;\r
-\r
- if (tables.size() == 1)\r
- continue;\r
-\r
- sprintf(node_suffix, "_%s", host_name.c_str());\r
- string suffix(node_suffix);\r
- untaboo(suffix);\r
- mrg_qpn *new_mrg = (mrg_qpn *)mrg->make_copy(suffix);\r
- for (int j = 0; j < tables.size(); ++j) {\r
- new_mrg->fm.push_back(mrg->fm[tables[j]]);\r
- new_mrg->mvars.push_back(mrg->mvars[j]);\r
- sprintf(node_suffix, "m_%d", j);\r
- new_mrg->fm[j]->set_range_var(node_suffix);\r
- }\r
- new_nodes.push_back(new_mrg);\r
- }\r
-\r
- if (!new_nodes.empty()) {\r
- // connect newly created merge nodes with parent multihost merge\r
- for (int j = 0; j < new_nodes.size(); ++j) {\r
- mrg->fm[j] = new tablevar_t(new_nodes[j]->fm[0]->get_machine().c_str(), "IFACE", new_nodes[j]->get_node_name().c_str());\r
- query_plan.push_back(new_nodes[j]);\r
- }\r
- mrg->fm.resize(new_nodes.size());\r
- mrg->mvars.resize(new_nodes.size());\r
- generate_linkage();\r
- }\r
-\r
-\r
- // now externalize the sources\r
- for (int j = 0; j < node->predecessors.size(); ++j) {\r
-			// extract the subtree rooted at this source into a separate hfta\r
- stream_query* q = extract_subtree(node->predecessors[j]);\r
- if (q) {\r
- q->generate_linkage();\r
- ret.push_back(q);\r
- }\r
-\r
- }\r
- generate_linkage();\r
- }\r
- }\r
- }\r
-\r
- return ret;\r
-}\r
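-\r
-// Illustration (hypothetical 4-way merge over hosts A and B):\r
-//	before:	M(A.if0, A.if1, B.if0, B.if1) feeding the rest of the plan\r
-//	after:	M(M_A, M_B), where M_A merges A.if0/A.if1 and M_B merges\r
-//	B.if0/B.if1, and each per-host subtree is extracted into its own hfta.\r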
-\r
-\r
-// Perform local FTA optimizations\r
-void stream_query::optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result){\r
-\r
-	// Topologically sort the nodes in the query plan (leaf-first)\r
- int i = 0, j = 0;\r
- vector<int> sorted_nodes;\r
-\r
- int num_nodes = query_plan.size();\r
- bool* leaf_flags = new bool[num_nodes];\r
- memset(leaf_flags, 0, num_nodes * sizeof(bool));\r
-\r
- // run topological sort\r
- bool done = false;\r
-\r
-	// add all leaves to sorted_nodes\r
- while (!done) {\r
- done = true;\r
- for (i = 0; i < num_nodes; ++i) {\r
- if (!query_plan[i])\r
- continue;\r
-\r
- if (!leaf_flags[i] && query_plan[i]->predecessors.empty()) {\r
- leaf_flags[i] = true;\r
- sorted_nodes.push_back(i);\r
- done = false;\r
-\r
-				// remove the node from its parents' predecessor lists;\r
-				// since we only care about the number of predecessors, it suffices to remove\r
-				// one element from each parent's predecessor list\r
- for (int j = query_plan[i]->successors.size() - 1; j >= 0; --j)\r
- query_plan[query_plan[i]->successors[j]]->predecessors.pop_back();\r
- }\r
- }\r
- }\r
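-\r
-	// e.g. (hypothetical) for a chain mrg -> sgah -> spx, the passes peel\r
-	// off the current leaves, so sorted_nodes ends up [mrg, sgah, spx]:\r
-	// every node is listed after all of its predecessors.\r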
- delete[] leaf_flags;\r
- num_nodes = sorted_nodes.size();\r
- generate_linkage(); // rebuild the recently destroyed predecessor lists.\r
-\r
-	// collect information about the interfaces the nodes read from\r
- for (i = 0; i < num_nodes; ++i) {\r
- qp_node* node = query_plan[sorted_nodes[i]];\r
- vector<tablevar_t *> input_tables = node->get_input_tbls();\r
- for (j = 0; j < input_tables.size(); ++j) {\r
- tablevar_t * table = input_tables[j];\r
- if (lfta_names.count(table->schema_name)) {\r
- int index = lfta_names[table->schema_name];\r
- table->set_machine(machine_names[index]);\r
- table->set_interface(interface_names[index]);\r
- }\r
- }\r
- }\r
-\r
- /*\r
-\r
- // push eligible operators down in the query plan\r
- for (i = 0; i < num_nodes; ++i) {\r
- if (partn_parse_result && is_partn_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result)) {\r
- pushdown_partn_operator(sorted_nodes[i]);\r
- } else if (is_pushdown_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names)) {\r
- pushdown_operator(sorted_nodes[i], Ext_fcns, Schema);\r
- }\r
- }\r
-\r
- // split the query into multiple hftas if it combines the data from multiple hosts\r
- vector<stream_query*> hftas = split_multihost_query();\r
- hfta_list.insert(hfta_list.end(), hftas.begin(), hftas.end());\r
- */\r
-\r
- num_nodes = query_plan.size();\r
- // also split multi-way merges into two-way merges\r
- for (i = 0; i < num_nodes; ++i) {\r
- qp_node* node = query_plan[i];\r
- if (node && node->node_type() == "mrg_qpn") {\r
- vector<mrg_qpn *> split_merge = ((mrg_qpn *)node)->split_sources();\r
-\r
- query_plan.insert(query_plan.end(), split_merge.begin(), split_merge.end());\r
- // delete query_plan[sorted_nodes[i]];\r
- query_plan[i] = NULL;\r
- }\r
- }\r
-\r
- generate_linkage();\r
-}\r
-\r
-\r
-\r
-table_def *stream_query::get_output_tabledef(){\r
- return( query_plan[qhead]->get_fields() );\r
-}\r
-\r
-\r
-\r
-//////////////////////////////////////////////////////////\r
-//// De-siloing. TO BE REMOVED\r
-\r
-void stream_query::desilo_lftas(map<string, int> &lfta_names,vector<string> &silo_names,table_list *Schema){\r
- int i,t,s;\r
-\r
- int suffix_len = silo_names.back().size();\r
-\r
- for(i=0;i<qtail.size();++i){\r
- vector<tablevar_t *> itbls = query_plan[qtail[i]]->get_input_tbls();\r
- for(t=0;t<itbls.size();++t){\r
- string itbl_name = itbls[t]->get_schema_name();\r
- if(lfta_names.count(itbl_name)>0){\r
-//printf("Query %s input %d references lfta input %s\n",query_plan[qtail[i]]->get_node_name().c_str(),t,itbl_name.c_str());\r
- vector<string> src_names;\r
- string lfta_base = itbl_name.substr(0,itbl_name.size()-suffix_len);\r
- for(s=0;s<silo_names.size();++s){\r
- string lfta_subsilo = lfta_base + silo_names[s];\r
-//printf("\t%s\n",lfta_subsilo.c_str());\r
- src_names.push_back(lfta_subsilo);\r
- }\r
- string merge_node_name = "desilo_"+query_plan[qtail[i]]->get_node_name()+\r
- "_input_"+int_to_string(t);\r
- mrg_qpn *merge_node = new mrg_qpn(merge_node_name,src_names,Schema);\r
- int m_op_pos = Schema->add_table(merge_node->table_layout);\r
- itbls[t]->set_schema(merge_node_name);\r
- itbls[t]->set_schema_ref(m_op_pos);\r
- query_plan.push_back(merge_node);\r
- }\r
- }\r
- }\r
-\r
- generate_linkage();\r
-}\r
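-\r
-// Illustration (hypothetical): with silo_names {"_s0","_s1"} and a query whose\r
-// input 0 reads lfta "pkts_s0", that input is rewired to a new merge node\r
-// "desilo_<query>_input_0" reading pkts_s0 and pkts_s1.\r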
-////////////////////////////////////////\r
-/// End de-siloing\r
-\r
-\r
-\r
-// Given a collection of LFTA stream queries,\r
-// extract their WHERE predicates\r
-// and pass them to an analysis routine which will extract\r
-// common elements\r
-//\r
-void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, vector<cnf_set *> &prefilter_preds, set<unsigned int> &pred_ids){\r
- int s;\r
- std::vector< std::vector<cnf_elem *> > where_list;\r
-\r
-// still safe to assume that LFTA queries have a single\r
-// query node, which is at position 0.\r
- for(s=0;s<lfta_list.size();++s){\r
- vector<cnf_elem *> cnf_list = lfta_list[s]->query_plan[0]->get_filter_clause();\r
- if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){\r
- gb_table *gtbl = ((sgah_qpn *)(lfta_list[s]->query_plan[0]))->get_gb_tbl();\r
- int c;\r
- for(c=0;c<cnf_list.size();++c){\r
- insert_gb_def_pr(cnf_list[c]->pr,gtbl);\r
- }\r
- }\r
- where_list.push_back(lfta_list[s]->query_plan[0]->get_filter_clause());\r
- }\r
-\r
- find_common_filter(where_list,Schema,Ext_fcns,prefilter_preds, pred_ids);\r
-}\r
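-\r
-// e.g. (hypothetical) if every lfta's WHERE clause contains protocol = 6,\r
-// find_common_filter can place that predicate in prefilter_preds and record\r
-// its id in pred_ids, so it is evaluated once in the shared prefilter.\r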
-\r
-\r
-\r
-\r
-\r
-// Given a collection of LFTA stream queries,\r
-// extract the union of all temporal attributes referenced in select clauses;\r
-// these attributes will need to be unpacked in the prefilter\r
-//\r
-void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids){\r
- int s, sl;\r
- vector<scalarexp_t *> sl_list;\r
- gb_table *gb_tbl = NULL;\r
-\r
-\r
-// still safe to assume that LFTA queries have a single\r
-// query node, which is at position 0.\r
- for(s=0;s<lfta_list.size();++s){\r
-\r
- if(lfta_list[s]->query_plan[0]->node_type() == "spx_qpn"){\r
- spx_qpn *spx_node = (spx_qpn *)lfta_list[s]->query_plan[0];\r
- sl_list = spx_node->get_select_se_list();\r
- }\r
- if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){\r
- sgah_qpn *sgah_node = (sgah_qpn *)lfta_list[s]->query_plan[0];\r
- sl_list = sgah_node->get_select_se_list();\r
- gb_tbl = sgah_node->get_gb_tbl();\r
- }\r
-\r
- for(sl=0;sl<sl_list.size();sl++){\r
- data_type *sdt = sl_list[sl]->get_data_type();\r
- if (sdt->is_temporal()) {\r
- gather_se_col_ids(sl_list[sl],temp_cids, gb_tbl);\r
- }\r
- }\r
- }\r
-}\r
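-\r
-// e.g. (hypothetical) for SELECT time/60 as tb, srcIP ... GROUP BY tb, only\r
-// the column ids under the temporal expression tb are gathered into\r
-// temp_cids; srcIP is not temporal and is left for per-lfta unpacking.\r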
-\r
-\r
+/* ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+#include"stream_query.h"
+#include"generate_utils.h"
+#include"analyze_fta.h"
+#include<algorithm>
+#include<utility>
+#include <list>
+
+static char tmpstr[500];
+
+using namespace std;
+
+
+// Create a query plan from a query node and an existing
+// query plan. Use for lfta queries, the parent query plan provides
+// the annotations.
+stream_query::stream_query(qp_node *qnode, stream_query *parent){
+ query_plan.push_back(qnode);
+ qhead = 0;
+ qtail.push_back(0);
+ attributes = qnode->get_fields();
+ parameters = qnode->get_param_tbl();
+ defines = parent->defines;
+ query_name = qnode->get_node_name();
+}
+
+// Copy the query plan.
+stream_query::stream_query(stream_query &src){
+ query_plan = src.query_plan;
+ qhead = src.qhead;
+ qtail = src.qtail;
+ attributes = src.attributes;
+ parameters = src.parameters;
+ defines = src.defines;
+ query_name = src.query_name;
+ gid = src.gid;
+}
+
+
+// Create a query plan from an analyzed parse tree.
+// Perform analyses to find the output node, input nodes, etc.
+
+stream_query::stream_query(query_summary_class *qs,table_list *Schema){
+// Generate the query plan nodes from the analyzed parse tree.
+// There is only one for now, so just assign the return value
+// of create_query_nodes to query_plan
+ error_code = 0;
+ query_plan = create_query_nodes(qs,Schema);
+ int i;
+if(query_plan.size() == 0){
+ fprintf(stderr,"INTERNAL ERROR, zero-size query plan in stream_query::stream_query\n");
+ exit(1);
+}
+ for(i=0;i<query_plan.size();++i){
+ if(query_plan[i]){
+ if(query_plan[i]->get_error_code() != 0){
+ error_code = query_plan[i]->get_error_code();
+ err_str += query_plan[i]->get_error_str();
+ }
+ }
+ }
+ qhead = query_plan.size()-1;
+ gid = -1;
+
+}
+
+
+stream_query * stream_query::add_query(query_summary_class *qs,table_list *Schema){
+// Add another query block to the query plan
+ error_code = 0;
+ vector<qp_node *> new_nodes = create_query_nodes(qs, Schema);
+ query_plan.insert(query_plan.end(),new_nodes.begin(), new_nodes.end());
+ return this;
+}
+
+stream_query * stream_query::add_query(stream_query &src){
+// Add another query block to the query plan
+ error_code = 0;
+ query_plan.insert(query_plan.end(),src.query_plan.begin(), src.query_plan.end());
+ return this;
+}
+
+
+void stream_query::generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema){
+ int i,n;
+
+// Mapping fields to protocol fields requires schema binding.
+// First ensure all nodes are in the schema.
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n] != NULL){
+ Schema->add_table(query_plan[n]->get_fields());
+ }
+ }
+// Now do schema binding
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n] != NULL){
+ query_plan[n]->bind_to_schema(Schema);
+ }
+ }
+
+// create name-to-index map
+ map<string, int> name_to_node;
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n]){
+ name_to_node[query_plan[n]->get_node_name()] = n;
+ }
+ }
+
+// Create a list of the nodes to process, in order.
+// Search from the root down.
+// ASSUME tree plan.
+
+ list<int> search_q;
+ list<int> work_list;
+
+ search_q.push_back(qhead);
+ while(! search_q.empty()){
+ int the_q = search_q.front();
+ search_q.pop_front(); work_list.push_front(the_q);
+ vector<int> the_pred = query_plan[the_q]->get_predecessors();
+ for(i=0;i<the_pred.size();i++)
+ search_q.push_back(the_pred[i]);
+ }
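+
+// e.g. with head H reading from A and B, and A reading from leaf L, the
+// search visits H, A, B, L while pushing each onto the front of work_list,
+// so the scan below processes L, B, A, H - every source before its consumer.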
+
+// Scan through the work list, from the front,
+// gather the qp_nodes it references, and call the
+// protocol SE generator.  Pass NULL for
+// sources not found - presumably user-defined operators.
+
+ while(! work_list.empty()){
+ int the_q = work_list.front();
+ work_list.pop_front();
+ vector<qp_node *> q_sources;
+ vector<tablevar_t *> q_input_tbls = query_plan[the_q]->get_input_tbls();
+ for(i=0;i<q_input_tbls.size();++i){
+ string itbl_nm = q_input_tbls[i]->get_schema_name();
+ if(name_to_node.count(itbl_nm)>0){
+ q_sources.push_back(query_plan[name_to_node[itbl_nm]]);
+ }else if(sq_map.count(itbl_nm)>0){
+ q_sources.push_back(sq_map[itbl_nm]->get_query_head());
+ }else{
+ q_sources.push_back(NULL);
+ }
+ }
+ query_plan[the_q]->create_protocol_se(q_sources, Schema);
+ }
+
+//////////////////////////////////////////////////////////
+// trust but verify
+
+
+/*
+ for(i=0;i<query_plan.size();++i){
+ if(query_plan[i]){
+ printf("query node %s, type=%s:\n",query_plan[i]->get_node_name().c_str(),
+ query_plan[i]->node_type().c_str());
+ map<std::string, scalarexp_t *> *pse_map = query_plan[i]->get_protocol_se();
+ map<std::string, scalarexp_t *>::iterator mssi;
+ for(mssi=pse_map->begin();mssi!=pse_map->end();++mssi){
+ if((*mssi).second)
+ printf("\t%s : %s\n",(*mssi).first.c_str(), (*mssi).second->to_string().c_str());
+ else
+ printf("\t%s : NULL\n",(*mssi).first.c_str());
+ }
+ if(query_plan[i]->node_type() == "filter_join" || query_plan[i]->node_type() == "join_eq_hash_qpn"){
+ vector<scalarexp_t *> pse_l;
+ vector<scalarexp_t *> pse_r;
+ if(query_plan[i]->node_type() == "filter_join"){
+ pse_l = ((filter_join_qpn *)query_plan[i])->get_hash_l();
+ pse_r = ((filter_join_qpn *)query_plan[i])->get_hash_r();
+ }
+ if(query_plan[i]->node_type() == "join_eq_hash_qpn"){
+ pse_l = ((join_eq_hash_qpn *)query_plan[i])->get_hash_l();
+ pse_r = ((join_eq_hash_qpn *)query_plan[i])->get_hash_r();
+ }
+ int p;
+ for(p=0;p<pse_l.size();++p){
+ if(pse_l[p] != NULL)
+ printf("\t\t%s = ",pse_l[p]->to_string().c_str());
+ else
+ printf("\t\tNULL = ");
+ if(pse_r[p] != NULL)
+ printf("%s\n",pse_r[p]->to_string().c_str());
+ else
+ printf("NULL\n");
+ }
+ }
+ if(query_plan[i]->node_type() == "sgah_qpn" || query_plan[i]->node_type() == "rsgah_qpn" || query_plan[i]->node_type() == "sgahcwcb_qpn"){
+ vector<scalarexp_t *> pseg;
+ if(query_plan[i]->node_type() == "sgah_qpn")
+ pseg = ((sgah_qpn *)query_plan[i])->get_gb_sources();
+ if(query_plan[i]->node_type() == "rsgah_qpn")
+ pseg = ((rsgah_qpn *)query_plan[i])->get_gb_sources();
+ if(query_plan[i]->node_type() == "sgahcwcb_qpn")
+ pseg = ((sgahcwcb_qpn *)query_plan[i])->get_gb_sources();
+ int g;
+ for(g=0;g<pseg.size();g++){
+ if(pseg[g] != NULL)
+ printf("\t\tgb %d = %s\n",g,pseg[g]->to_string().c_str());
+ else
+ printf("\t\tgb %d = NULL\n",g);
+ }
+ }
+ }
+ }
+*/
+
+}
+
+bool stream_query::generate_linkage(){
+ bool create_failed = false;
+ int n, f,s;
+
+// Clear any leftover linkages
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n]){
+ query_plan[n]->clear_predecessors();
+ query_plan[n]->clear_successors();
+ }
+ }
+ qtail.clear();
+
+// create name-to-index map
+ map<string, int> name_to_node;
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n]){
+ name_to_node[query_plan[n]->get_node_name()] = n;
+ }
+ }
+
+// Do the 2-way linkage.
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n]){
+ vector<tablevar_t *> fm = query_plan[n]->get_input_tbls();
+ for(f=0;f<fm.size();++f){
+ string src_tbl = fm[f]->get_schema_name();
+ if(name_to_node.count(src_tbl)>0){
+ int s = name_to_node[src_tbl];
+ query_plan[n]->add_predecessor(s);
+ query_plan[s]->add_successor(n);
+ }
+ }
+ }
+ }
+
+// Find the head (no successors) and the tails (at least one
+// predecessor is external).
+// Verify that there is only one head,
+// and that all other nodes have one successor (because
+// right now I can only handle trees).
+
+ qhead = -1; // no head yet found.
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n]){
+ vector<int> succ = query_plan[n]->get_successors();
+/*
+ if(succ.size() > 1){
+ fprintf(stderr,"ERROR, query node %s supplies data to multiple nodes, but currently only tree query plans are supported:\n",query_plan[n]->get_node_name().c_str());
+ for(s=0;s<succ.size();++s){
+ fprintf(stderr,"%s ",query_plan[succ[s]]->get_node_name().c_str());
+ }
+ fprintf(stderr,"\n");
+ create_failed = true;
+ }
+*/
+ if(succ.size() == 0){
+ if(qhead >= 0){
+ fprintf(stderr,"ERROR, query nodes %s and %s both have zero successors.\n",query_plan[n]->get_node_name().c_str(), query_plan[qhead]->get_node_name().c_str());
+ create_failed = true;
+ }else{
+ qhead = n;
+ }
+ }
+ if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size()){
+ qtail.push_back(n);
+ }
+ }
+ }
+
+ return create_failed;
+}
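+
+// e.g. (hypothetical) in a two-node plan join -> agg where the join reads two
+// external lftas: agg has no successors and becomes qhead, while the join has
+// fewer linked predecessors (0) than input tables (2) and so lands in qtail.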
+
+// After the collection of query plan nodes is generated,
+// analyze their structure to link them up into a tree (or dag?).
+// Verify that the structure is acceptable.
+// Do some other analysis and verification tasks (?)
+// then gather summary information.
+int stream_query::generate_plan(table_list *Schema){
+
+// The first thing to do is verify that the query plan
+// nodes were successfully created.
+ bool create_failed = false;
+ int n,f,s;
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n]!=NULL && query_plan[n]->get_error_code()){
+ fprintf(stderr,"%s",query_plan[n]->get_error_str().c_str());
+ create_failed = true;
+ }
+ }
+/*
+for(n=0;n<query_plan.size();++n){
+if(query_plan[n] != NULL){
+string nstr = query_plan[n]->get_node_name();
+printf("In generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
+vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
+int nn;
+for(nn=0;nn<inv.size();nn++){
+printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
+}
+printf("\n");
+}
+}
+*/
+
+ if(create_failed) return(1);
+
+// Here, link up the query nodes, then verify that the
+// structure is acceptable (single root, no cycles, no stranded
+// nodes, etc.)
+ create_failed = generate_linkage();
+ if(create_failed) return -1;
+
+
+// Here, do optimizations such as predicate pushing,
+// join rearranging, etc.
+// Nothing to do yet.
+
+/*
+for(n=0;n<query_plan.size();++n){
+if(query_plan[n] != NULL){
+string nstr = query_plan[n]->get_node_name();
+printf("B generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
+vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
+int nn;
+for(nn=0;nn<inv.size();nn++){
+printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
+}
+printf("\n");
+}
+}
+printf("qhead=%d, qtail = ",qhead);
+int nn;
+for(nn=0;nn<qtail.size();++nn)
+printf("%d ",qtail[nn]);
+printf("\n");
+*/
+
+// Collect query summaries.  The query is represented by its head node.
+ query_name = query_plan[qhead]->get_node_name();
+ attributes = query_plan[qhead]->get_fields();
+// TODO: The params and defines require a lot more thought.
+ parameters = query_plan[qhead]->get_param_tbl();
+ defines = query_plan[qhead]->get_definitions();
+
+ return(0);
+}
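+
+// A minimal driver sketch (hypothetical; assumes qs and Schema come from the
+// parse/analysis phases, error handling elided):
+//
+//	stream_query *sq = new stream_query(qs, Schema);
+//	if (sq->generate_plan(Schema) != 0) {
+//		// inspect error_code / err_str and bail out
+//	}
+//	string schema_text = sq->make_schema();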
+
+void stream_query::add_output_operator(ospec_str *o){
+ output_specs.push_back(o);
+}
+
+
+void stream_query::get_external_libs(set<string> &libset){
+
+ int qn,i;
+ for(qn=0;qn<query_plan.size();++qn){
+ if(query_plan[qn] != NULL){
+ vector<string> op_libs = query_plan[qn]->external_libs();
+ for(i=0;i<op_libs.size();++i){
+ libset.insert(op_libs[i]);
+ }
+ }
+ }
+
+ for(qn=0;qn<output_operators.size();++qn){
+ if(output_operators[qn] != NULL){
+ vector<string> op_libs = output_operators[qn]->external_libs();
+ for(i=0;i<op_libs.size();++i){
+ libset.insert(op_libs[i]);
+ }
+ }
+ }
+}
+
+
+
+// Split into LFTA, HFTA components.
+// Split this query into LFTA and HFTA queries.
+// Four possible outcomes:
+// 1) the query reads from a protocol, but does not need to
+// split (can be evaluated as an LFTA).
+// The lfta query is the only element in the return vector,
+// and hfta_returned is false.
+// 2) the query reads from no protocol, and therefore cannot be split.
+//	The hfta query is the only element in the return vector,
+//	and hfta_returned is true.
+//	3) reads from at least one protocol, but cannot be split: failure.
+// return vector is empty, the error conditions are written
+// in err_str and error_code
+// 4) The query splits into an hfta query and one or more LFTA queries.
+// the return vector has two or more elements, and hfta_returned
+// is true. The last element is the HFTA.
+
+vector<stream_query *> stream_query::split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
+ vector<stream_query *> queries;
+ int l,q,s;
+ int qp_hfta;
+
+ hfta_returned = false; // assume until proven otherwise
+
+ for(l=0;l<qtail.size();++l){
+ int leaf = qtail[l];
+
+
+ vector<qp_node *> qnodes = query_plan[leaf]->split_node_for_fta(Ext_fcns, Schema, qp_hfta, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
+
+
+ if(qnodes.size() == 0 || query_plan[leaf]->get_error_code()){ // error
+//printf("error\n");
+ error_code = query_plan[leaf]->get_error_code();
+ err_str = query_plan[leaf]->get_error_str();
+ vector<stream_query *> null_result;
+ return(null_result);
+ }
+ if(qnodes.size() == 1 && qp_hfta){ // nothing happened
+//printf("no change\n");
+ query_plan[leaf] = qnodes[0];
+ }
+ if(qnodes.size() == 1 && !qp_hfta){ // push to lfta
+//printf("lfta only\n");
+ queries.push_back(new stream_query(qnodes[0], this));
+ vector<int> succ = query_plan[leaf]->get_successors();
+ for(s=0;s<succ.size();++s){
+ query_plan[succ[s]]->remove_predecessor(leaf);
+ }
+ query_plan[leaf] = NULL; // delete it?
+ }
+ if(qnodes.size() > 1){ // actual splitting occurred.
+ if(!qp_hfta){ // internal consistency check.
+ error_code = 1;
+			err_str = "INTERNAL ERROR: multiple nodes returned by split_node_for_fta, but none are hfta nodes.\n";
+ vector<stream_query *> null_result;
+ return(null_result);
+ }
+
+ for(q=0;q<qnodes.size()-qp_hfta;++q){ // process lfta nodes
+//printf("creating lfta %d (%s)\n",q,qnodes[q]->get_node_name().c_str());
+ queries.push_back(new stream_query(qnodes[q], this));
+ }
+// Use new hfta node
+ query_plan[leaf] = qnodes[qnodes.size()-1];
+// Add in any extra hfta nodes
+ for(q=qnodes.size()-qp_hfta;q<qnodes.size()-1;++q)
+ query_plan.push_back(qnodes[q]);
+ }
+ }
+
+ int n_ifprefs = 0;
+ set<string> ifps;
+ for(q=0;q<query_plan.size();++q){
+ if(query_plan[q] != NULL){
+ n_ifprefs += query_plan[q]->count_ifp_refs(ifps);
+ hfta_returned = true;
+ }
+ }
+
+ if(n_ifprefs){
+ set<string>::iterator ssi;
+ err_str += "ERROR, unresolved interface parameters in HFTA:\n";
+ for(ssi=ifps.begin();ssi!=ifps.end();++ssi){
+ err_str += (*ssi)+" ";
+ }
+ err_str += "\n";
+ error_code = 3;
+ vector<stream_query *> null_result;
+ return(null_result);
+ }
+
+
+ if(hfta_returned){
+ if(generate_linkage()){
+ fprintf(stderr,"INTERNAL ERROR, generate_linkage failed in split_query.\n");
+ exit(1);
+ }
+ queries.push_back(this);
+ }
+
+ return(queries);
+}
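+
+// Sketch of consuming the four outcomes above (hypothetical caller):
+//
+//	bool hfta_ret;
+//	vector<stream_query *> parts =
+//		sq->split_query(Ext_fcns, Schema, hfta_ret, ifdb, 0, 1, 0);
+//	if (parts.empty()) { /* report sq's error_code / err_str */ }
+//	int n_lftas = parts.size() - (hfta_ret ? 1 : 0);
+//	for (int p = 0; p < n_lftas; ++p) lfta_list.push_back(parts[p]);
+//	if (hfta_ret) hfta_list.push_back(parts.back());	// the hfta is last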
+
+
+
+vector<table_exp_t *> stream_query::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string silo_nm){
+ vector<table_exp_t *> subqueries;
+ int l,q;
+
+ string root_name = this->get_output_tabledef()->get_tbl_name();
+
+
+ for(l=0;l<qtail.size();++l){
+ int leaf = qtail[l];
+ vector<table_exp_t *> new_qnodes = query_plan[leaf]->extract_opview(Schema, qnodes, opviews, root_name, silo_nm);
+
+ for(q=0;q<new_qnodes.size();++q){ // process lfta nodes
+ subqueries.push_back( new_qnodes[q]);
+ }
+ }
+
+ return(subqueries);
+}
+
+
+
+
+string stream_query::make_schema(){
+ return make_schema(qhead);
+}
+
+string stream_query::make_schema(int q){
+ string ret="FTA{\n\n";
+
+ ret += query_plan[q]->get_fields()->to_string();
+
+ ret += "DEFINE{\n";
+ ret += "\tquery_name '"+query_plan[q]->get_node_name()+"';\n";
+
+ map<string, string> defs = query_plan[q]->get_definitions();
+ map<string, string>::iterator dfi;
+ for(dfi=defs.begin(); dfi!=defs.end(); ++dfi){
+ ret += "\t"+ (*dfi).first + " '" + (*dfi).second + "';\n";
+ }
+ ret += "}\n\n";
+
+ ret += "PARAM{\n";
+ param_table *params = query_plan[q]->get_param_tbl();
+ vector<string> param_names = params->get_param_names();
+ int p;
+ for(p=0;p<param_names.size();p++){
+ data_type *dt = params->get_data_type( param_names[p] );
+ ret += "\t" + param_names[p] + " '" + dt->get_type_str() + "';\n";
+ }
+ ret += "}\n";
+
+ ret += query_plan[q]->to_query_string();
+ ret += "\n}\n\n";
+
+ return(ret);
+
+}
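+
+// For a node named "hfta_1" with one uint parameter "thresh", the text built
+// above looks roughly like (sketch):
+//
+//	FTA{
+//	<output field definitions>
+//	DEFINE{
+//		query_name 'hfta_1';
+//	}
+//	PARAM{
+//		thresh 'uint';
+//	}
+//	<query text>
+//	}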
+
+string stream_query::collect_refd_ifaces(){
+ string ret="";
+ int q;
+ for(q=0;q<query_plan.size();++q){
+ if(query_plan[q]){
+ map<string, string> defs = query_plan[q]->get_definitions();
+ if(defs.count("_referenced_ifaces")){
+ if(ret != "") ret += ",";
+ ret += defs["_referenced_ifaces"];
+ }
+ }
+ }
+
+ return ret;
+}
+
+
+bool stream_query::stream_input_only(table_list *Schema){
+ vector<tablevar_t *> input_tbls = this->get_input_tables();
+ int i;
+ for(i=0;i<input_tbls.size();++i){
+ int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
+ if(Schema->get_schema_type(t) == PROTOCOL_SCHEMA) return(false);
+ }
+ return(true);
+}
+
+// Return input tables. No duplicate removal performed.
+vector<tablevar_t *> stream_query::get_input_tables(){
+ vector<tablevar_t *> retval;
+
+// create name-to-index map
+ int n;
+ map<string, int> name_to_node;
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n]){
+ name_to_node[query_plan[n]->get_node_name()] = n;
+ }
+ }
+
+ int l;
+ for(l=0;l<qtail.size();++l){
+ int leaf = qtail[l];
+ vector<tablevar_t *> tmp_v = query_plan[leaf]->get_input_tbls();
+ int i;
+ for(i=0;i<tmp_v.size();++i){
+ if(name_to_node.count(tmp_v[i]->get_schema_name()) == 0)
+ retval.push_back(tmp_v[i]);
+ }
+ }
+ return(retval);
+}
+
+
+void stream_query::compute_node_format(int q, vector<int> &nfmt, map<string, int> &op_idx){
+ int netcnt = 0, hostcnt = 0;
+ int i;
+
+ vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
+ for(i=0;i<itbls.size();++i){
+ string tname = itbls[i]->get_schema_name();
+ if(op_idx.count(tname)){
+ int o = op_idx[tname];
+ if(nfmt[o] == UNKNOWNFORMAT)
+ compute_node_format(o,nfmt,op_idx);
+ if(nfmt[o] == NETFORMAT) netcnt++;
+ else hostcnt++;
+ }else{
+ netcnt++;
+ }
+ }
+ if(query_plan[q]->makes_transform()){
+ nfmt[q] = HOSTFORMAT;
+ }else{
+ if(hostcnt>0){
+ nfmt[q] = HOSTFORMAT;
+ }else{
+ nfmt[q] = NETFORMAT;
+ }
+ }
+//printf("query plan %d (%s) is ",q,query_plan[q]->get_node_name().c_str());
+//if(nfmt[q] == HOSTFORMAT) printf(" host format.\n");
+//else printf("net format\n");
+}
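+
+// e.g. an external (lfta) input counts as NETFORMAT; a merge that makes no
+// transform over such inputs stays NETFORMAT, but once some operator makes a
+// transform, every consumer above it sees hostcnt > 0 and is HOSTFORMAT.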
+
+
+string stream_query::generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode){
+ int schref, ov_ix, i, q, param_sz;
+ bool dag_graph = false;
+
+
+// Bind the SEs in all query plan nodes to this schema, and
+// Add all tables used by this query to the schema.
+// Question: Will I be polluting the global schema by adding all
+// query node schemas?
+
+// First ensure all nodes are in the schema.
+ int qn;
+ for(qn=0;qn<query_plan.size();++qn){
+ if(query_plan[qn] != NULL){
+ Schema->add_table(query_plan[qn]->get_fields());
+ }
+ }
+// Now do binding.
+ for(qn=0;qn<query_plan.size();++qn){
+ if(query_plan[qn] != NULL){
+ query_plan[qn]->bind_to_schema(Schema);
+ }
+ }
+
+// Is it a DAG plan?
+ set<string> qsources;
+ int n;
+ for(n=0;n<query_plan.size();++n){
+ if(query_plan[n]){
+ vector<tablevar_t *> tmp_v = query_plan[n]->get_input_tbls();
+ int i;
+ for(i=0;i<tmp_v.size();++i){
+ if(qsources.count(tmp_v[i]->get_schema_name()) > 0)
+ dag_graph = true;
+ qsources.insert(tmp_v[i]->get_schema_name());
+ }
+ }
+ }
+
+
+
+// Collect set of tables ref'd in this HFTA
+ set<int> tbl_set;
+ for(qn=0;qn<query_plan.size();++qn){
+ if(query_plan[qn]){
+// get names of the tables
+ vector<tablevar_t *> input_tbls = query_plan[qn]->get_input_tbls();
+ vector<tablevar_t *> output_tbls = query_plan[qn]->get_output_tbls();
+// Convert to tblrefs, add to set of ref'd tables
+ int i;
+ for(i=0;i<input_tbls.size();i++){
+// int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
+ int t = input_tbls[i]->get_schema_ref();
+ if(t < 0){
+ fprintf(stderr,"INTERNAL ERROR in generate_hfta. "
+ "query plan node %s references input table %s, which is not in schema.\n",
+ query_name.c_str(), input_tbls[i]->get_schema_name().c_str());
+ exit(1);
+ }
+ tbl_set.insert(t);
+ }
+
+ for(i=0;i<output_tbls.size();i++){
+ int t = Schema->get_table_ref(output_tbls[i]->get_schema_name());
+ if(t < 0){
+				fprintf(stderr,"INTERNAL ERROR in generate_hfta. "
+ "query plan node %s references output table %s, which is not in schema.\n",
+ query_name.c_str(), output_tbls[i]->get_schema_name().c_str());
+ exit(1);
+ }
+ tbl_set.insert(t);
+ }
+ }
+ }
+
+// Collect map of lftas, query nodes
+ map<string, int> op_idx;
+ for(q=0;q<query_plan.size();q++){
+ if(query_plan[q]){
+ op_idx[query_plan[q]->get_node_name()] = q;
+ }
+ }
+
+// map of input tables must include query id and input
+// source (0,1) because several queries might reference the same source
+ vector<tablevar_t *> input_tbls = this->get_input_tables();
+ vector<bool> input_tbl_free;
+ for(i=0;i<input_tbls.size();++i){
+ input_tbl_free.push_back(true);
+ }
+ map<string, int> lfta_idx;
+//fprintf(stderr,"%d input tables, %d query nodes\n",input_tbls.size(), query_plan.size());
+ for(q=0;q<query_plan.size();q++){
+ if(query_plan[q]){
+ vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
+ int it;
+ for(it=0;it<itbls.size();it++){
+ string tname = itbls[it]->get_schema_name()+"-"+int_to_string(q)+"-"+int_to_string(it);
+ string src_tblname = itbls[it]->get_schema_name();
+ bool src_is_external = false;
+ for(i=0;i<input_tbls.size();++i){
+ if(src_tblname == input_tbls[i]->get_schema_name()){
+ src_is_external = true;
+ if(input_tbl_free[i]){
+ lfta_idx[tname] = i;
+ input_tbl_free[i] = false;
+//fprintf(stderr,"Adding %s (src_tblname=%s, q=%d, it=%d) to %d.\n",tname.c_str(), src_tblname.c_str(), q, it, i);
+ break;
+ }
+ }
+ }
+ if(i==input_tbls.size() && src_is_external){
+				fprintf(stderr,"INTERNAL ERROR in stream_query::generate_hfta, can't find free entry in input_tbls for query %d, input %d (%s)\n",q,it,src_tblname.c_str());
+ exit(1);
+ }
+ }
+ }
+ }
+/*
+ for(i=0;i<input_tbls.size();++i){
+ string src_tblname = input_tbls[i]->get_schema_name();
+ lfta_idx[src_tblname] = i;
+ }
+*/
+
+// Compute the output formats of the operators.
+ vector<int> node_fmt(query_plan.size(),UNKNOWNFORMAT);
+ compute_node_format(qhead, node_fmt, op_idx);
+
+
+// Generate the schema strings for the outputs.
+ string schema_str;
+ for(i=0;i<query_plan.size();++i){
+ if(i != qhead && query_plan[i]){
+ string schema_tmpstr = this->make_schema(i);
+ schema_str += "gs_csp_t node"+int_to_string(i)+"_schema = "+make_C_embedded_string(schema_tmpstr)+";\n";
+ }
+ }
+
+ attributes = query_plan[qhead]->get_fields();
+
+
+ string schema_tmpstr = this->make_schema();
+ schema_str += "gs_csp_t "+generate_schema_string_name(query_name)+" = "+make_C_embedded_string(schema_tmpstr)+";\n";
+
+// Generate the collection of tuple defs.
+
+ string tuple_defs = "\n/*\tDefine tuple structures \t*/\n\n";
+ set<int>::iterator si;
+ for(si=tbl_set.begin(); si!=tbl_set.end(); ++si){
+ tuple_defs += generate_host_tuple_struct( Schema->get_table( (*si) ));
+ tuple_defs += "\n\n";
+ }
+
+// generate the finalize tuple function
+ string finalize_str = generate_hfta_finalize_tuple(attributes);
+
+// Analyze and make the output operators
+ bool eat_input = false;
+ string src_op = query_name;
+ string pred_op = src_op;
+ int last_op = -1;
+ if(output_specs.size()>0)
+ eat_input = true;
+ if(n_successors>0)
+ eat_input = false; // must create stream output for successor HFTAs.
+ int n_filestreams = 0;
+ for(i=0;i<output_specs.size();++i){
+ if(output_specs[i]->operator_type == "stream" ){
+ eat_input = false;
+ }
+ if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile" ){
+ last_op = i;
+ n_filestreams++;
+ }
+ }
+ int filestream_id = 0;
+ for(i=0;i<output_specs.size();++i){
+ if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile"){
+
+ int n_fstreams = output_specs[i]->n_partitions / n_parallel;
+ if(n_fstreams * n_parallel < output_specs[i]->n_partitions){
+ n_fstreams++;
+ if(n_parallel == 1 || query_name.find("__copy1") != string::npos){
+ fprintf(stderr,"WARNING, in query %s, %d streams requested for %s output, but it must be a multiple of the hfta parallelism (%d), increasing number of output streams to %d.\n",query_name.c_str(), output_specs[i]->n_partitions, output_specs[i]->operator_type.c_str(), n_parallel, n_fstreams*n_parallel);
+ }
+ }
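+//	e.g. n_partitions = 10 over n_parallel = 4 hfta copies rounds n_fstreams
+//	up to 3 (12 file streams in total), and the warning above fires once, on
+//	the copy tagged __copy1.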
+// output_file_qpn *new_ofq = new output_file_qpn();
+ string filestream_tag = "";
+ if(n_filestreams>1){
+ filestream_tag = "_fileoutput"+int_to_string(filestream_id);
+ filestream_id++;
+ }
+ output_file_qpn *new_ofq = new output_file_qpn(pred_op, src_op, filestream_tag, query_plan[qhead]->get_fields(), output_specs[i], (i==last_op ? eat_input : false) );
+// if(n_fstreams > 1){
+ if(n_fstreams > 0){
+ string err_str;
+ bool err_ret = new_ofq->set_splitting_params(n_parallel,parallel_idx,n_fstreams,output_specs[i]->partitioning_flds,err_str);
+ if(err_ret){
+ fprintf(stderr,"%s",err_str.c_str());
+ exit(1);
+ }
+ }
+ output_operators.push_back(new_ofq );
+ pred_op = output_operators.back()->get_node_name();
+ }else if(! (output_specs[i]->operator_type == "stream" || output_specs[i]->operator_type == "Stream" || output_specs[i]->operator_type == "STREAM") ){
+ fprintf(stderr,"WARNING, output operator type %s (on query %s) is not supported, ignoring\n",output_specs[i]->operator_type.c_str(),query_name.c_str() );
+ }
+ }
+
+
+
+// Generate functors for the query nodes.
+
+ string functor_defs = "\n/*\tFunctor definitions\t*/\n\n";
+ for(qn=0;qn<query_plan.size();++qn){
+ if(query_plan[qn]!=NULL){
+// Compute whether the input needs a ntoh xform.
+ vector<bool> needs_xform;
+ vector<tablevar_t *> itbls = query_plan[qn]->get_input_tbls();
+ for(i=0;i<itbls.size();++i){
+ string tname = itbls[i]->get_schema_name();
+// if(query_plan[qn]->makes_transform()){
+ if(op_idx.count(tname)>0){
+ if(node_fmt[ op_idx[tname] ] == NETFORMAT){
+ needs_xform.push_back(true);
+ }else{
+ needs_xform.push_back(false);
+ }
+ }else{
+ needs_xform.push_back(true);
+ }
+// }else{
+// if(op_idx.count(tname)>0){
+// if(node_fmt[qn] != node_fmt[ op_idx[tname] ]){
+// needs_xform.push_back(true);
+// }else{
+// needs_xform.push_back(false);
+// }
+// }else{
+// if(node_fmt[qn] == HOSTFORMAT){
+// needs_xform.push_back(true);
+// }else{
+// needs_xform.push_back(false);
+// }
+// }
+// }
+ }
+
+ functor_defs += query_plan[qn]->generate_functor(Schema, Ext_fcns, needs_xform);
+ }
+ }
+
+// Generate output operator functors
+
+ vector<bool> needs_xform;
+ for(i=0;i<output_operators.size();++i)
+ functor_defs += output_operators[i]->generate_functor(Schema, Ext_fcns, needs_xform);
+
+ string ret =
+"extern \"C\" {\n"
+"#include <lapp.h>\n"
+"#include <fta.h>\n"
+"#include <gshub.h>\n"
+"#include <stdlib.h>\n"
+"#include <stdio.h>\n"
+"#include <limits.h>\n"
+"}\n"
+;
+ if(dag_graph)
+ ret +=
+"#define PLAN_DAG\n"
+;
+ ret +=
+"#include <schemaparser.h>\n"
+"#include<hfta_runtime_library.h>\n"
+"\n"
+"#include <host_tuple.h>\n"
+"#include <hfta.h>\n"
+"#include <hfta_udaf.h>\n"
+"#include <hfta_sfun.h>\n"
+"\n"
+//"#define MAXSCHEMASZ 16384\n"
+"#include <stdio.h>\n\n"
+;
+
+// Get include file for each of the operators.
+// avoid duplicate inserts.
+ set<string> include_fls;
+ for(qn=0;qn<query_plan.size();++qn){
+ if(query_plan[qn] != NULL)
+ include_fls.insert(query_plan[qn]->get_include_file());
+ }
+ for(i=0;i<output_operators.size();++i)
+ include_fls.insert(output_operators[i]->get_include_file());
+ set<string>::iterator ssi;
+ for(ssi=include_fls.begin();ssi!=include_fls.end();++ssi)
+ ret += (*ssi);
+
+// Add defines for hash functions
+ ret +=
+"\n"
+"#define hfta_BOOL_to_hash(x) (x)\n"
+"#define hfta_USHORT_to_hash(x) (x)\n"
+"#define hfta_UINT_to_hash(x) (x)\n"
+"#define hfta_IP_to_hash(x) (x)\n"
+"#define hfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
+"#define hfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
+"#define hfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
+"#define hfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
+"#define hfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
+"\n"
+;
+
+// ret += "#define SERIOUS_LFTA \""+input_tbls[0]->get_schema_name()+"\"\n";
+ ret += "#define OUTPUT_HFTA \""+query_name+"\"\n\n";
+
+// HACK ALERT: I know for now that all query plans are
+// single operator plans, but while SPX and SGAH can use the
+// UNOP template, the merge operator must use the MULTOP template.
+// WORSE HACK ALERT: merge does not translate its input,
+// so don't apply finalize to the output.
+// TODO: clean this up.
+
+// string node_type = query_plan[0]->node_type();
+
+ ret += schema_str;
+ ret += tuple_defs;
+
+// Need to work on the input, output xform logic.
+// For now, just add it in.
+// ret += finalize_str;
+
+ if(node_fmt[qhead] == NETFORMAT){
+ ret +=
+"void finalize_tuple(host_tuple &tup){\n"
+"return;\n"
+"}\n"
+"\n";
+ }else{
+ ret += finalize_str;
+ }
+
+ ret += functor_defs;
+
+// Parameter block management
+// The proper parameter block must be transmitted to each
+// external stream source.
+// There is a 1-1 mapping between the param blocks returned
+// by this list and the registered data sources ...
+// TODO: make this more manageable, but for now
+// there is no parameter block manipulation so I just
+// need to have the same number.
+
+ ret +=
+"int get_lfta_params(gs_int32_t sz, void * value,list<param_block>& lst){\n"
+" // for now every lfta receive the full copy of hfta parameters\n"
+" struct param_block pb;\n";
+
+ set<string> lfta_seen;
+ for(i=0;i<input_tbls.size();++i){
+ string src_tblname = input_tbls[i]->get_schema_name();
+ if(lfta_seen.count(src_tblname) == 0){
+ lfta_seen.insert(src_tblname);
+ schref = input_tbls[i]->get_schema_ref();
+ if(Schema->get_schema_type(schref) == OPERATOR_VIEW_SCHEMA){
+ ov_ix = input_tbls[i]->get_opview_idx();
+ opview_entry *opv = opviews.get_entry(ov_ix);
+ string op_param = "SUBQ:";
+ int q;
+ for(q=0;q<opv->subq_names.size();++q){
+ if(q>0) op_param+=",";
+ op_param+=opv->subq_names[q];
+ }
+ op_param+="\\n";
+ param_sz = op_param.size()-1;
+
+ sprintf(tmpstr,"\t\tpb.block_length = %d;\n",param_sz); ret+=tmpstr;
+ ret+=
+" pb.data = malloc(pb.block_length);\n";
+ ret+="\t\tmemcpy(pb.data,\""+op_param+"\",pb.block_length);\n"
+" lst.push_back(pb);\n\n";
+ }else{
+ ret+=
+" pb.block_length = sz;\n"
+" pb.data = malloc(pb.block_length);\n"
+" memcpy(pb.data, value, pb.block_length);\n"
+" lst.push_back(pb);\n\n";
+ }
+ }
+ }
+
+ ret +=
+" return 0;\n"
+"}\n"
+"\n";
+
+ ret+=
+"struct FTA* alloc_hfta (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void * value ) {\n"
+"\n"
+" // find the lftas\n"
+" list<lfta_info*> *lfta_list = new list<lfta_info*>;\n"
+"\n"
+" FTAID f;\n"
+" char schemabuf[MAXSCHEMASZ];\n"
+" gs_schemahandle_t schema_handle;\n"
+"\n";
+
+// Register the input data sources.
+// Register a source only once.
+
+// vector<string> ext_reg_txt;
+ map<string, int> input_tbl_srcid;
+ for(i=0;i<input_tbls.size();++i){
+ string src_tblname = input_tbls[i]->get_schema_name();
+// Use UDOP alias when in distributed mode.
+// the cluster manager will make the translation
+// using info from qtree.xml
+ if(distributed_mode && input_tbls[i]->get_udop_alias() != "")
+ src_tblname = input_tbls[i]->get_udop_alias();
+ if(input_tbl_srcid.count(src_tblname) == 0){
+ int srcid = input_tbl_srcid.size();
+ input_tbl_srcid[src_tblname] = srcid;
+ string tmp_s=
+"\n // find "+src_tblname+"\n"
+" if (fta_find(\""+src_tblname+"\",1,&f,schemabuf,MAXSCHEMASZ)!=0) {\n"
+" fprintf(stderr,\"HFTA::error:could not find LFTA \\n\");\n"
+" return 0;\n"
+" }\n"
+" //fprintf(stderr,\"HFTA::FTA found at %u[%u]\\n\",ftamsgid,ftaindex);\n"
+"\n"
+" // parse the schema and get the schema handle\n"
+" schema_handle = ftaschema_parse_string(schemabuf);\n"
+" lfta_info* inf"+int_to_string(srcid)+" = new lfta_info();\n"
+" inf"+int_to_string(srcid)+"->f = f;\n"
+" inf"+int_to_string(srcid)+"->fta_name = strdup(\""+src_tblname+"\");\n"
+" inf"+int_to_string(srcid)+"->schema = strdup(schemabuf);\n"
+" inf"+int_to_string(srcid)+"->schema_handle = schema_handle;\n"
+" lfta_list->push_back(inf"+int_to_string(srcid)+");\n\n";
+// ext_reg_txt.push_back(tmp_s);
+ ret += tmp_s;
+ }
+ }
+
+ ret+="\n";
+ ret += "\tgs_schemahandle_t root_schema_handle = ftaschema_parse_string("+generate_schema_string_name(query_name)+");\n";
+ for(i=0;i<query_plan.size();++i){
+ if(i != qhead && query_plan[i]){
+ ret += "\tgs_schemahandle_t op"+int_to_string(i)+"_schema_handle = ftaschema_parse_string(node"+int_to_string(i)+"_schema);\n";
+ }
+ }
+ ret+="\n";
+
+// Create the operators.
+ int n_basic_ops;
+ for(q=0;q<query_plan.size();q++){
+ if(query_plan[q]){
+ ret+=
+"\n"
+" // create an instance of operator "+int_to_string(q)+" ("+query_plan[q]->get_node_name()+") \n";
+
+// Create parameters for operator construction.
+ string op_params;
+ vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
+ string tname = itbls[0]->get_schema_name();
+// string li_tname = tname +"-"+int_to_string(q)+"-0";
+// if(lfta_idx.count(li_tname)>0)
+ if(input_tbl_srcid.count(tname)>0){
+// ret += ext_reg_txt[lfta_idx[li_tname]];
+// op_params += "inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
+ op_params += "inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
+ }else if(op_idx.count(tname)>0){
+ op_params += "op"+int_to_string( op_idx[tname] )+"_schema_handle";
+ }else{
+ fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (3) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
+ exit(1);
+ }
+ if(itbls.size()>1){
+ string tname = itbls[1]->get_schema_name();
+// string li_tname = tname +"-"+int_to_string(q)+"-1";
+// if(lfta_idx.count(li_tname)>0)
+ if(input_tbl_srcid.count(tname)>0){
+// ret += ext_reg_txt[lfta_idx[li_tname]];
+// op_params += ",inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
+ op_params += ",inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
+ }else if(op_idx.count(tname)>0){
+ op_params += ",op"+int_to_string( op_idx[tname] )+"_schema_handle";
+ }else{
+ fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (4) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
+ exit(1);
+ }
+ }
+ ret += query_plan[q]->generate_operator(q,op_params);
+ ret +=
+" operator_node* node"+int_to_string(q)+" = new operator_node(op"+int_to_string(q)+");\n";
+
+ n_basic_ops = q;
+ }
+ }
+ n_basic_ops++;
+// Next create the output operators, if any
+ for(i=0;i<output_operators.size();++i){
+ ret += output_operators[i]->generate_operator(n_basic_ops+i,"root_schema_handle");
+ ret +=
+" operator_node* node"+int_to_string(n_basic_ops+i)+" = new operator_node(op"+int_to_string(n_basic_ops+i)+");\n";
+ }
+
+
+// Link up operators.
+ for(q=0;q<query_plan.size();++q){
+ if(query_plan[q]){
+// NOTE: this code assumes that the operator has at most
+// two inputs. But the template code also makes
+// this assumption. Both will need to be changed.
+ vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
+ string tname = itbls[0]->get_schema_name();
+// string li_tname = tname +"-"+int_to_string(q)+"-0";
+// if(lfta_idx.count(li_tname)>0)
+ if(input_tbl_srcid.count(tname)>0){
+// ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
+ ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
+ }else if(op_idx.count(tname)>0){
+ ret += "\tnode"+int_to_string(q)+"->set_left_child_node(node"+int_to_string( op_idx[tname] )+");\n";
+ }else{
+ fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (1) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
+ exit(1);
+ }
+ if(itbls.size()>1){
+ string tname = itbls[1]->get_schema_name();
+// string li_tname = tname +"-"+int_to_string(q)+"-1";
+// if(lfta_idx.count(li_tname)>0)
+ if(input_tbl_srcid.count(tname)>0){
+// ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
+ ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
+ }else if(op_idx.count(tname)>0){
+ ret += "\tnode"+int_to_string(q)+"->set_right_child_node(node"+int_to_string( op_idx[tname] )+");\n";
+ }else{
+					fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (2) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
+ exit(1);
+ }
+ }
+ }
+ }
+ for(i=0;i<output_operators.size();++i){
+ if(i==0)
+ ret += "\tnode"+int_to_string(n_basic_ops)+"->set_left_child_node(node"+int_to_string( qhead )+");\n";
+ else
+ ret += "\tnode"+int_to_string(n_basic_ops+i)+"->set_left_child_node(node"+int_to_string( n_basic_ops+i-1 )+");\n";
+ }
+
+ // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
+
+ bool fta_reusable = false;
+ if (query_plan[qhead]->get_val_of_def("reusable") == "yes" ||
+ query_plan[qhead]->get_param_tbl()->size() == 0) {
+		fta_reusable = true;
+ }
+
+ int root_node = qhead;
+ if(output_operators.size()>0)
+		root_node = n_basic_ops + output_operators.size() - 1;	// the last output operator
+
+ ret+=
+"\n"
+"\n"
+" MULTOP_HFTA* ftap = new MULTOP_HFTA(ftaid, OUTPUT_HFTA, command, sz, value, root_schema_handle, node"+int_to_string(root_node)+", lfta_list, " + (fta_reusable ?"true":"false") + ", reusable);\n"
+" if(ftap->init_failed()){ delete ftap; return 0;}\n"
+" return (FTA*)ftap;\n"
+"}\n"
+"\n"
+"\n";
+
+
+ string comm_bufsize = "16*1024*1024";
+ if(defines.count("hfta_comm_buf")>0){
+ comm_bufsize = defines["hfta_comm_buf"];
+ }
+
+ ret+=
+"\n"
+"int main(int argc, char * argv[]) {\n"
+"\n"
+"\n"
+" /* parse the arguments */\n"
+"\n"
+" gs_int32_t tip1,tip2,tip3,tip4;\n"
+" endpoint gshub;\n"
+" gs_sp_t instance_name;\n"
+" if (argc<3) {\n"
+" gslog(LOG_EMERG,\"Wrong arguments at startup\");\n"
+" exit(1);\n"
+" }\n"
+"\n"
+" if ((sscanf(argv[1],\"%u.%u.%u.%u:%hu\",&tip1,&tip2,&tip3,&tip4,&(gshub.port)) != 5)) {\n"
+" gslog(LOG_EMERG,\"HUB IP NOT DEFINED\");\n"
+" exit(1);\n"
+" }\n"
+" gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\n"
+" gshub.port=htons(gshub.port);\n"
+" instance_name=strdup(argv[2]);\n"
+" if (set_hub(gshub)!=0) {\n"
+" gslog(LOG_EMERG,\"Could not set hub\");\n"
+" exit(1);\n"
+" }\n"
+" if (set_instance_name(instance_name)!=0) {\n"
+" gslog(LOG_EMERG,\"Could not set instance name\");\n"
+" exit(1);\n"
+" }\n"
+"\n"
+"\n"
+" /* initialize host library */\n"
+"\n"
+//" fprintf(stderr,\"Initializing gscp\\n\");\n"
+" gsopenlog(argv[0]);\n"
+"\n"
+" if (hostlib_init(HFTA, "+comm_bufsize+", DEFAULTDEV, 0, 0)!=0) {\n"
+" fprintf(stderr,\"%s::error:could not initialize gscp\\n\",\n"
+" argv[0]);\n"
+" exit(1);\n"
+" }\n"
+"\n"
+"\n"
+"\n"
+" FTAID ret = fta_register(OUTPUT_HFTA, " + (fta_reusable?"1":"0") + ", DEFAULTDEV, alloc_hfta, "+generate_schema_string_name(query_name)+", -1, 0ull);\n"
+" fta_start_service(-1);\n"
+"\n"
+" return 0;\n"
+"\n"
+"}\n"
+"\n";
+////////////////////
+ return(ret);
+}
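+
+// The generated translation unit thus contains, in order: the schema strings,
+// the tuple structs, finalize_tuple, the per-operator functors,
+// get_lfta_params, alloc_hfta (which finds the lftas and wires up the
+// operator_node tree), and main(), which registers the hfta with the hub.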
+
+
+// Checks whether node 'index' is compatible with interface partitioning
+// (i.e., whether it can be pushed below the merge that combines the partitioned streams)
+// index - position of the query node in the query plan
+bool stream_query::is_partn_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
+ int i;
+ qp_node* node = query_plan[index];
+ qp_node* child_node = NULL;
+
+ if (node->predecessors.empty())
+ return false;
+
+ // all the node predecessors must be partition merges with the same partition definition
+ partn_def_t* partn_def = NULL;
+ for (i = 0; i < node->predecessors.size(); ++i) {
+ child_node = query_plan[node->predecessors[i]];
+ if (child_node->node_type() != "mrg_qpn")
+ return false;
+
+ // merge must have only one parent for this optimization to work
+ // check that all its successors are the same
+ for (int j = 1; j < child_node->successors.size(); ++j) {
+ if (child_node->successors[j] != child_node->successors[0])
+ return false;
+ }
+
+ partn_def_t* new_partn_def = ((mrg_qpn*)child_node)->get_partn_definition(lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result);
+ if (!new_partn_def)
+ return false;
+ if (!i)
+ partn_def = new_partn_def;
+ else if (new_partn_def != partn_def)
+ return false;
+
+ }
+
+ if (node->node_type() == "spx_qpn") // spx nodes are always partition compatible
+ return true;
+ else if (node->node_type() == "sgah_qpn") {
+ gb_table gb_tbl = ((sgah_qpn*)node)->gb_tbl;
+ return true; //partn_def->is_compatible(&gb_tbl);
+ }
+ else if (node->node_type() == "rsgah_qpn") {
+ gb_table gb_tbl = ((rsgah_qpn*)node)->gb_tbl;
+ return partn_def->is_compatible(&gb_tbl);
+ }
+ else if (node->node_type() == "sgahcwcb_qpn") {
+ gb_table gb_tbl = ((sgahcwcb_qpn*)node)->gb_tbl;
+ return partn_def->is_compatible(&gb_tbl);
+ }
+ else if (node->node_type() == "join_eq_hash_qpn") {
+ return true;
+ }
+ else
+ return false;
+}
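+
+// e.g. (hypothetical) a merge over csp_if0/csp_if1 whose only parent is one
+// sgah aggregation: if both sources carry the same partition definition, the
+// aggregation is partition-compatible and can be replicated per interface
+// below the merge.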
+
+// Push the operator below the merge that combines the partitioned streams
+void stream_query::pushdown_partn_operator(int index) {
+ qp_node* node = query_plan[index];
+ int i;
+ char node_index[128];
+
+// fprintf(stderr, "Partn pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
+
+
+	// HACK ALERT: when reordering merges we screw up the slack computation;
+	// since slack should no longer be used, this is not an issue
+
+
+	// we can safely reorder nodes that have one and only one temporal attribute in the select list
+ table_def* table_layout = node->get_fields();
+ vector<field_entry*> fields = table_layout->get_fields();
+ int merge_fieldpos = -1;
+
+ for (i = 0; i < fields.size(); ++i) {
+ data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
+ if(dt.is_temporal()) {
+			if (merge_fieldpos != -1)	// more than one temporal field found
+ return;
+ merge_fieldpos = i;
+ }
+ }
+
+	if (merge_fieldpos == -1)	// no temporal field found
+ return;
+
+ std::vector<colref_t *> mvars; // the merge-by columns.
+
+	// the reordering procedure is different for unary operators and joins
+ if (node->node_type() == "join_eq_hash_qpn") {
+ vector<qp_node*> new_nodes;
+
+ tablevar_t *left_table_name;
+ tablevar_t *right_table_name;
+ mrg_qpn* left_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
+ mrg_qpn* right_mrg = (mrg_qpn*)query_plan[node->predecessors[1]];
+
+ // for now we will only consider plans where both child merges
+ // merge the same set of streams
+
+ if (left_mrg->fm.size() != right_mrg->fm.size())
+ return;
+
+		// mapping of interface names to table definitions
+ map<string, tablevar_t*> iface_map;
+ for (i = 0; i < left_mrg->fm.size(); i++) {
+ left_table_name = left_mrg->fm[i];
+ iface_map[left_table_name->get_machine() + left_table_name->get_interface()] = left_table_name;
+ }
+
+ for (i = 0; i < right_mrg->fm.size(); i++) {
+ right_table_name = right_mrg->fm[i];
+
+			// find the corresponding left table
+ if (!iface_map.count(right_table_name->get_machine() + right_table_name->get_interface()))
+ return;
+
+ left_table_name = iface_map[right_table_name->get_machine() + right_table_name->get_interface()];
+
+ // create new join nodes
+ sprintf(node_index, "_%d", i);
+ join_eq_hash_qpn* new_node = (join_eq_hash_qpn*)node->make_copy(node_index);
+
+ // make a copy of right_table_name
+ right_table_name = right_table_name->duplicate();
+ left_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[0]->get_var_name());
+ right_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[1]->get_var_name());
+ new_node->from[0] = left_table_name;
+ new_node->from[1] = right_table_name;
+ new_nodes.push_back(new_node);
+ }
+
+ // make right_mrg a new root
+ right_mrg->set_node_name(node->get_node_name());
+ right_mrg->table_layout = table_layout;
+ right_mrg->merge_fieldpos = merge_fieldpos;
+
+ for (i = 0; i < right_mrg->fm.size(); i++) {
+			// make the newly created joins children of the merge
+ sprintf(node_index, "_%d", i);
+ right_mrg->fm[i] = new tablevar_t(right_mrg->fm[i]->get_machine().c_str(), right_mrg->fm[i]->get_interface().c_str(), (node->get_node_name() + string(node_index)).c_str());
+ sprintf(node_index, "_m%d", i);
+ right_mrg->fm[i]->set_range_var(node_index);
+ right_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
+
+ }
+
+ if (left_mrg != right_mrg)
+ query_plan[node->predecessors[0]] = NULL; // remove left merge from the plan
+
+ query_plan.insert(query_plan.end(), new_nodes.begin(), new_nodes.end());
+
+ } else { // unary operator
+ // get the child merge node
+ mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
+
+ child_mrg->set_node_name(node->get_node_name());
+ child_mrg->table_layout = table_layout;
+ child_mrg->merge_fieldpos = merge_fieldpos;
+
+ // create new nodes for every source stream
+ for (i = 0; i < child_mrg->fm.size(); i++) {
+ tablevar_t *table_name = child_mrg->fm[i];
+ sprintf(node_index, "_%d", i);
+ qp_node* new_node = node->make_copy(node_index);
+
+ if (node->node_type() == "spx_qpn")
+ ((spx_qpn*)new_node)->table_name = table_name;
+ else if (node->node_type() == "sgah_qpn")
+ ((sgah_qpn*)new_node)->table_name = table_name;
+ else if (node->node_type() == "rsgah_qpn")
+ ((rsgah_qpn*)new_node)->table_name = table_name;
+ else if (node->node_type() == "sgahcwcb_qpn")
+ ((sgahcwcb_qpn*)new_node)->table_name = table_name;
+ table_name->set_range_var("_t0");
+
+ child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
+ sprintf(node_index, "_m%d", i);
+ child_mrg->fm[i]->set_range_var(node_index);
+ child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
+
+ // add new node to query plan
+ query_plan.push_back(new_node);
+ }
+ }
+ query_plan[index] = NULL;
+ generate_linkage();
+}
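+
+// Effect of the unary rewrite above (hypothetical two-source plan):
+//	before:	mrg(src0, src1) -> sgah
+//	after:	mrg(sgah_0(src0), sgah_1(src1))
+// where the merge takes over the sgah's name and output layout, so parents of
+// the original node are unaffected.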
+
+
+// Checks whether node 'index' can be pushed below the merge
+ bool stream_query::is_pushdown_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names) {
+
+ int i;
+ qp_node* node = query_plan[index];
+ qp_node* child_node = NULL;
+
+ if (node->predecessors.size() != 1)
+ return false;
+
+	 // the node's predecessor must be a merge that combines streams from multiple hosts
+ child_node = query_plan[node->predecessors[0]];
+ if (child_node->node_type() != "mrg_qpn")
+ return false;
+
+ if (!((mrg_qpn*)child_node)->is_multihost_merge())
+ return false;
+
+ // merge must have only one parent for this optimization to work
+ // check that all its successors are the same
+ for (int j = 1; j < child_node->successors.size(); ++j) {
+ if (child_node->successors[j] != child_node->successors[0])
+ return false;
+ }
+
+	 // selections can always be pushed down, aggregations can always be split into a selection/aggr or aggr/aggr pair
+ // and pushed down
+ if (node->node_type() == "spx_qpn")
+ return true;
+ else if (node->node_type() == "sgah_qpn")
+ return true;
+ else
+ return false;
+
+ }
+
+// Push the operator below the merge
+ void stream_query::pushdown_operator(int index, ext_fcn_list *Ext_fcns, table_list *Schema) {
+ qp_node* node = query_plan[index];
+ int i;
+ char node_suffix[128];
+
+	// we can only safely push down queries that have exactly one temporal attribute in the select list
+ table_def* table_layout = node->get_fields();
+ vector<field_entry*> fields = table_layout->get_fields();
+ int merge_fieldpos = -1;
+
+ for (i = 0; i < fields.size(); ++i) {
+ data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
+ if(dt.is_temporal()) {
+			if (merge_fieldpos != -1) // more than one temporal field found
+ return;
+ merge_fieldpos = i;
+ }
+ }
+
+ if (merge_fieldpos == -1) // if no temporal field found
+ return;
+
+
+ fprintf(stderr, "Regular pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
+
+ // get the child merge node
+ mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
+
+ tablevar_t *table_name = NULL;
+
+
+ if (node->node_type() == "spx_qpn") {
+
+ // create new nodes for every source stream
+ for (i = 0; i < child_mrg->fm.size(); i++) {
+ table_name = child_mrg->fm[i];
+ sprintf(node_suffix, "_%d", i);
+ qp_node* new_node = node->make_copy(node_suffix);
+
+ ((spx_qpn*)new_node)->table_name = table_name;
+ table_name->set_range_var("_t0");
+
+ child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
+ sprintf(node_suffix, "_m%d", i);
+ child_mrg->fm[i]->set_range_var(node_suffix);
+ child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
+
+ // add new node to query plan
+ query_plan.push_back(new_node);
+ }
+ child_mrg->table_layout = table_layout;
+ child_mrg->merge_fieldpos = merge_fieldpos;
+
+ } else { // aggregation node
+
+ vector<qp_node*> new_nodes;
+
+		// split the aggregation into a high-level and a low-level part
+ vector<qp_node *> split_nodes = ((sgah_qpn*)node)->split_node_for_hfta(Ext_fcns, Schema);
+ if (split_nodes.size() != 2)
+ return;
+
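+		// split_nodes[0] is the low-level (per-host) part, split_nodes[1] the
+		// high-level aggregation that recombines the partial results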
+ sgah_qpn* super_aggr = (sgah_qpn*)split_nodes[1];
+ super_aggr->table_name = ((sgah_qpn*)node)->table_name;
+
+ // group all the sources by host
+ map<string, vector<int> > host_map;
+ for (i = 0; i < child_mrg->fm.size(); i++) {
+ tablevar_t *table_name = child_mrg->fm[i];
+ if (host_map.count(table_name->get_machine()))
+ host_map[table_name->get_machine()].push_back(i);
+ else {
+ vector<int> tables;
+ tables.push_back(i);
+ host_map[table_name->get_machine()] = tables;
+ }
+ }
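+		// e.g. sources on hosts (h1, h2, h1) yield host_map = { h1 -> [0,2], h2 -> [1] }  (illustrative)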
+
+ // create a new merge and low-level aggregation for each host
+ map<string, vector<int> >::iterator iter;
+ for (iter = host_map.begin(); iter != host_map.end(); iter++) {
+ string host_name = (*iter).first;
+ vector<int> tables = (*iter).second;
+
+ sprintf(node_suffix, "_%s", host_name.c_str());
+ string suffix(node_suffix);
+ untaboo(suffix);
+ mrg_qpn *new_mrg = (mrg_qpn *)child_mrg->make_copy(suffix);
+ for (i = 0; i < tables.size(); ++i) {
+ sprintf(node_suffix, "_m%d", i);
+ new_mrg->fm.push_back(child_mrg->fm[tables[i]]);
+ new_mrg->mvars.push_back(child_mrg->mvars[i]);
+ new_mrg->fm[i]->set_range_var(node_suffix);
+ }
+ qp_node* new_node = split_nodes[0]->make_copy(suffix);
+
+ if (new_node->node_type() == "spx_qpn") {
+ ((spx_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
+ ((spx_qpn*)new_node)->table_name->set_range_var("_t0");
+ } else {
+ ((sgah_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
+ ((sgah_qpn*)new_node)->table_name->set_range_var("_t0");
+ }
+ query_plan.push_back(new_mrg);
+ new_nodes.push_back(new_node);
+ }
+
+ child_mrg->merge_fieldpos = merge_fieldpos;
+ if (split_nodes[0]->node_type() == "spx_qpn")
+ child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((spx_qpn*)split_nodes[0])->select_list);
+ else
+ child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((sgah_qpn*)split_nodes[0])->select_list);
+
+
+ // connect newly created nodes with parent multihost merge
+ for (i = 0; i < new_nodes.size(); ++i) {
+ if (new_nodes[i]->node_type() == "spx_qpn")
+ child_mrg->fm[i] = new tablevar_t(((spx_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
+ else {
+ child_mrg->fm[i] = new tablevar_t(((sgah_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
+
+ }
+ child_mrg->mvars[i]->set_field(child_mrg->table_layout->get_field_name(merge_fieldpos));
+
+ sprintf(node_suffix, "_m%d", i);
+ child_mrg->fm[i]->set_range_var(node_suffix);
+ query_plan.push_back(new_nodes[i]);
+ }
+ child_mrg->fm.resize(new_nodes.size());
+ child_mrg->mvars.resize(new_nodes.size());
+
+ // push the new high-level aggregation
+ query_plan.push_back(super_aggr);
+
+ }
+ query_plan[index] = NULL;
+ generate_linkage();
+
+ }
+
+// Extract the subtree rooted at the node at position index into a separate hfta
+stream_query* stream_query::extract_subtree(int index) {
+
+ vector<int> nodes;
+ stream_query* new_query = new stream_query(query_plan[index], this);
+
+ nodes.push_back(index);
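+	// breadth-first walk; nodes grows as predecessors are appended below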
+ for (int i = 0; i < nodes.size(); ++i) {
+ qp_node* node = query_plan[nodes[i]];
+ if (!node)
+ continue;
+
+ // add all children to nodes list
+ for (int j = 0; j < node->predecessors.size(); ++j)
+ nodes.push_back(node->predecessors[j]);
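+		// the root (i == 0) was already added to the new plan by the constructor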
+ if (i)
+ new_query->query_plan.push_back(node);
+
+ query_plan[nodes[i]] = NULL;
+ }
+
+ return new_query;
+}
+
+// Split a query that combines data from multiple hosts into separate hftas.
+vector<stream_query*> stream_query::split_multihost_query() {
+
+ vector<stream_query*> ret;
+ char node_suffix[128];
+ int i;
+
+	// find merges that combine multiple hosts and group their sources per host
+ int plan_size = query_plan.size();
+ vector<mrg_qpn*> new_nodes;
+
+ for (i = 0; i < plan_size; ++i) {
+ qp_node* node = query_plan[i];
+ if (node && node->node_type() == "mrg_qpn") {
+ mrg_qpn* mrg = (mrg_qpn*)node;
+ if (mrg->is_multihost_merge()) {
+
+ // group all the sources by host
+ map<string, vector<int> > host_map;
+ for (int j = 0; j < mrg->fm.size(); j++) {
+ tablevar_t *table_name = mrg->fm[j];
+ if (host_map.count(table_name->get_machine()))
+ host_map[table_name->get_machine()].push_back(j);
+ else {
+ vector<int> tables;
+ tables.push_back(j);
+ host_map[table_name->get_machine()] = tables;
+ }
+ }
+
+ // create a new merge for each host
+ map<string, vector<int> >::iterator iter;
+ for (iter = host_map.begin(); iter != host_map.end(); iter++) {
+ string host_name = (*iter).first;
+ vector<int> tables = (*iter).second;
+
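+					// a host that contributes a single source needs no per-host merge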
+ if (tables.size() == 1)
+ continue;
+
+ sprintf(node_suffix, "_%s", host_name.c_str());
+ string suffix(node_suffix);
+ untaboo(suffix);
+ mrg_qpn *new_mrg = (mrg_qpn *)mrg->make_copy(suffix);
+ for (int j = 0; j < tables.size(); ++j) {
+ new_mrg->fm.push_back(mrg->fm[tables[j]]);
+ new_mrg->mvars.push_back(mrg->mvars[j]);
+						sprintf(node_suffix, "_m%d", j);
+ new_mrg->fm[j]->set_range_var(node_suffix);
+ }
+ new_nodes.push_back(new_mrg);
+ }
+
+ if (!new_nodes.empty()) {
+ // connect newly created merge nodes with parent multihost merge
+ for (int j = 0; j < new_nodes.size(); ++j) {
+ mrg->fm[j] = new tablevar_t(new_nodes[j]->fm[0]->get_machine().c_str(), "IFACE", new_nodes[j]->get_node_name().c_str());
+ query_plan.push_back(new_nodes[j]);
+ }
+ mrg->fm.resize(new_nodes.size());
+ mrg->mvars.resize(new_nodes.size());
+ generate_linkage();
+ }
+
+
+ // now externalize the sources
+ for (int j = 0; j < node->predecessors.size(); ++j) {
+				// extract the subtree rooted at this predecessor into a separate hfta
+ stream_query* q = extract_subtree(node->predecessors[j]);
+ if (q) {
+ q->generate_linkage();
+ ret.push_back(q);
+ }
+
+ }
+ generate_linkage();
+ }
+ }
+ }
+
+ return ret;
+}
+
+
+// Perform local FTA optimizations
+void stream_query::optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result){
+
+ // Topologically sort the nodes in query plan (leaf-first)
+ int i = 0, j = 0;
+ vector<int> sorted_nodes;
+
+ int num_nodes = query_plan.size();
+ bool* leaf_flags = new bool[num_nodes];
+ memset(leaf_flags, 0, num_nodes * sizeof(bool));
+
+ // run topological sort
+ bool done = false;
+
+	// repeatedly peel off leaves (nodes with no remaining predecessors) and append them to sorted_nodes
+ while (!done) {
+ done = true;
+ for (i = 0; i < num_nodes; ++i) {
+ if (!query_plan[i])
+ continue;
+
+ if (!leaf_flags[i] && query_plan[i]->predecessors.empty()) {
+ leaf_flags[i] = true;
+ sorted_nodes.push_back(i);
+ done = false;
+
+				// remove the node from its parents' predecessor lists;
+				// since we only care about the number of predecessors, it is sufficient
+				// to remove one element from each parent's predecessor list
+ for (int j = query_plan[i]->successors.size() - 1; j >= 0; --j)
+ query_plan[query_plan[i]->successors[j]]->predecessors.pop_back();
+ }
+ }
+ }
+ delete[] leaf_flags;
+ num_nodes = sorted_nodes.size();
+ generate_linkage(); // rebuild the recently destroyed predecessor lists.
+
+	// record the machine and interface each node reads its lfta inputs from
+ for (i = 0; i < num_nodes; ++i) {
+ qp_node* node = query_plan[sorted_nodes[i]];
+ vector<tablevar_t *> input_tables = node->get_input_tbls();
+ for (j = 0; j < input_tables.size(); ++j) {
+ tablevar_t * table = input_tables[j];
+ if (lfta_names.count(table->schema_name)) {
+ int index = lfta_names[table->schema_name];
+ table->set_machine(machine_names[index]);
+ table->set_interface(interface_names[index]);
+ }
+ }
+ }
+
+ /*
+
+ // push eligible operators down in the query plan
+ for (i = 0; i < num_nodes; ++i) {
+ if (partn_parse_result && is_partn_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result)) {
+ pushdown_partn_operator(sorted_nodes[i]);
+ } else if (is_pushdown_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names)) {
+ pushdown_operator(sorted_nodes[i], Ext_fcns, Schema);
+ }
+ }
+
+ // split the query into multiple hftas if it combines the data from multiple hosts
+ vector<stream_query*> hftas = split_multihost_query();
+ hfta_list.insert(hfta_list.end(), hftas.begin(), hftas.end());
+ */
+
+ num_nodes = query_plan.size();
+ // also split multi-way merges into two-way merges
+ for (i = 0; i < num_nodes; ++i) {
+ qp_node* node = query_plan[i];
+ if (node && node->node_type() == "mrg_qpn") {
+ vector<mrg_qpn *> split_merge = ((mrg_qpn *)node)->split_sources();
+
+ query_plan.insert(query_plan.end(), split_merge.begin(), split_merge.end());
+ // delete query_plan[sorted_nodes[i]];
+ query_plan[i] = NULL;
+ }
+ }
+
+ generate_linkage();
+}
+
+
+
+table_def *stream_query::get_output_tabledef(){
+ return( query_plan[qhead]->get_fields() );
+}
+
+
+
+//////////////////////////////////////////////////////////
+//// De-siloing. TO BE REMOVED
+
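+// Replace each lfta input whose name ends in a silo suffix with a merge over
+// the corresponding lftas of every silo. This assumes all silo names have the
+// same length, since the suffix is stripped by length below.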
+void stream_query::desilo_lftas(map<string, int> &lfta_names,vector<string> &silo_names,table_list *Schema){
+ int i,t,s;
+
+ int suffix_len = silo_names.back().size();
+
+ for(i=0;i<qtail.size();++i){
+ vector<tablevar_t *> itbls = query_plan[qtail[i]]->get_input_tbls();
+ for(t=0;t<itbls.size();++t){
+ string itbl_name = itbls[t]->get_schema_name();
+ if(lfta_names.count(itbl_name)>0){
+//printf("Query %s input %d references lfta input %s\n",query_plan[qtail[i]]->get_node_name().c_str(),t,itbl_name.c_str());
+ vector<string> src_names;
+ string lfta_base = itbl_name.substr(0,itbl_name.size()-suffix_len);
+ for(s=0;s<silo_names.size();++s){
+ string lfta_subsilo = lfta_base + silo_names[s];
+//printf("\t%s\n",lfta_subsilo.c_str());
+ src_names.push_back(lfta_subsilo);
+ }
+ string merge_node_name = "desilo_"+query_plan[qtail[i]]->get_node_name()+
+ "_input_"+int_to_string(t);
+ mrg_qpn *merge_node = new mrg_qpn(merge_node_name,src_names,Schema);
+ int m_op_pos = Schema->add_table(merge_node->table_layout);
+ itbls[t]->set_schema(merge_node_name);
+ itbls[t]->set_schema_ref(m_op_pos);
+ query_plan.push_back(merge_node);
+ }
+ }
+ }
+
+ generate_linkage();
+}
+////////////////////////////////////////
+/// End de-siloing
+
+
+
+// Given a collection of LFTA stream queries,
+// extract their WHERE predicates
+// and pass them to an analysis routine which will extract
+// common elements
+//
+void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, vector<cnf_set *> &prefilter_preds, set<unsigned int> &pred_ids){
+ int s;
+ std::vector< std::vector<cnf_elem *> > where_list;
+
+// still safe to assume that LFTA queries have a single
+// query node, which is at position 0.
+ for(s=0;s<lfta_list.size();++s){
+ vector<cnf_elem *> cnf_list = lfta_list[s]->query_plan[0]->get_filter_clause();
+ if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
+ gb_table *gtbl = ((sgah_qpn *)(lfta_list[s]->query_plan[0]))->get_gb_tbl();
+ int c;
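+		// expand group-by variables in each predicate to their defining
+		// expressions, so filters from different queries can be compared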
+ for(c=0;c<cnf_list.size();++c){
+ insert_gb_def_pr(cnf_list[c]->pr,gtbl);
+ }
+ }
+		where_list.push_back(cnf_list);	// same filter clause fetched above
+ }
+
+ find_common_filter(where_list,Schema,Ext_fcns,prefilter_preds, pred_ids);
+}
+
+
+
+
+
+// Given a collection of LFTA stream queries,
+// extract the union of all temporal attributes referenced in select clauses
+// these attributes will need to be unpacked in the prefilter
+//
+void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids){
+ int s, sl;
+ vector<scalarexp_t *> sl_list;
+ gb_table *gb_tbl = NULL;
+
+
+// still safe to assume that LFTA queries have a single
+// query node, which is at position 0.
+ for(s=0;s<lfta_list.size();++s){
+	sl_list.clear();	// reset per-query state; only spx and sgah nodes set these
+	gb_tbl = NULL;
+
+ if(lfta_list[s]->query_plan[0]->node_type() == "spx_qpn"){
+ spx_qpn *spx_node = (spx_qpn *)lfta_list[s]->query_plan[0];
+ sl_list = spx_node->get_select_se_list();
+ }
+ if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
+ sgah_qpn *sgah_node = (sgah_qpn *)lfta_list[s]->query_plan[0];
+ sl_list = sgah_node->get_select_se_list();
+ gb_tbl = sgah_node->get_gb_tbl();
+ }
+
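+	// record every column referenced by a temporal select expression;
+	// gb_tbl (when set) resolves group-by variables back to base columns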
+ for(sl=0;sl<sl_list.size();sl++){
+ data_type *sdt = sl_list[sl]->get_data_type();
+ if (sdt->is_temporal()) {
+ gather_se_col_ids(sl_list[sl],temp_cids, gb_tbl);
+ }
+ }
+ }
+}
+
+