1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
15 #include"stream_query.h"
16 #include"generate_utils.h"
17 #include"analyze_fta.h"
// File-scope scratch buffer for sprintf-formatted code fragments
// (used e.g. when emitting parameter-block code in generate_hfta).
22 static char tmpstr[500];
27 // Create a query plan from a query node and an existing
28 // query plan. Use for lfta queries, the parent query plan provides
// the shared DEFINE context, while the single node supplies the output
// schema, the parameter table, and the query name.
30 stream_query::stream_query(qp_node *qnode, stream_query *parent){
// The new plan consists of exactly this one node.
31 query_plan.push_back(qnode);
// Summary members come from the node; defines are inherited from parent.
34 attributes = qnode->get_fields();
35 parameters = qnode->get_param_tbl();
36 defines = parent->defines;
37 query_name = qnode->get_node_name();
40 // Copy the query plan.
// Copy constructor: shallow-copies the node-pointer vector and the summary
// members.  The qp_node objects themselves are shared, not cloned.
41 stream_query::stream_query(stream_query &src){
42 query_plan = src.query_plan;
45 attributes = src.attributes;
46 parameters = src.parameters;
47 defines = src.defines;
48 query_name = src.query_name;
53 // Create a query plan from an analyzed parse tree.
54 // Perform analyses to find the output node, input nodes, etc.
// On failure, error_code / err_str are filled in from the offending node(s).
56 stream_query::stream_query(query_summary_class *qs,table_list *Schema){
57 // Generate the query plan nodes from the analyzed parse tree.
58 // There is only one for now, so just assign the return value
59 // of create_query_nodes to query_plan
61 query_plan = create_query_nodes(qs,Schema);
63 if(query_plan.size() == 0){
64 fprintf(stderr,"INTERNAL ERROR, zero-size query plan in stream_query::stream_query\n");
// Propagate per-node errors: record the (last seen) error code and
// concatenate all error strings.
67 for(i=0;i<query_plan.size();++i){
69 if(query_plan[i]->get_error_code() != 0){
70 error_code = query_plan[i]->get_error_code();
71 err_str += query_plan[i]->get_error_str();
// By construction the head (output) node is the last one created.
75 qhead = query_plan.size()-1;
// Append the plan nodes generated from another analyzed query block to
// this query's plan.  Node linkage is (re)computed later by generate_plan().
81 stream_query * stream_query::add_query(query_summary_class *qs,table_list *Schema){
82 // Add another query block to the query plan
84 vector<qp_node *> new_nodes = create_query_nodes(qs, Schema);
85 query_plan.insert(query_plan.end(),new_nodes.begin(), new_nodes.end());
// Append all plan nodes of another stream_query to this one.
// The node pointers are shared, not cloned.
89 stream_query * stream_query::add_query(stream_query &src){
90 // Add another query block to the query plan
92 query_plan.insert(query_plan.end(),src.query_plan.begin(), src.query_plan.end());
// Generate "protocol" scalar expressions (SEs expressed over base protocol
// fields) for every node of this plan.  sq_map resolves input names that
// belong to other queries; Schema is extended with each node's output
// table so that schema binding can succeed.  The tail of the function is
// a debug dump of the computed protocol SEs.
97 void stream_query::generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema){
100 // Mapping fields to protocol fields requires schema binding.
101 // First ensure all nodes are in the schema.
102 for(n=0;n<query_plan.size();++n){
103 if(query_plan[n] != NULL){
104 Schema->add_table(query_plan[n]->get_fields());
107 // Now do schema binding
108 for(n=0;n<query_plan.size();++n){
109 if(query_plan[n] != NULL){
110 query_plan[n]->bind_to_schema(Schema);
114 // create name-to-index map
115 map<string, int> name_to_node;
116 for(n=0;n<query_plan.size();++n){
118 name_to_node[query_plan[n]->get_node_name()] = n;
122 // Create a list of the nodes to process, in order.
123 // Search from the root down.
// Breadth-first search from qhead; pushing each visited node onto the
// FRONT of work_list yields a sources-before-consumers processing order.
129 search_q.push_back(qhead);
130 while(! search_q.empty()){
131 int the_q = search_q.front();
132 search_q.pop_front(); work_list.push_front(the_q);
133 vector<int> the_pred = query_plan[the_q]->get_predecessors();
134 for(i=0;i<the_pred.size();i++)
135 search_q.push_back(the_pred[i]);
138 // Scan through the work list, from the front,
139 // gather the qp_node's ref'd, and call the
140 // protocol se generator. Pass NULL for
141 // sources not found - presumably user-defined operator
143 while(! work_list.empty()){
144 int the_q = work_list.front();
145 work_list.pop_front();
146 vector<qp_node *> q_sources;
147 vector<tablevar_t *> q_input_tbls = query_plan[the_q]->get_input_tbls();
148 for(i=0;i<q_input_tbls.size();++i){
149 string itbl_nm = q_input_tbls[i]->get_schema_name();
// Each source is either a node of this plan, the head node of another
// query (via sq_map), or unknown (NULL).
150 if(name_to_node.count(itbl_nm)>0){
151 q_sources.push_back(query_plan[name_to_node[itbl_nm]]);
152 }else if(sq_map.count(itbl_nm)>0){
153 q_sources.push_back(sq_map[itbl_nm]->get_query_head());
155 q_sources.push_back(NULL);
158 query_plan[the_q]->create_protocol_se(q_sources, Schema);
161 //////////////////////////////////////////////////////////
// Debug dump: print each node's protocol SEs, plus the hash expressions
// of join nodes and the group-by source expressions of aggregation nodes.
166 for(i=0;i<query_plan.size();++i){
168 printf("query node %s, type=%s:\n",query_plan[i]->get_node_name().c_str(),
169 query_plan[i]->node_type().c_str());
170 map<std::string, scalarexp_t *> *pse_map = query_plan[i]->get_protocol_se();
171 map<std::string, scalarexp_t *>::iterator mssi;
172 for(mssi=pse_map->begin();mssi!=pse_map->end();++mssi){
174 printf("\t%s : %s\n",(*mssi).first.c_str(), (*mssi).second->to_string().c_str());
176 printf("\t%s : NULL\n",(*mssi).first.c_str());
// Join nodes: dump the left/right hash expressions, paired up.
178 if(query_plan[i]->node_type() == "filter_join" || query_plan[i]->node_type() == "join_eq_hash_qpn"){
179 vector<scalarexp_t *> pse_l;
180 vector<scalarexp_t *> pse_r;
181 if(query_plan[i]->node_type() == "filter_join"){
182 pse_l = ((filter_join_qpn *)query_plan[i])->get_hash_l();
183 pse_r = ((filter_join_qpn *)query_plan[i])->get_hash_r();
185 if(query_plan[i]->node_type() == "join_eq_hash_qpn"){
186 pse_l = ((join_eq_hash_qpn *)query_plan[i])->get_hash_l();
187 pse_r = ((join_eq_hash_qpn *)query_plan[i])->get_hash_r();
190 for(p=0;p<pse_l.size();++p){
192 printf("\t\t%s = ",pse_l[p]->to_string().c_str());
194 printf("\t\tNULL = ");
196 printf("%s\n",pse_r[p]->to_string().c_str());
// Aggregation nodes: dump the group-by source expressions.
201 if(query_plan[i]->node_type() == "sgah_qpn" || query_plan[i]->node_type() == "rsgah_qpn" || query_plan[i]->node_type() == "sgahcwcb_qpn"){
202 vector<scalarexp_t *> pseg;
203 if(query_plan[i]->node_type() == "sgah_qpn")
204 pseg = ((sgah_qpn *)query_plan[i])->get_gb_sources();
205 if(query_plan[i]->node_type() == "rsgah_qpn")
206 pseg = ((rsgah_qpn *)query_plan[i])->get_gb_sources();
207 if(query_plan[i]->node_type() == "sgahcwcb_qpn")
208 pseg = ((sgahcwcb_qpn *)query_plan[i])->get_gb_sources();
210 for(g=0;g<pseg.size();g++){
212 printf("\t\tgb %d = %s\n",g,pseg[g]->to_string().c_str());
214 printf("\t\tgb %d = NULL\n",g);
// Rebuild the predecessor/successor links between plan nodes by matching
// each node's input table names against node names, then verify the plan
// shape (a single head, no node feeding multiple consumers).
// Returns true if verification FAILED.
223 bool stream_query::generate_linkage(){
224 bool create_failed = false;
227 // Clear any leftover linkages
228 for(n=0;n<query_plan.size();++n){
230 query_plan[n]->clear_predecessors();
231 query_plan[n]->clear_successors();
236 // create name-to-index map
237 map<string, int> name_to_node;
238 for(n=0;n<query_plan.size();++n){
240 name_to_node[query_plan[n]->get_node_name()] = n;
244 // Do the 2-way linkage.
245 for(n=0;n<query_plan.size();++n){
247 vector<tablevar_t *> fm = query_plan[n]->get_input_tbls();
248 for(f=0;f<fm.size();++f){
249 string src_tbl = fm[f]->get_schema_name();
// Only inputs produced by nodes of this plan create links;
// other inputs are external sources.
250 if(name_to_node.count(src_tbl)>0){
251 int s = name_to_node[src_tbl];
252 query_plan[n]->add_predecessor(s);
253 query_plan[s]->add_successor(n);
259 // Find the head (no successors) and the tails (at least one
260 // predecessor is external).
261 // Verify that there is only one head,
262 // and that all other nodes have one successor (because
263 // right now I can only handle trees).
265 qhead = -1; // no head yet found.
266 for(n=0;n<query_plan.size();++n){
268 vector<int> succ = query_plan[n]->get_successors();
271 fprintf(stderr,"ERROR, query node %s supplies data to multiple nodes, but currently only tree query plans are supported:\n",query_plan[n]->get_node_name().c_str());
272 for(s=0;s<succ.size();++s){
273 fprintf(stderr,"%s ",query_plan[succ[s]]->get_node_name().c_str());
275 fprintf(stderr,"\n");
276 create_failed = true;
279 if(succ.size() == 0){
// A second successor-less node means two heads -> not a tree.
281 fprintf(stderr,"ERROR, query nodes %s and %s both have zero successors.\n",query_plan[n]->get_node_name().c_str(), query_plan[qhead]->get_node_name().c_str());
282 create_failed = true;
// A node with fewer linked predecessors than input tables reads from at
// least one external source, i.e. it is a tail of the plan.
287 if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size()){
293 return create_failed;
296 // After the collection of query plan nodes is generated,
297 // analyze their structure to link them up into a tree (or dag?).
298 // Verify that the structure is acceptable.
299 // Do some other analysis and verification tasks (?)
300 // then gather summary information.
// Returns non-zero on failure: 1 if any node failed to build,
// -1 if linkage/structure verification failed.
301 int stream_query::generate_plan(table_list *Schema){
303 // The first thing to do is verify that the query plan
304 // nodes were successfully created.
305 bool create_failed = false;
307 for(n=0;n<query_plan.size();++n){
308 if(query_plan[n]!=NULL && query_plan[n]->get_error_code()){
309 fprintf(stderr,"%s",query_plan[n]->get_error_str().c_str());
310 create_failed = true;
// Debug dump: each node and the tables it reads.
314 for(n=0;n<query_plan.size();++n){
315 if(query_plan[n] != NULL){
316 string nstr = query_plan[n]->get_node_name();
317 printf("In generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
318 vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
320 for(nn=0;nn<inv.size();nn++){
321 printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
328 if(create_failed) return(1);
330 // Here, link up the query nodes, then verify that the
331 // structure is acceptable (single root, no cycles, no stranded
333 create_failed = generate_linkage();
334 if(create_failed) return -1;
337 // Here, do optimizations such as predicate pushing,
338 // join rearranging, etc.
339 // Nothing to do yet.
// Debug dump after linkage: nodes, their inputs, and head/tail indices.
342 for(n=0;n<query_plan.size();++n){
343 if(query_plan[n] != NULL){
344 string nstr = query_plan[n]->get_node_name();
345 printf("B generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
346 vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
348 for(nn=0;nn<inv.size();nn++){
349 printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
354 printf("qhead=%d, qtail = ",qhead);
356 for(nn=0;nn<qtail.size();++nn)
357 printf("%d ",qtail[nn]);
361 // Collect query summaries. The query is represented by its head node.
362 query_name = query_plan[qhead]->get_node_name();
363 attributes = query_plan[qhead]->get_fields();
364 // TODO: The params and defines require a lot more thought.
365 parameters = query_plan[qhead]->get_param_tbl();
366 defines = query_plan[qhead]->get_definitions();
// Register an output operator spec (e.g. stream / file / zfile) to be
// applied to this query's output; output_specs is consumed by generate_hfta().
371 void stream_query::add_output_operator(ospec_str *o){
372 output_specs.push_back(o);
// Accumulate into libset the external library names required by all plan
// nodes and by all output operators (set insertion deduplicates).
376 void stream_query::get_external_libs(set<string> &libset){
379 for(qn=0;qn<query_plan.size();++qn){
380 if(query_plan[qn] != NULL){
381 vector<string> op_libs = query_plan[qn]->external_libs();
382 for(i=0;i<op_libs.size();++i){
383 libset.insert(op_libs[i]);
// Same for the output operators.
388 for(qn=0;qn<output_operators.size();++qn){
389 if(output_operators[qn] != NULL){
390 vector<string> op_libs = output_operators[qn]->external_libs();
391 for(i=0;i<op_libs.size();++i){
392 libset.insert(op_libs[i]);
400 // Split into LFTA, HFTA components.
401 // Split this query into LFTA and HFTA queries.
402 // Four possible outcomes:
403 // 1) the query reads from a protocol, but does not need to
404 // split (can be evaluated as an LFTA).
405 // The lfta query is the only element in the return vector,
406 // and hfta_returned is false.
407 // 2) the query reads from no protocol, and therefore cannot be split.
408 // The hfta query is the only element in the return vector,
409 // and hfta_returned is true.
410 // 3) reads from at least one protocol, but cannot be split : failure.
411 // return vector is empty, the error conditions are written
412 // in err_str and error_code
413 // 4) The query splits into an hfta query and one or more LFTA queries.
414 // the return vector has two or more elements, and hfta_returned
415 // is true. The last element is the HFTA.
417 vector<stream_query *> stream_query::split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
418 vector<stream_query *> queries;
422 hfta_returned = false; // assume until proven otherwise
// Ask each tail (leaf) node to split itself into lfta/hfta parts.
424 for(l=0;l<qtail.size();++l){
428 vector<qp_node *> qnodes = query_plan[leaf]->split_node_for_fta(Ext_fcns, Schema, qp_hfta, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
431 if(qnodes.size() == 0 || query_plan[leaf]->get_error_code()){ // error
433 error_code = query_plan[leaf]->get_error_code();
434 err_str = query_plan[leaf]->get_error_str();
435 vector<stream_query *> null_result;
438 if(qnodes.size() == 1 && qp_hfta){ // nothing happened
439 //printf("no change\n");
440 query_plan[leaf] = qnodes[0];
442 if(qnodes.size() == 1 && !qp_hfta){ // push to lfta
443 //printf("lfta only\n");
// The whole leaf becomes an LFTA query; unlink it from this plan.
444 queries.push_back(new stream_query(qnodes[0], this));
445 vector<int> succ = query_plan[leaf]->get_successors();
446 for(s=0;s<succ.size();++s){
447 query_plan[succ[s]]->remove_predecessor(leaf);
449 query_plan[leaf] = NULL; // delete it?
451 if(qnodes.size() > 1){ // actual splitting occurred.
452 if(!qp_hfta){ // internal consistency check.
454 err_str = "INTERNAL ERROR: mulitple nodes returned by split_node_for_fta, but none are hfta nodes.\n";
455 vector<stream_query *> null_result;
// Each lfta node becomes its own single-node stream_query.
459 for(q=0;q<qnodes.size()-qp_hfta;++q){ // process lfta nodes
460 //printf("creating lfta %d (%s)\n",q,qnodes[q]->get_node_name().c_str());
461 queries.push_back(new stream_query(qnodes[q], this));
// The last returned node replaces the leaf as the hfta-side node.
464 query_plan[leaf] = qnodes[qnodes.size()-1];
465 // Add in any extra hfta nodes
466 for(q=qnodes.size()-qp_hfta;q<qnodes.size()-1;++q)
467 query_plan.push_back(qnodes[q]);
// If any nodes remain, this query still has an HFTA component.  Also
// collect unresolved interface-parameter references, which are not
// allowed in an HFTA.
473 for(q=0;q<query_plan.size();++q){
474 if(query_plan[q] != NULL){
475 n_ifprefs += query_plan[q]->count_ifp_refs(ifps);
476 hfta_returned = true;
481 set<string>::iterator ssi;
482 err_str += "ERROR, unresolved interface parameters in HFTA:\n";
483 for(ssi=ifps.begin();ssi!=ifps.end();++ssi){
484 err_str += (*ssi)+" ";
488 vector<stream_query *> null_result;
// Relink the surviving hfta nodes; this object (the HFTA) is returned as
// the last element of the result vector.
494 if(generate_linkage()){
495 fprintf(stderr,"INTERNAL ERROR, generate_linkage failed in split_query.\n");
498 queries.push_back(this);
// Extract operator-view subqueries from each tail (leaf) node of the plan,
// collecting the generated table expressions into the returned vector.
// root_name (this query's output table) tags the extracted subqueries.
506 vector<table_exp_t *> stream_query::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string silo_nm){
507 vector<table_exp_t *> subqueries;
510 string root_name = this->get_output_tabledef()->get_tbl_name();
513 for(l=0;l<qtail.size();++l){
515 vector<table_exp_t *> new_qnodes = query_plan[leaf]->extract_opview(Schema, qnodes, opviews, root_name, silo_nm);
517 for(q=0;q<new_qnodes.size();++q){ // process lfta nodes
518 subqueries.push_back( new_qnodes[q]);
// Convenience overload: emit the schema string of the head (output) node.
528 string stream_query::make_schema(){
529 return make_schema(qhead);
// Render plan node q as an FTA schema string: output fields, query name,
// DEFINE entries, parameter declarations, and finally the query text.
532 string stream_query::make_schema(int q){
533 string ret="FTA{\n\n";
535 ret += query_plan[q]->get_fields()->to_string();
538 ret += "\tquery_name '"+query_plan[q]->get_node_name()+"';\n";
// DEFINE block entries: key 'value'; pairs.
540 map<string, string> defs = query_plan[q]->get_definitions();
541 map<string, string>::iterator dfi;
542 for(dfi=defs.begin(); dfi!=defs.end(); ++dfi){
543 ret += "\t"+ (*dfi).first + " '" + (*dfi).second + "';\n";
// PARAM block entries: parameter name plus its declared data type.
548 param_table *params = query_plan[q]->get_param_tbl();
549 vector<string> param_names = params->get_param_names();
551 for(p=0;p<param_names.size();p++){
552 data_type *dt = params->get_data_type( param_names[p] );
553 ret += "\t" + param_names[p] + " '" + dt->get_type_str() + "';\n";
// Finally the query text itself.
557 ret += query_plan[q]->to_query_string();
// Build a comma-separated list of all interfaces referenced by the plan
// nodes, as recorded in each node's "_referenced_ifaces" definition entry.
564 string stream_query::collect_refd_ifaces(){
567 for(q=0;q<query_plan.size();++q){
569 map<string, string> defs = query_plan[q]->get_definitions();
570 if(defs.count("_referenced_ifaces")){
571 if(ret != "") ret += ",";
572 ret += defs["_referenced_ifaces"];
// Return false if any external input table is a PROTOCOL source (i.e. the
// query reads raw protocol data); true when all inputs are streams.
581 bool stream_query::stream_input_only(table_list *Schema){
582 vector<tablevar_t *> input_tbls = this->get_input_tables();
584 for(i=0;i<input_tbls.size();++i){
585 int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
586 if(Schema->get_schema_type(t) == PROTOCOL_SCHEMA) return(false);
591 // Return input tables. No duplicate removal performed.
// An "input table" here is a table read by a tail node that is NOT
// produced by any node of this plan, i.e. an external source.
592 vector<tablevar_t *> stream_query::get_input_tables(){
593 vector<tablevar_t *> retval;
595 // create name-to-index map
597 map<string, int> name_to_node;
598 for(n=0;n<query_plan.size();++n){
600 name_to_node[query_plan[n]->get_node_name()] = n;
// Scan the inputs of each tail node; keep those whose name does not
// match any plan-node name.
605 for(l=0;l<qtail.size();++l){
607 vector<tablevar_t *> tmp_v = query_plan[leaf]->get_input_tbls();
609 for(i=0;i<tmp_v.size();++i){
610 if(name_to_node.count(tmp_v[i]->get_schema_name()) == 0)
611 retval.push_back(tmp_v[i]);
// Recursively determine the output byte format (NETFORMAT vs HOSTFORMAT)
// of plan node q, memoizing results in nfmt (UNKNOWNFORMAT = not yet
// computed).  op_idx maps node names to plan indices; inputs not present
// in op_idx are external sources.  A node that transforms its tuples
// emits HOSTFORMAT.
618 void stream_query::compute_node_format(int q, vector<int> &nfmt, map<string, int> &op_idx){
619 int netcnt = 0, hostcnt = 0;
622 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
623 for(i=0;i<itbls.size();++i){
624 string tname = itbls[i]->get_schema_name();
625 if(op_idx.count(tname)){
626 int o = op_idx[tname];
// Resolve the input node's format first (depth-first), then tally it.
627 if(nfmt[o] == UNKNOWNFORMAT)
628 compute_node_format(o,nfmt,op_idx);
629 if(nfmt[o] == NETFORMAT) netcnt++;
635 if(query_plan[q]->makes_transform()){
636 nfmt[q] = HOSTFORMAT;
639 nfmt[q] = HOSTFORMAT;
644 //printf("query plan %d (%s) is ",q,query_plan[q]->get_node_name().c_str());
645 //if(nfmt[q] == HOSTFORMAT) printf(" host format.\n");
646 //else printf("net format\n");
650 string stream_query::generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode){
651 int schref, ov_ix, i, q, param_sz;
652 bool dag_graph = false;
655 // Bind the SEs in all query plan nodes to this schema, and
656 // Add all tables used by this query to the schema.
657 // Question: Will I be polluting the global schema by adding all
658 // query node schemas?
660 // First ensure all nodes are in the schema.
662 for(qn=0;qn<query_plan.size();++qn){
663 if(query_plan[qn] != NULL){
664 Schema->add_table(query_plan[qn]->get_fields());
668 for(qn=0;qn<query_plan.size();++qn){
669 if(query_plan[qn] != NULL){
670 query_plan[qn]->bind_to_schema(Schema);
675 set<string> qsources;
677 for(n=0;n<query_plan.size();++n){
679 vector<tablevar_t *> tmp_v = query_plan[n]->get_input_tbls();
681 for(i=0;i<tmp_v.size();++i){
682 if(qsources.count(tmp_v[i]->get_schema_name()) > 0)
684 qsources.insert(tmp_v[i]->get_schema_name());
691 // Collect set of tables ref'd in this HFTA
693 for(qn=0;qn<query_plan.size();++qn){
695 // get names of the tables
696 vector<tablevar_t *> input_tbls = query_plan[qn]->get_input_tbls();
697 vector<tablevar_t *> output_tbls = query_plan[qn]->get_output_tbls();
698 // Convert to tblrefs, add to set of ref'd tables
700 for(i=0;i<input_tbls.size();i++){
701 // int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
702 int t = input_tbls[i]->get_schema_ref();
704 fprintf(stderr,"INTERNAL ERROR in generate_hfta. "
705 "query plan node %s references input table %s, which is not in schema.\n",
706 query_name.c_str(), input_tbls[i]->get_schema_name().c_str());
712 for(i=0;i<output_tbls.size();i++){
713 int t = Schema->get_table_ref(output_tbls[i]->get_schema_name());
715 fprintf(stderr,"INTERNAL ERROR in generate_hfta."
716 "query plan node %s references output table %s, which is not in schema.\n",
717 query_name.c_str(), output_tbls[i]->get_schema_name().c_str());
725 // Collect map of lftas, query nodes
726 map<string, int> op_idx;
727 for(q=0;q<query_plan.size();q++){
729 op_idx[query_plan[q]->get_node_name()] = q;
733 // map of input tables must include query id and input
734 // source (0,1) because several queries might reference the same source
735 vector<tablevar_t *> input_tbls = this->get_input_tables();
736 vector<bool> input_tbl_free;
737 for(i=0;i<input_tbls.size();++i){
738 input_tbl_free.push_back(true);
740 map<string, int> lfta_idx;
741 //fprintf(stderr,"%d input tables, %d query nodes\n",input_tbls.size(), query_plan.size());
742 for(q=0;q<query_plan.size();q++){
744 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
746 for(it=0;it<itbls.size();it++){
747 string tname = itbls[it]->get_schema_name()+"-"+int_to_string(q)+"-"+int_to_string(it);
748 string src_tblname = itbls[it]->get_schema_name();
749 bool src_is_external = false;
750 for(i=0;i<input_tbls.size();++i){
751 if(src_tblname == input_tbls[i]->get_schema_name()){
752 src_is_external = true;
753 if(input_tbl_free[i]){
755 input_tbl_free[i] = false;
756 //fprintf(stderr,"Adding %s (src_tblname=%s, q=%d, it=%d) to %d.\n",tname.c_str(), src_tblname.c_str(), q, it, i);
761 if(i==input_tbls.size() && src_is_external){
762 fprintf(stderr,"INTERNAL ERROR in stream_query::generate_hfta, can't find free entry in input_tbls for query %d, intput %d (%s)\n",q,it,src_tblname.c_str());
769 for(i=0;i<input_tbls.size();++i){
770 string src_tblname = input_tbls[i]->get_schema_name();
771 lfta_idx[src_tblname] = i;
775 // Compute the output formats of the operators.
776 vector<int> node_fmt(query_plan.size(),UNKNOWNFORMAT);
777 compute_node_format(qhead, node_fmt, op_idx);
780 // Generate the schema strings for the outputs.
782 for(i=0;i<query_plan.size();++i){
783 if(i != qhead && query_plan[i]){
784 string schema_tmpstr = this->make_schema(i);
785 schema_str += "gs_csp_t node"+int_to_string(i)+"_schema = "+make_C_embedded_string(schema_tmpstr)+";\n";
789 attributes = query_plan[qhead]->get_fields();
792 string schema_tmpstr = this->make_schema();
793 schema_str += "gs_csp_t "+generate_schema_string_name(query_name)+" = "+make_C_embedded_string(schema_tmpstr)+";\n";
795 // Generate the collection of tuple defs.
797 string tuple_defs = "\n/*\tDefine tuple structures \t*/\n\n";
798 set<int>::iterator si;
799 for(si=tbl_set.begin(); si!=tbl_set.end(); ++si){
800 tuple_defs += generate_host_tuple_struct( Schema->get_table( (*si) ));
801 tuple_defs += "\n\n";
804 // generate the finalize tuple function
805 string finalize_str = generate_hfta_finalize_tuple(attributes);
807 // Analyze and make the output operators
808 bool eat_input = false;
809 string src_op = query_name;
810 string pred_op = src_op;
812 if(output_specs.size()>0)
815 eat_input = false; // must create stream output for successor HFTAs.
816 int n_filestreams = 0;
817 for(i=0;i<output_specs.size();++i){
818 if(output_specs[i]->operator_type == "stream" ){
821 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile" ){
826 int filestream_id = 0;
827 for(i=0;i<output_specs.size();++i){
828 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile"){
830 int n_fstreams = output_specs[i]->n_partitions / n_parallel;
831 if(n_fstreams * n_parallel < output_specs[i]->n_partitions){
833 if(n_parallel == 1 || query_name.find("__copy1") != string::npos){
834 fprintf(stderr,"WARNING, in query %s, %d streams requested for %s output, but it must be a multiple of the hfta parallelism (%d), increasing number of output streams to %d.\n",query_name.c_str(), output_specs[i]->n_partitions, output_specs[i]->operator_type.c_str(), n_parallel, n_fstreams*n_parallel);
837 // output_file_qpn *new_ofq = new output_file_qpn();
838 string filestream_tag = "";
840 filestream_tag = "_fileoutput"+int_to_string(filestream_id);
843 output_file_qpn *new_ofq = new output_file_qpn(pred_op, src_op, filestream_tag, query_plan[qhead]->get_fields(), output_specs[i], (i==last_op ? eat_input : false) );
844 // if(n_fstreams > 1){
847 bool err_ret = new_ofq->set_splitting_params(n_parallel,parallel_idx,n_fstreams,output_specs[i]->partitioning_flds,err_str);
849 fprintf(stderr,"%s",err_str.c_str());
853 output_operators.push_back(new_ofq );
854 pred_op = output_operators.back()->get_node_name();
855 }else if(! (output_specs[i]->operator_type == "stream" || output_specs[i]->operator_type == "Stream" || output_specs[i]->operator_type == "STREAM") ){
856 fprintf(stderr,"WARNING, output operator type %s (on query %s) is not supported, ignoring\n",output_specs[i]->operator_type.c_str(),query_name.c_str() );
862 // Generate functors for the query nodes.
864 string functor_defs = "\n/*\tFunctor definitions\t*/\n\n";
865 for(qn=0;qn<query_plan.size();++qn){
866 if(query_plan[qn]!=NULL){
867 // Compute whether the input needs a ntoh xform.
868 vector<bool> needs_xform;
869 vector<tablevar_t *> itbls = query_plan[qn]->get_input_tbls();
870 for(i=0;i<itbls.size();++i){
871 string tname = itbls[i]->get_schema_name();
872 // if(query_plan[qn]->makes_transform()){
873 if(op_idx.count(tname)>0){
874 if(node_fmt[ op_idx[tname] ] == NETFORMAT){
875 needs_xform.push_back(true);
877 needs_xform.push_back(false);
880 needs_xform.push_back(true);
883 // if(op_idx.count(tname)>0){
884 // if(node_fmt[qn] != node_fmt[ op_idx[tname] ]){
885 // needs_xform.push_back(true);
887 // needs_xform.push_back(false);
890 // if(node_fmt[qn] == HOSTFORMAT){
891 // needs_xform.push_back(true);
893 // needs_xform.push_back(false);
899 functor_defs += query_plan[qn]->generate_functor(Schema, Ext_fcns, needs_xform);
903 // Generate output operator functors
905 vector<bool> needs_xform;
906 for(i=0;i<output_operators.size();++i)
907 functor_defs += output_operators[i]->generate_functor(Schema, Ext_fcns, needs_xform);
911 "#include <lapp.h>\n"
913 "#include <gshub.h>\n"
914 "#include <stdlib.h>\n"
915 "#include <stdio.h>\n"
916 "#include <limits.h>\n"
924 "#include <schemaparser.h>\n"
925 "#include<hfta_runtime_library.h>\n"
927 "#include <host_tuple.h>\n"
928 "#include <hfta.h>\n"
929 "#include <hfta_udaf.h>\n"
930 "#include <hfta_sfun.h>\n"
932 //"#define MAXSCHEMASZ 16384\n"
933 "#include <stdio.h>\n\n"
936 // Get include file for each of the operators.
937 // avoid duplicate inserts.
938 set<string> include_fls;
939 for(qn=0;qn<query_plan.size();++qn){
940 if(query_plan[qn] != NULL)
941 include_fls.insert(query_plan[qn]->get_include_file());
943 for(i=0;i<output_operators.size();++i)
944 include_fls.insert(output_operators[i]->get_include_file());
945 set<string>::iterator ssi;
946 for(ssi=include_fls.begin();ssi!=include_fls.end();++ssi)
949 // Add defines for hash functions
952 "#define hfta_BOOL_to_hash(x) (x)\n"
953 "#define hfta_USHORT_to_hash(x) (x)\n"
954 "#define hfta_UINT_to_hash(x) (x)\n"
955 "#define hfta_IP_to_hash(x) (x)\n"
956 "#define hfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
957 "#define hfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
958 "#define hfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
959 "#define hfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
960 "#define hfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
964 // ret += "#define SERIOUS_LFTA \""+input_tbls[0]->get_schema_name()+"\"\n";
965 ret += "#define OUTPUT_HFTA \""+query_name+"\"\n\n";
967 // HACK ALERT: I know for now that all query plans are
968 // single operator plans, but while SPX and SGAH can use the
969 // UNOP template, the merge operator must use the MULTOP template.
970 // WORSE HACK ALERT : merge does not translate its input,
971 // so don't apply finalize to the output.
972 // TODO: clean this up.
974 // string node_type = query_plan[0]->node_type();
979 // Need to work on the input, output xform logic.
980 // For now, just add it in.
981 // ret += finalize_str;
983 if(node_fmt[qhead] == NETFORMAT){
985 "void finalize_tuple(host_tuple &tup){\n"
995 // Parameter block management
996 // The proper parameter block must be transmitted to each
997 // external stream source.
998 // There is a 1-1 mapping between the param blocks returned
999 // by this list and the registered data sources ...
1000 // TODO: make this more manageable, but for now
1001 // there is no parameter block manipulation so I just
1002 // need to have the same number.
1005 "int get_lfta_params(gs_int32_t sz, void * value,list<param_block>& lst){\n"
1006 " // for now every lfta receive the full copy of hfta parameters\n"
1007 " struct param_block pb;\n";
1009 set<string> lfta_seen;
1010 for(i=0;i<input_tbls.size();++i){
1011 string src_tblname = input_tbls[i]->get_schema_name();
1012 if(lfta_seen.count(src_tblname) == 0){
1013 lfta_seen.insert(src_tblname);
1014 schref = input_tbls[i]->get_schema_ref();
1015 if(Schema->get_schema_type(schref) == OPERATOR_VIEW_SCHEMA){
1016 ov_ix = input_tbls[i]->get_opview_idx();
1017 opview_entry *opv = opviews.get_entry(ov_ix);
1018 string op_param = "SUBQ:";
1020 for(q=0;q<opv->subq_names.size();++q){
1021 if(q>0) op_param+=",";
1022 op_param+=opv->subq_names[q];
1025 param_sz = op_param.size()-1;
1027 sprintf(tmpstr,"\t\tpb.block_length = %d;\n",param_sz); ret+=tmpstr;
1029 " pb.data = malloc(pb.block_length);\n";
1030 ret+="\t\tmemcpy(pb.data,\""+op_param+"\",pb.block_length);\n"
1031 " lst.push_back(pb);\n\n";
1034 " pb.block_length = sz;\n"
1035 " pb.data = malloc(pb.block_length);\n"
1036 " memcpy(pb.data, value, pb.block_length);\n"
1037 " lst.push_back(pb);\n\n";
1048 "struct FTA* alloc_hfta (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void * value ) {\n"
1050 " // find the lftas\n"
1051 " list<lfta_info*> *lfta_list = new list<lfta_info*>;\n"
1054 " char schemabuf[MAXSCHEMASZ];\n"
1055 " gs_schemahandle_t schema_handle;\n"
1058 // Register the input data sources.
1059 // Register a source only once.
1061 // vector<string> ext_reg_txt;
1062 map<string, int> input_tbl_srcid;
1063 for(i=0;i<input_tbls.size();++i){
1064 string src_tblname = input_tbls[i]->get_schema_name();
1065 // Use UDOP alias when in distributed mode.
1066 // the cluster manager will make the translation
1067 // using info from qtree.xml
1068 if(distributed_mode && input_tbls[i]->get_udop_alias() != "")
1069 src_tblname = input_tbls[i]->get_udop_alias();
1070 if(input_tbl_srcid.count(src_tblname) == 0){
1071 int srcid = input_tbl_srcid.size();
1072 input_tbl_srcid[src_tblname] = srcid;
1074 "\n // find "+src_tblname+"\n"
1075 " if (fta_find(\""+src_tblname+"\",1,&f,schemabuf,MAXSCHEMASZ)!=0) {\n"
1076 " fprintf(stderr,\"HFTA::error:could not find LFTA \\n\");\n"
1079 " //fprintf(stderr,\"HFTA::FTA found at %u[%u]\\n\",ftamsgid,ftaindex);\n"
1081 " // parse the schema and get the schema handle\n"
1082 " schema_handle = ftaschema_parse_string(schemabuf);\n"
1083 " lfta_info* inf"+int_to_string(srcid)+" = new lfta_info();\n"
1084 " inf"+int_to_string(srcid)+"->f = f;\n"
1085 " inf"+int_to_string(srcid)+"->fta_name = strdup(\""+src_tblname+"\");\n"
1086 " inf"+int_to_string(srcid)+"->schema = strdup(schemabuf);\n"
1087 " inf"+int_to_string(srcid)+"->schema_handle = schema_handle;\n"
1088 " lfta_list->push_back(inf"+int_to_string(srcid)+");\n\n";
1089 // ext_reg_txt.push_back(tmp_s);
1095 ret += "\tgs_schemahandle_t root_schema_handle = ftaschema_parse_string("+generate_schema_string_name(query_name)+");\n";
1096 for(i=0;i<query_plan.size();++i){
1097 if(i != qhead && query_plan[i]){
1098 ret += "\tgs_schemahandle_t op"+int_to_string(i)+"_schema_handle = ftaschema_parse_string(node"+int_to_string(i)+"_schema);\n";
1103 // Create the operators.
1105 for(q=0;q<query_plan.size();q++){
1109 " // create an instance of operator "+int_to_string(q)+" ("+query_plan[q]->get_node_name()+") \n";
1111 // Create parameters for operator construction.
1113 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
1114 string tname = itbls[0]->get_schema_name();
1115 // string li_tname = tname +"-"+int_to_string(q)+"-0";
1116 // if(lfta_idx.count(li_tname)>0)
1117 if(input_tbl_srcid.count(tname)>0){
1118 // ret += ext_reg_txt[lfta_idx[li_tname]];
1119 // op_params += "inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
1120 op_params += "inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
1121 }else if(op_idx.count(tname)>0){
1122 op_params += "op"+int_to_string( op_idx[tname] )+"_schema_handle";
1124 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (3) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1128 string tname = itbls[1]->get_schema_name();
1129 // string li_tname = tname +"-"+int_to_string(q)+"-1";
1130 // if(lfta_idx.count(li_tname)>0)
1131 if(input_tbl_srcid.count(tname)>0){
1132 // ret += ext_reg_txt[lfta_idx[li_tname]];
1133 // op_params += ",inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
1134 op_params += ",inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
1135 }else if(op_idx.count(tname)>0){
1136 op_params += ",op"+int_to_string( op_idx[tname] )+"_schema_handle";
1138 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (4) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1142 ret += query_plan[q]->generate_operator(q,op_params);
1144 " operator_node* node"+int_to_string(q)+" = new operator_node(op"+int_to_string(q)+");\n";
1150 // Next for the output operators if any
1151 for(i=0;i<output_operators.size();++i){
1152 ret += output_operators[i]->generate_operator(n_basic_ops+i,"root_schema_handle");
1154 " operator_node* node"+int_to_string(n_basic_ops+i)+" = new operator_node(op"+int_to_string(n_basic_ops+i)+");\n";
1158 // Link up operators.
1159 for(q=0;q<query_plan.size();++q){
1161 // NOTE: this code assume that the operator has at most
1162 // two inputs. But the template code also makes
1163 // this assumption. Both will need to be changed.
1164 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
1165 string tname = itbls[0]->get_schema_name();
1166 // string li_tname = tname +"-"+int_to_string(q)+"-0";
1167 // if(lfta_idx.count(li_tname)>0)
1168 if(input_tbl_srcid.count(tname)>0){
1169 // ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
1170 ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
1171 }else if(op_idx.count(tname)>0){
1172 ret += "\tnode"+int_to_string(q)+"->set_left_child_node(node"+int_to_string( op_idx[tname] )+");\n";
1174 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (1) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1178 string tname = itbls[1]->get_schema_name();
1179 // string li_tname = tname +"-"+int_to_string(q)+"-1";
1180 // if(lfta_idx.count(li_tname)>0)
1181 if(input_tbl_srcid.count(tname)>0){
1182 // ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
1183 ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
1184 }else if(op_idx.count(tname)>0){
1185 ret += "\tnode"+int_to_string(q)+"->set_right_child_node(node"+int_to_string( op_idx[tname] )+");\n";
1187 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s (%s) when linking operator (2) %d (%s)\n",tname.c_str(), tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1193 for(i=0;i<output_operators.size();++i){
1195 ret += "\tnode"+int_to_string(n_basic_ops)+"->set_left_child_node(node"+int_to_string( qhead )+");\n";
1197 ret += "\tnode"+int_to_string(n_basic_ops+i)+"->set_left_child_node(node"+int_to_string( n_basic_ops+i-1 )+");\n";
1200 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
1202 bool fta_reusable = false;
1203 if (query_plan[qhead]->get_val_of_def("reusable") == "yes" ||
1204 query_plan[qhead]->get_param_tbl()->size() == 0) {
1208 int root_node = qhead;
1209 if(output_operators.size()>0)
1210 root_node = n_basic_ops+i-1;
1215 " MULTOP_HFTA* ftap = new MULTOP_HFTA(ftaid, OUTPUT_HFTA, command, sz, value, root_schema_handle, node"+int_to_string(root_node)+", lfta_list, " + (fta_reusable ?"true":"false") + ", reusable);\n"
1216 " if(ftap->init_failed()){ delete ftap; return 0;}\n"
1217 " return (FTA*)ftap;\n"
1223 string comm_bufsize = "16*1024*1024";
1224 if(defines.count("hfta_comm_buf")>0){
1225 comm_bufsize = defines["hfta_comm_buf"];
1230 "int main(int argc, char * argv[]) {\n"
1233 " /* parse the arguments */\n"
1235 " gs_int32_t tip1,tip2,tip3,tip4;\n"
1236 " endpoint gshub;\n"
1237 " gs_sp_t instance_name;\n"
1239 " gslog(LOG_EMERG,\"Wrong arguments at startup\");\n"
1243 " if ((sscanf(argv[1],\"%u.%u.%u.%u:%hu\",&tip1,&tip2,&tip3,&tip4,&(gshub.port)) != 5)) {\n"
1244 " gslog(LOG_EMERG,\"HUB IP NOT DEFINED\");\n"
1247 " gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\n"
1248 " gshub.port=htons(gshub.port);\n"
1249 " instance_name=strdup(argv[2]);\n"
1250 " if (set_hub(gshub)!=0) {\n"
1251 " gslog(LOG_EMERG,\"Could not set hub\");\n"
1254 " if (set_instance_name(instance_name)!=0) {\n"
1255 " gslog(LOG_EMERG,\"Could not set instance name\");\n"
1260 " /* initialize host library */\n"
1262 //" fprintf(stderr,\"Initializing gscp\\n\");\n"
1263 " gsopenlog(argv[0]);\n"
1265 " if (hostlib_init(HFTA, "+comm_bufsize+", DEFAULTDEV, 0, 0)!=0) {\n"
1266 " fprintf(stderr,\"%s::error:could not initialize gscp\\n\",\n"
1273 " FTAID ret = fta_register(OUTPUT_HFTA, " + (fta_reusable?"1":"0") + ", DEFAULTDEV, alloc_hfta, "+generate_schema_string_name(query_name)+", -1, 0ull);\n"
1274 " fta_start_service(-1);\n"
1280 ////////////////////
1285 // Checks if the node i is compatible with interface partitioning
1286 // (can be pushed below the merge that combines partitioned stream)
1287 // i - index of the query node in a query plan
// Returns true only when every predecessor of the node is a merge
// (mrg_qpn) with a single parent and all those merges report the same
// partition definition, and the operator type itself is compatible.
// NOTE(review): several early-return lines are elided from this excerpt
// (the embedded numbering jumps); confirm fall-through behavior in the
// full source before relying on these comments.
1288 bool stream_query::is_partn_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1290 qp_node* node = query_plan[index];
1291 qp_node* child_node = NULL;
// A node with no inputs has nothing to push below.
1293 if (node->predecessors.empty())
1296 // all the node predecessors must be partition merges with the same partition definition
1297 partn_def_t* partn_def = NULL;
1298 for (i = 0; i < node->predecessors.size(); ++i) {
1299 child_node = query_plan[node->predecessors[i]];
1300 if (child_node->node_type() != "mrg_qpn")
1303 // merge must have only one parent for this optimization to work
1304 // check that all its successors are the same
1305 for (int j = 1; j < child_node->successors.size(); ++j) {
1306 if (child_node->successors[j] != child_node->successors[0])
// Ask the merge for the partition definition of the streams it combines.
1310 partn_def_t* new_partn_def = ((mrg_qpn*)child_node)->get_partn_definition(lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result);
// First merge seen establishes the reference definition; every later
// merge must produce the identical definition object.
1314 partn_def = new_partn_def;
1315 else if (new_partn_def != partn_def)
// Finally, check that the operator type itself tolerates partitioned input.
1320 if (node->node_type() == "spx_qpn") // spx nodes are always partition compatible
1322 else if (node->node_type() == "sgah_qpn") {
1323 gb_table gb_tbl = ((sgah_qpn*)node)->gb_tbl;
// TODO(review): the group-by compatibility test is commented out, so
// sgah nodes are unconditionally accepted (gb_tbl above is unused),
// while rsgah/sgahcwcb below do run the check -- confirm this
// asymmetry is intentional.
1324 return true; //partn_def->is_compatible(&gb_tbl);
1326 else if (node->node_type() == "rsgah_qpn") {
1327 gb_table gb_tbl = ((rsgah_qpn*)node)->gb_tbl;
1328 return partn_def->is_compatible(&gb_tbl);
1330 else if (node->node_type() == "sgahcwcb_qpn") {
1331 gb_table gb_tbl = ((sgahcwcb_qpn*)node)->gb_tbl;
1332 return partn_def->is_compatible(&gb_tbl);
1334 else if (node->node_type() == "join_eq_hash_qpn") {
1341 // Push the operator below the merge that combines
// the partitioned input streams: the operator at query_plan[index] is
// cloned once per merge input, the clones become the merge's new
// children, and the (renamed) merge becomes the new root of this
// subplan.  The original node slot is NULLed out at the end.
// Precondition: is_partn_compatible(index, ...) returned true.
1342 void stream_query::pushdown_partn_operator(int index) {
1343 qp_node* node = query_plan[index];
1345 char node_index[128];
1347 // fprintf(stderr, "Partn pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
1350 // HACK ALERT: When reordering merges we screw up slack computation
1351 // since slack should no longer be used, it is not an issue
1354 // we can safely reorder nodes that have one and only one temporal attribute in select list
1355 table_def* table_layout = node->get_fields();
1356 vector<field_entry*> fields = table_layout->get_fields();
1357 int merge_fieldpos = -1;
// Locate the single temporal field; it becomes the merge-by column.
1360 for (i = 0; i < fields.size(); ++i) {
1361 data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
1362 if(dt.is_temporal()) {
1363 if (merge_fieldpos != -1) // more than one temporal field found
1369 if (merge_fieldpos == -1) // if no temporal field found
1372 std::vector<colref_t *> mvars; // the merge-by columns.
1374 // reordering procedure is different for unary operators and joins
1375 if (node->node_type() == "join_eq_hash_qpn") {
1376 vector<qp_node*> new_nodes;
1378 tablevar_t *left_table_name;
1379 tablevar_t *right_table_name;
1380 mrg_qpn* left_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
1381 mrg_qpn* right_mrg = (mrg_qpn*)query_plan[node->predecessors[1]];
1383 // for now we will only consider plans where both child merges
1384 // merge the same set of streams
1386 if (left_mrg->fm.size() != right_mrg->fm.size())
1389 // mapping of interface names to table definitions
1390 map<string, tablevar_t*> iface_map;
// Key the left merge's inputs by machine+interface so each right-side
// input can be matched with its partition-mate on the left.
1391 for (i = 0; i < left_mrg->fm.size(); i++) {
1392 left_table_name = left_mrg->fm[i];
1393 iface_map[left_table_name->get_machine() + left_table_name->get_interface()] = left_table_name;
1396 for (i = 0; i < right_mrg->fm.size(); i++) {
1397 right_table_name = right_mrg->fm[i];
1399 // find corresponding left table
1400 if (!iface_map.count(right_table_name->get_machine() + right_table_name->get_interface()))
1403 left_table_name = iface_map[right_table_name->get_machine() + right_table_name->get_interface()];
1405 // create new join nodes
1406 sprintf(node_index, "_%d", i);
1407 join_eq_hash_qpn* new_node = (join_eq_hash_qpn*)node->make_copy(node_index);
1409 // make a copy of right_table_name
1410 right_table_name = right_table_name->duplicate();
// Preserve the original join's range variables on the per-partition joins.
1411 left_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[0]->get_var_name());
1412 right_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[1]->get_var_name());
1413 new_node->from[0] = left_table_name;
1414 new_node->from[1] = right_table_name;
1415 new_nodes.push_back(new_node);
1418 // make right_mrg a new root
// The merge inherits the join's name and output schema so parents of
// the original join are unaffected.
1419 right_mrg->set_node_name(node->get_node_name());
1420 right_mrg->table_layout = table_layout;
1421 right_mrg->merge_fieldpos = merge_fieldpos;
1423 for (i = 0; i < right_mrg->fm.size(); i++) {
1424 // make newly created joins children of merge
1425 sprintf(node_index, "_%d", i);
1426 right_mrg->fm[i] = new tablevar_t(right_mrg->fm[i]->get_machine().c_str(), right_mrg->fm[i]->get_interface().c_str(), (node->get_node_name() + string(node_index)).c_str());
1427 sprintf(node_index, "_m%d", i);
1428 right_mrg->fm[i]->set_range_var(node_index);
1429 right_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
// Self-joins share one merge; only drop the left merge when distinct.
1433 if (left_mrg != right_mrg)
1434 query_plan[node->predecessors[0]] = NULL; // remove left merge from the plan
1436 query_plan.insert(query_plan.end(), new_nodes.begin(), new_nodes.end());
1438 } else { // unary operator
1439 // get the child merge node
1440 mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
// As above: the merge takes over the operator's name and output schema.
1442 child_mrg->set_node_name(node->get_node_name());
1443 child_mrg->table_layout = table_layout;
1444 child_mrg->merge_fieldpos = merge_fieldpos;
1446 // create new nodes for every source stream
1447 for (i = 0; i < child_mrg->fm.size(); i++) {
1448 tablevar_t *table_name = child_mrg->fm[i];
1449 sprintf(node_index, "_%d", i);
1450 qp_node* new_node = node->make_copy(node_index);
// Rebind the clone's input to the i-th source stream; the cast depends
// on the concrete operator type.
1452 if (node->node_type() == "spx_qpn")
1453 ((spx_qpn*)new_node)->table_name = table_name;
1454 else if (node->node_type() == "sgah_qpn")
1455 ((sgah_qpn*)new_node)->table_name = table_name;
1456 else if (node->node_type() == "rsgah_qpn")
1457 ((rsgah_qpn*)new_node)->table_name = table_name;
1458 else if (node->node_type() == "sgahcwcb_qpn")
1459 ((sgahcwcb_qpn*)new_node)->table_name = table_name;
1460 table_name->set_range_var("_t0");
// The merge now reads from the clone instead of the raw source.
1462 child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
1463 sprintf(node_index, "_m%d", i);
1464 child_mrg->fm[i]->set_range_var(node_index);
1465 child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
1467 // add new node to query plan
1468 query_plan.push_back(new_node);
// Retire the original operator; the renamed merge has replaced it.
1471 query_plan[index] = NULL;
1476 // Checks if the node i can be pushed below the merge
// Unlike is_partn_compatible(), this handles the multihost case: the
// single predecessor must be a multihost merge with one parent, and the
// operator must be a selection (spx) or an aggregation (sgah) that can
// be split into a low/high-level pair.  The map/vector parameters appear
// unused in the visible lines -- possibly kept for signature symmetry
// with is_partn_compatible; confirm in the full source.
1477 bool stream_query::is_pushdown_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names) {
1480 qp_node* node = query_plan[index];
1481 qp_node* child_node = NULL;
// Only unary operators are considered here.
1483 if (node->predecessors.size() != 1)
1486 // node predecessor must be merge that combine streams from multiple hosts
1487 child_node = query_plan[node->predecessors[0]];
1488 if (child_node->node_type() != "mrg_qpn")
1491 if (!((mrg_qpn*)child_node)->is_multihost_merge())
1494 // merge must have only one parent for this optimization to work
1495 // check that all its successors are the same
1496 for (int j = 1; j < child_node->successors.size(); ++j) {
1497 if (child_node->successors[j] != child_node->successors[0])
1501 // selections can always be pushed down, aggregations can always be split into selection/aggr or aggr/aggr pair
1503 if (node->node_type() == "spx_qpn")
1505 else if (node->node_type() == "sgah_qpn")
1512 // Push the operator below the merge
// Multihost pushdown: for a selection (spx) the operator is cloned per
// merge input; for an aggregation (sgah) the node is first split into a
// low-level part (replicated per host, each fed by a per-host merge) and
// a high-level part (super-aggregate) left above the original merge.
// Precondition: is_pushdown_compatible(index, ...) returned true.
1513 void stream_query::pushdown_operator(int index, ext_fcn_list *Ext_fcns, table_list *Schema) {
1514 qp_node* node = query_plan[index];
1516 char node_suffix[128];
1518 // we can only safely push down queries that have one and only one temporal attribute in select list
1519 table_def* table_layout = node->get_fields();
1520 vector<field_entry*> fields = table_layout->get_fields();
1521 int merge_fieldpos = -1;
// Locate the single temporal field; it becomes the merge-by column.
1524 for (i = 0; i < fields.size(); ++i) {
1525 data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
1526 if(dt.is_temporal()) {
1527 if (merge_fieldpos != -1) // more than one temporal field found
1533 if (merge_fieldpos == -1) // if no temporal field found
// NOTE(review): mvars is never used in the visible lines of this
// function -- possibly dead, copied from pushdown_partn_operator.
1536 std::vector<colref_t *> mvars; // the merge-by columns.
1538 fprintf(stderr, "Regular pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
1540 // get the child merge node
1541 mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
1543 tablevar_t *table_name = NULL;
// Case 1: plain selection -- clone it below each merge input.
1546 if (node->node_type() == "spx_qpn") {
1547 // get the child merge node
1549 // create new nodes for every source stream
1550 for (i = 0; i < child_mrg->fm.size(); i++) {
1551 table_name = child_mrg->fm[i];
1552 sprintf(node_suffix, "_%d", i);
1553 qp_node* new_node = node->make_copy(node_suffix);
1555 ((spx_qpn*)new_node)->table_name = table_name;
1556 table_name->set_range_var("_t0");
// The merge now reads from the clone instead of the raw source.
1558 child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
1559 sprintf(node_suffix, "_m%d", i);
1560 child_mrg->fm[i]->set_range_var(node_suffix);
1561 child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
1563 // add new node to query plan
1564 query_plan.push_back(new_node);
// The merge takes over the selection's output schema.
1566 child_mrg->table_layout = table_layout;
1567 child_mrg->merge_fieldpos = merge_fieldpos;
1569 } else { // aggregation node
1571 vector<qp_node*> new_nodes;
1573 // split aggregations into high and low-level part
1574 vector<qp_node *> split_nodes = ((sgah_qpn*)node)->split_node_for_hfta(Ext_fcns, Schema);
// A split that does not yield exactly {low-level, high-level} aborts the pushdown.
1575 if (split_nodes.size() != 2)
1578 sgah_qpn* super_aggr = (sgah_qpn*)split_nodes[1];
// The super-aggregate keeps reading the original input name.
1579 super_aggr->table_name = ((sgah_qpn*)node)->table_name;
1581 // group all the sources by host
1582 map<string, vector<int> > host_map;
1583 for (i = 0; i < child_mrg->fm.size(); i++) {
1584 tablevar_t *table_name = child_mrg->fm[i];
1585 if (host_map.count(table_name->get_machine()))
1586 host_map[table_name->get_machine()].push_back(i);
1589 tables.push_back(i);
1590 host_map[table_name->get_machine()] = tables;
1594 // create a new merge and low-level aggregation for each host
1595 map<string, vector<int> >::iterator iter;
1596 for (iter = host_map.begin(); iter != host_map.end(); iter++) {
1597 string host_name = (*iter).first;
1598 vector<int> tables = (*iter).second;
1600 sprintf(node_suffix, "_%s", host_name.c_str());
1601 string suffix(node_suffix);
1603 mrg_qpn *new_mrg = (mrg_qpn *)child_mrg->make_copy(suffix);
1604 for (i = 0; i < tables.size(); ++i) {
1605 sprintf(node_suffix, "_m%d", i);
// NOTE(review): fm indexes with tables[i] but mvars with i -- these
// go out of step whenever a host's source indices are not 0..k;
// looks like a latent bug, confirm against the full source.
1606 new_mrg->fm.push_back(child_mrg->fm[tables[i]]);
1607 new_mrg->mvars.push_back(child_mrg->mvars[i]);
1608 new_mrg->fm[i]->set_range_var(node_suffix);
// Low-level operator for this host, fed by the per-host merge.
1610 qp_node* new_node = split_nodes[0]->make_copy(suffix);
1612 if (new_node->node_type() == "spx_qpn") {
1613 ((spx_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
1614 ((spx_qpn*)new_node)->table_name->set_range_var("_t0");
1616 ((sgah_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
1617 ((sgah_qpn*)new_node)->table_name->set_range_var("_t0");
1619 query_plan.push_back(new_mrg);
1620 new_nodes.push_back(new_node);
// The original merge now combines per-host results; recompute its schema
// from the low-level operator's select list.
1623 child_mrg->merge_fieldpos = merge_fieldpos;
1624 if (split_nodes[0]->node_type() == "spx_qpn")
1625 child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((spx_qpn*)split_nodes[0])->select_list);
1627 child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((sgah_qpn*)split_nodes[0])->select_list);
1630 // connect newly created nodes with parent multihost merge
1631 for (i = 0; i < new_nodes.size(); ++i) {
1632 if (new_nodes[i]->node_type() == "spx_qpn")
1633 child_mrg->fm[i] = new tablevar_t(((spx_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
1635 child_mrg->fm[i] = new tablevar_t(((sgah_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
1638 child_mrg->mvars[i]->set_field(child_mrg->table_layout->get_field_name(merge_fieldpos));
1640 sprintf(node_suffix, "_m%d", i);
1641 child_mrg->fm[i]->set_range_var(node_suffix);
1642 query_plan.push_back(new_nodes[i]);
// Shrink the merge's input lists to one entry per host.
1644 child_mrg->fm.resize(new_nodes.size());
1645 child_mrg->mvars.resize(new_nodes.size());
1647 // push the new high-level aggregation
1648 query_plan.push_back(super_aggr);
// Retire the original operator; its replacements are in place.
1651 query_plan[index] = NULL;
1656 // Extract subtree rooted at node i into separate hfta
// Worklist traversal: starting from index, every reachable predecessor
// is moved into a new stream_query (built from this plan's root node and
// inheriting this query's defines) and its slot in this plan is NULLed.
// Returns the newly allocated query (caller owns it).
1657 stream_query* stream_query::extract_subtree(int index) {
1660 stream_query* new_query = new stream_query(query_plan[index], this);
// Seed the worklist with the subtree root; the loop appends children as
// it goes, so `nodes` grows while being scanned.
1662 nodes.push_back(index);
1663 for (int i = 0; i < nodes.size(); ++i) {
1664 qp_node* node = query_plan[nodes[i]];
1668 // add all children to nodes list
1669 for (int j = 0; j < node->predecessors.size(); ++j)
1670 nodes.push_back(node->predecessors[j]);
// Transfer ownership of the node into the extracted query...
1672 new_query->query_plan.push_back(node);
// ...and remove it from this plan (slots are NULLed, not erased, so
// stored indices elsewhere stay valid).
1674 query_plan[nodes[i]] = NULL;
1680 // Splits query that combines data from multiple hosts into separate hftas.
// For each multihost merge in the plan: group its inputs per host, insert
// a per-host merge for every host contributing more than one stream,
// rewire the original merge to read the per-host merges, then extract
// each input subtree into its own stream_query (returned to the caller).
1681 vector<stream_query*> stream_query::split_multihost_query() {
1683 vector<stream_query*> ret;
1684 char node_suffix[128];
1687 // find merges combining multiple hosts into per-host groups
// Snapshot the size: the loop appends to query_plan while iterating.
1688 int plan_size = query_plan.size();
// NOTE(review): new_nodes is declared outside the merge loop and is not
// visibly cleared between merges -- if a plan contains two multihost
// merges, the second iteration would see stale entries; confirm in the
// full source.
1689 vector<mrg_qpn*> new_nodes;
1691 for (i = 0; i < plan_size; ++i) {
1692 qp_node* node = query_plan[i];
1693 if (node && node->node_type() == "mrg_qpn") {
1694 mrg_qpn* mrg = (mrg_qpn*)node;
1695 if (mrg->is_multihost_merge()) {
1697 // group all the sources by host
1698 map<string, vector<int> > host_map;
1699 for (int j = 0; j < mrg->fm.size(); j++) {
1700 tablevar_t *table_name = mrg->fm[j];
1701 if (host_map.count(table_name->get_machine()))
1702 host_map[table_name->get_machine()].push_back(j);
1705 tables.push_back(j);
1706 host_map[table_name->get_machine()] = tables;
1710 // create a new merge for each host
1711 map<string, vector<int> >::iterator iter;
1712 for (iter = host_map.begin(); iter != host_map.end(); iter++) {
1713 string host_name = (*iter).first;
1714 vector<int> tables = (*iter).second;
// A host with a single stream needs no intermediate merge.
1716 if (tables.size() == 1)
1719 sprintf(node_suffix, "_%s", host_name.c_str());
1720 string suffix(node_suffix);
1722 mrg_qpn *new_mrg = (mrg_qpn *)mrg->make_copy(suffix);
1723 for (int j = 0; j < tables.size(); ++j) {
// NOTE(review): fm indexes with tables[j] but mvars with j, and
// set_range_var targets fm[j] rather than the appended entry --
// these diverge when a host's indices are not 0..k; confirm.
1724 new_mrg->fm.push_back(mrg->fm[tables[j]]);
1725 new_mrg->mvars.push_back(mrg->mvars[j]);
1726 sprintf(node_suffix, "m_%d", j);
1727 new_mrg->fm[j]->set_range_var(node_suffix);
1729 new_nodes.push_back(new_mrg);
1732 if (!new_nodes.empty()) {
1733 // connect newly created merge nodes with parent multihost merge
1734 for (int j = 0; j < new_nodes.size(); ++j) {
// The top merge now reads one stream per host, via the per-host merges.
1735 mrg->fm[j] = new tablevar_t(new_nodes[j]->fm[0]->get_machine().c_str(), "IFACE", new_nodes[j]->get_node_name().c_str());
1736 query_plan.push_back(new_nodes[j]);
1738 mrg->fm.resize(new_nodes.size());
1739 mrg->mvars.resize(new_nodes.size());
1744 // now externalize the sources
1745 for (int j = 0; j < node->predecessors.size(); ++j) {
1746 // Extract subtree rooted at node i into separate hfta
1747 stream_query* q = extract_subtree(node->predecessors[j]);
// Rebuild the extracted query's successor/predecessor links before use.
1749 q->generate_linkage();
1763 // Perform local FTA optimizations
// Pipeline: (1) topologically sort nodes leaf-first, (2) annotate input
// tables with the machine/interface of the lfta that feeds them,
// (3) push eligible operators below merges (partition-aware first, then
// plain multihost pushdown), (4) split multihost plans into separate
// hftas appended to hfta_list, (5) break multi-way merges into chains of
// two-way merges.
1764 void stream_query::optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result){
1766 // Topologically sort the nodes in query plan (leaf-first)
1768 vector<int> sorted_nodes;
1770 int num_nodes = query_plan.size();
// Scratch flags marking nodes already emitted as leaves.
1771 bool* leaf_flags = new bool[num_nodes];
1772 memset(leaf_flags, 0, num_nodes * sizeof(bool));
1774 // run topological sort
1777 // add all leafs to sorted_nodes
1780 for (i = 0; i < num_nodes; ++i) {
1784 if (!leaf_flags[i] && query_plan[i]->predecessors.empty()) {
1785 leaf_flags[i] = true;
1786 sorted_nodes.push_back(i);
1789 // remove the node from its parents predecessor lists
1790 // since we only care about number of predecessors, it is sufficient just to remove
1791 // one element from the parent's predecessors list
1792 for (int j = query_plan[i]->successors.size() - 1; j >= 0; --j)
1793 query_plan[query_plan[i]->successors[j]]->predecessors.pop_back();
1797 delete[] leaf_flags;
1798 num_nodes = sorted_nodes.size();
// The sort above consumed the predecessor lists; restore them.
1799 generate_linkage(); // rebuild the recently destroyed predecessor lists.
1801 // collect the information about interfaces nodes read from
1802 for (i = 0; i < num_nodes; ++i) {
1803 qp_node* node = query_plan[sorted_nodes[i]];
1804 vector<tablevar_t *> input_tables = node->get_input_tbls();
1805 for (j = 0; j < input_tables.size(); ++j) {
1806 tablevar_t * table = input_tables[j];
// Inputs fed directly by an lfta get that lfta's machine/interface.
1807 if (lfta_names.count(table->schema_name)) {
1808 int index = lfta_names[table->schema_name];
1809 table->set_machine(machine_names[index]);
1810 table->set_interface(interface_names[index]);
1817 // push eligible operators down in the query plan
// Leaf-first order guarantees children are processed before parents.
1818 for (i = 0; i < num_nodes; ++i) {
1819 if (partn_parse_result && is_partn_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result)) {
1820 pushdown_partn_operator(sorted_nodes[i]);
1821 } else if (is_pushdown_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names)) {
1822 pushdown_operator(sorted_nodes[i], Ext_fcns, Schema);
1826 // split the query into multiple hftas if it combines the data from multiple hosts
1827 vector<stream_query*> hftas = split_multihost_query();
1828 hfta_list.insert(hfta_list.end(), hftas.begin(), hftas.end());
// Recompute: earlier passes may have appended/NULLed plan slots.
1831 num_nodes = query_plan.size();
1832 // also split multi-way merges into two-way merges
1833 for (i = 0; i < num_nodes; ++i) {
1834 qp_node* node = query_plan[i];
1835 if (node && node->node_type() == "mrg_qpn") {
1836 vector<mrg_qpn *> split_merge = ((mrg_qpn *)node)->split_sources();
1838 query_plan.insert(query_plan.end(), split_merge.begin(), split_merge.end());
1839 // delete query_plan[sorted_nodes[i]];
// Replaced by the split merges appended above; slot is NULLed, not erased.
1840 query_plan[i] = NULL;
// Return the output schema of this query: the field list of the plan's
// root (head) node.  The returned table_def is owned by that node.
1849 table_def *stream_query::get_output_tabledef(){
1850 return( query_plan[qhead]->get_fields() );
// Delegate key inference to the root node: returns the key columns of
// the query's output table, filling partial_keys as a side channel
// (semantics defined by qp_node::get_tbl_keys).
1853 vector<string> stream_query::get_tbl_keys(vector<string> &partial_keys){
1854 return query_plan[qhead]->get_tbl_keys(partial_keys);
1859 //////////////////////////////////////////////////////////
1860 //// De-siloing. TO BE REMOVED
// For every lfta input of the query's tail nodes, synthesize one source
// name per silo (base name + silo suffix), insert a merge node combining
// those per-silo lftas, register the merge's schema with Schema, and
// repoint the input at the new merge.
// Assumes every silo suffix has the same length as silo_names.back(),
// and that lfta input names end with such a suffix -- TODO confirm.
1862 void stream_query::desilo_lftas(map<string, int> &lfta_names,vector<string> &silo_names,table_list *Schema){
1865 int suffix_len = silo_names.back().size();
1867 for(i=0;i<qtail.size();++i){
1868 vector<tablevar_t *> itbls = query_plan[qtail[i]]->get_input_tbls();
1869 for(t=0;t<itbls.size();++t){
1870 string itbl_name = itbls[t]->get_schema_name();
// Only inputs that are lftas get de-siloed.
1871 if(lfta_names.count(itbl_name)>0){
1872 //printf("Query %s input %d references lfta input %s\n",query_plan[qtail[i]]->get_node_name().c_str(),t,itbl_name.c_str());
1873 vector<string> src_names;
// Strip the silo suffix to get the base lfta name, then re-append each
// silo's suffix to enumerate the per-silo variants.
1874 string lfta_base = itbl_name.substr(0,itbl_name.size()-suffix_len);
1875 for(s=0;s<silo_names.size();++s){
1876 string lfta_subsilo = lfta_base + silo_names[s];
1877 //printf("\t%s\n",lfta_subsilo.c_str());
1878 src_names.push_back(lfta_subsilo);
1880 string merge_node_name = "desilo_"+query_plan[qtail[i]]->get_node_name()+
1881 "_input_"+int_to_string(t);
1882 mrg_qpn *merge_node = new mrg_qpn(merge_node_name,src_names,Schema);
// Make the merge's output schema visible to later lookups.
1883 int m_op_pos = Schema->add_table(merge_node->table_layout);
1884 itbls[t]->set_schema(merge_node_name);
1885 itbls[t]->set_schema_ref(m_op_pos);
1886 query_plan.push_back(merge_node);
1893 ////////////////////////////////////////
1898 // Given a collection of LFTA stream queries,
1899 // extract their WHERE predicates
1900 // and pass them to an analysis routine which will extract
// ... the predicates shared across lftas, returned through
// prefilter_preds / pred_ids by find_common_filter (free function, not a
// stream_query member).
1903 void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, vector<cnf_set *> &prefilter_preds, set<unsigned int> &pred_ids){
// One WHERE-clause (CNF list) per lfta, in lfta_list order.
1905 std::vector< std::vector<cnf_elem *> > where_list;
1907 // still safe to assume that LFTA queries have a single
1908 // query node, which is at position 0.
1909 for(s=0;s<lfta_list.size();++s){
1910 vector<cnf_elem *> cnf_list = lfta_list[s]->query_plan[0]->get_filter_clause();
// For aggregation lftas, substitute group-by definitions into the
// predicates before the commonality analysis.
1911 if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
1912 gb_table *gtbl = ((sgah_qpn *)(lfta_list[s]->query_plan[0]))->get_gb_tbl();
1914 for(c=0;c<cnf_list.size();++c){
1915 insert_gb_def_pr(cnf_list[c]->pr,gtbl);
// NOTE(review): get_filter_clause() is called a second time here rather
// than reusing cnf_list -- whether the gb substitution above mutates the
// shared clause in place is not visible in this excerpt; confirm.
1918 where_list.push_back(lfta_list[s]->query_plan[0]->get_filter_clause());
1921 find_common_filter(where_list,Schema,Ext_fcns,prefilter_preds, pred_ids);
1928 // Given a collection of LFTA stream queries,
1929 // extract the union of all temporal attributes referenced in select clauses
1930 // those attributes will need to be unpacked in prefilter
1932 void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids){
1934 vector<scalarexp_t *> sl_list;
1935 gb_table *gb_tbl = NULL;
1938 // still safe to assume that LFTA queries have a single
1939 // query node, which is at position 0.
1940 for(s=0;s<lfta_list.size();++s){
1942 if(lfta_list[s]->query_plan[0]->node_type() == "spx_qpn"){
1943 spx_qpn *spx_node = (spx_qpn *)lfta_list[s]->query_plan[0];
1944 sl_list = spx_node->get_select_se_list();
1946 if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
1947 sgah_qpn *sgah_node = (sgah_qpn *)lfta_list[s]->query_plan[0];
1948 sl_list = sgah_node->get_select_se_list();
1949 gb_tbl = sgah_node->get_gb_tbl();
1952 for(sl=0;sl<sl_list.size();sl++){
1953 data_type *sdt = sl_list[sl]->get_data_type();
1954 if (sdt->is_temporal()) {
1955 gather_se_col_ids(sl_list[sl],temp_cids, gb_tbl);