1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
15 #include"stream_query.h"
16 #include"generate_utils.h"
17 #include"analyze_fta.h"
// File-scope scratch buffer for sprintf-style formatting used later in this file.
22 static char tmpstr[500];
27 // Create a query plan from a query node and an existing
28 // query plan. Use for lfta queries, the parent query plan provides
// the defines.  The new plan is a single-node plan; its attributes,
// parameters, and name are taken from the node itself.
30 stream_query::stream_query(qp_node *qnode, stream_query *parent){
31 query_plan.push_back(qnode);
34 attributes = qnode->get_fields();
35 parameters = qnode->get_param_tbl();
36 defines = parent->defines;
37 query_name = qnode->get_node_name();
40 // Copy the query plan.
// NOTE(review): this is a shallow copy — the qp_node pointers in
// query_plan are shared with src, not cloned.  Verify callers expect that.
41 stream_query::stream_query(stream_query &src){
42 query_plan = src.query_plan;
45 attributes = src.attributes;
46 parameters = src.parameters;
47 defines = src.defines;
48 query_name = src.query_name;
53 // Create a query plan from an analyzed parse tree.
54 // Perform analyses to find the output node, input nodes, etc.
56 stream_query::stream_query(query_summary_class *qs,table_list *Schema){
57 // Generate the query plan nodes from the analyzed parse tree.
58 // There is only one for now, so just assign the return value
59 // of create_query_nodes to query_plan
61 query_plan = create_query_nodes(qs,Schema);
63 if(query_plan.size() == 0){
64 fprintf(stderr,"INTERNAL ERROR, zero-size query plan in stream_query::stream_query\n");
// Propagate any per-node error into this query's error_code / err_str.
67 for(i=0;i<query_plan.size();++i){
69 if(query_plan[i]->get_error_code() != 0){
70 error_code = query_plan[i]->get_error_code();
71 err_str += query_plan[i]->get_error_str();
// The head (output) node is assumed to be the last node created.
75 qhead = query_plan.size()-1;
// Append the plan nodes generated from another analyzed parse tree
// onto this query's plan.  Linkage (qhead/qtail) is recomputed later
// by generate_plan/generate_linkage.
81 stream_query * stream_query::add_query(query_summary_class *qs,table_list *Schema){
82 // Add another query block to the query plan
84 vector<qp_node *> new_nodes = create_query_nodes(qs, Schema);
85 query_plan.insert(query_plan.end(),new_nodes.begin(), new_nodes.end());
// Append all plan nodes of another stream_query onto this one.
// NOTE(review): node pointers are shared with src, not cloned.
89 stream_query * stream_query::add_query(stream_query &src){
90 // Add another query block to the query plan
92 query_plan.insert(query_plan.end(),src.query_plan.begin(), src.query_plan.end());
// Compute, for every plan node, the mapping of its output fields back to
// protocol-level scalar expressions.  Nodes are processed sources-first so
// each node can consult its predecessors' already-computed mappings;
// sq_map supplies head nodes of other queries referenced by name.
97 void stream_query::generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema){
100 // Mapping fields to protocol fields requires schema binding.
101 // First ensure all nodes are in the schema.
102 for(n=0;n<query_plan.size();++n){
103 if(query_plan[n] != NULL){
104 Schema->add_table(query_plan[n]->get_fields());
107 // Now do schema binding
108 for(n=0;n<query_plan.size();++n){
109 if(query_plan[n] != NULL){
110 query_plan[n]->bind_to_schema(Schema);
114 // create name-to-index map
115 map<string, int> name_to_node;
116 for(n=0;n<query_plan.size();++n){
118 name_to_node[query_plan[n]->get_node_name()] = n;
122 // Create a list of the nodes to process, in order.
123 // Search from the root down.
// Breadth-first walk from qhead; pushing each visited node onto the FRONT
// of work_list leaves work_list ordered leaves-first.
129 search_q.push_back(qhead);
130 while(! search_q.empty()){
131 int the_q = search_q.front();
132 search_q.pop_front(); work_list.push_front(the_q);
133 vector<int> the_pred = query_plan[the_q]->get_predecessors();
134 for(i=0;i<the_pred.size();i++)
135 search_q.push_back(the_pred[i]);
138 // Scan through the work list, from the front,
139 // gather the qp_node's ref'd, and call the
140 // protocol se generator. Pass NULL for
141 // sources not found - presumably user-defined operator
143 while(! work_list.empty()){
144 int the_q = work_list.front();
145 work_list.pop_front();
146 vector<qp_node *> q_sources;
147 vector<tablevar_t *> q_input_tbls = query_plan[the_q]->get_input_tbls();
148 for(i=0;i<q_input_tbls.size();++i){
149 string itbl_nm = q_input_tbls[i]->get_schema_name();
150 if(name_to_node.count(itbl_nm)>0){
// Input comes from another node of this plan.
151 q_sources.push_back(query_plan[name_to_node[itbl_nm]]);
152 }else if(sq_map.count(itbl_nm)>0){
// Input comes from another query: use its head node.
153 q_sources.push_back(sq_map[itbl_nm]->get_query_head());
155 q_sources.push_back(NULL);
158 query_plan[the_q]->create_protocol_se(q_sources, Schema);
161 //////////////////////////////////////////////////////////
// Debug dump: print each node's protocol SEs, plus hash expressions for
// join nodes and group-by source expressions for aggregation nodes.
166 for(i=0;i<query_plan.size();++i){
168 printf("query node %s, type=%s:\n",query_plan[i]->get_node_name().c_str(),
169 query_plan[i]->node_type().c_str());
170 map<std::string, scalarexp_t *> *pse_map = query_plan[i]->get_protocol_se();
171 map<std::string, scalarexp_t *>::iterator mssi;
172 for(mssi=pse_map->begin();mssi!=pse_map->end();++mssi){
174 printf("\t%s : %s\n",(*mssi).first.c_str(), (*mssi).second->to_string().c_str());
176 printf("\t%s : NULL\n",(*mssi).first.c_str());
178 if(query_plan[i]->node_type() == "filter_join" || query_plan[i]->node_type() == "join_eq_hash_qpn"){
179 vector<scalarexp_t *> pse_l;
180 vector<scalarexp_t *> pse_r;
181 if(query_plan[i]->node_type() == "filter_join"){
182 pse_l = ((filter_join_qpn *)query_plan[i])->get_hash_l();
183 pse_r = ((filter_join_qpn *)query_plan[i])->get_hash_r();
185 if(query_plan[i]->node_type() == "join_eq_hash_qpn"){
186 pse_l = ((join_eq_hash_qpn *)query_plan[i])->get_hash_l();
187 pse_r = ((join_eq_hash_qpn *)query_plan[i])->get_hash_r();
190 for(p=0;p<pse_l.size();++p){
192 printf("\t\t%s = ",pse_l[p]->to_string().c_str());
194 printf("\t\tNULL = ");
196 printf("%s\n",pse_r[p]->to_string().c_str());
201 if(query_plan[i]->node_type() == "sgah_qpn" || query_plan[i]->node_type() == "rsgah_qpn" || query_plan[i]->node_type() == "sgahcwcb_qpn"){
202 vector<scalarexp_t *> pseg;
203 if(query_plan[i]->node_type() == "sgah_qpn")
204 pseg = ((sgah_qpn *)query_plan[i])->get_gb_sources();
205 if(query_plan[i]->node_type() == "rsgah_qpn")
206 pseg = ((rsgah_qpn *)query_plan[i])->get_gb_sources();
207 if(query_plan[i]->node_type() == "sgahcwcb_qpn")
208 pseg = ((sgahcwcb_qpn *)query_plan[i])->get_gb_sources();
210 for(g=0;g<pseg.size();g++){
212 printf("\t\tgb %d = %s\n",g,pseg[g]->to_string().c_str());
214 printf("\t\tgb %d = NULL\n",g);
// Rebuild predecessor/successor links between plan nodes by matching each
// node's input-table names against node names, then locate the plan head
// (the unique node with no successors).  Returns true on FAILURE (multiple
// heads, or a node feeding more than one consumer — only trees are allowed).
223 bool stream_query::generate_linkage(){
224 bool create_failed = false;
227 // Clear any leftover linkages
228 for(n=0;n<query_plan.size();++n){
230 query_plan[n]->clear_predecessors();
231 query_plan[n]->clear_successors();
236 // create name-to-index map
237 map<string, int> name_to_node;
238 for(n=0;n<query_plan.size();++n){
240 name_to_node[query_plan[n]->get_node_name()] = n;
244 // Do the 2-way linkage.
245 for(n=0;n<query_plan.size();++n){
247 vector<tablevar_t *> fm = query_plan[n]->get_input_tbls();
248 for(f=0;f<fm.size();++f){
249 string src_tbl = fm[f]->get_schema_name();
250 if(name_to_node.count(src_tbl)>0){
251 int s = name_to_node[src_tbl];
252 query_plan[n]->add_predecessor(s);
253 query_plan[s]->add_successor(n);
259 // Find the head (no successors) and the tails (at least one
260 // predecessor is external).
261 // Verify that there is only one head,
262 // and that all other nodes have one successor (because
263 // right now I can only handle trees).
265 qhead = -1; // no head yet found.
266 for(n=0;n<query_plan.size();++n){
268 vector<int> succ = query_plan[n]->get_successors();
271 fprintf(stderr,"ERROR, query node %s supplies data to multiple nodes, but currently only tree query plans are supported:\n",query_plan[n]->get_node_name().c_str());
272 for(s=0;s<succ.size();++s){
273 fprintf(stderr,"%s ",query_plan[succ[s]]->get_node_name().c_str());
275 fprintf(stderr,"\n");
276 create_failed = true;
279 if(succ.size() == 0){
// A second successor-less node means two heads — not a tree.
281 fprintf(stderr,"ERROR, query nodes %s and %s both have zero successors.\n",query_plan[n]->get_node_name().c_str(), query_plan[qhead]->get_node_name().c_str());
282 create_failed = true;
287 // does the query node read a source, or is it a source?
288 if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size() || query_plan[n]->get_input_tbls().size() == 0){
294 return create_failed;
297 // After the collection of query plan nodes is generated,
298 // analyze their structure to link them up into a tree (or dag?).
299 // Verify that the structure is acceptable.
300 // Do some other analysis and verification tasks (?)
301 // then gather summary information.
// Returns nonzero on failure (1 for node-creation errors, -1 for bad linkage).
302 int stream_query::generate_plan(table_list *Schema){
304 // The first thing to do is verify that the query plan
305 // nodes were successfully created.
306 bool create_failed = false;
308 for(n=0;n<query_plan.size();++n){
309 if(query_plan[n]!=NULL && query_plan[n]->get_error_code()){
310 fprintf(stderr,"%s",query_plan[n]->get_error_str().c_str());
311 create_failed = true;
// Debug dump of each node and the tables it reads.
315 for(n=0;n<query_plan.size();++n){
316 if(query_plan[n] != NULL){
317 string nstr = query_plan[n]->get_node_name();
318 printf("In generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
319 vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
321 for(nn=0;nn<inv.size();nn++){
322 printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
329 if(create_failed) return(1);
331 // Here, link up the query nodes, then verify that the
332 // structure is acceptable (single root, no cycles, no stranded
334 create_failed = generate_linkage();
335 if(create_failed) return -1;
338 // Here, do optimizations such as predicate pushing,
339 // join rearranging, etc.
340 // Nothing to do yet.
// Second debug dump, after linkage.
343 for(n=0;n<query_plan.size();++n){
344 if(query_plan[n] != NULL){
345 string nstr = query_plan[n]->get_node_name();
346 printf("B generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
347 vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
349 for(nn=0;nn<inv.size();nn++){
350 printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
355 printf("qhead=%d, qtail = ",qhead);
357 for(nn=0;nn<qtail.size();++nn)
358 printf("%d ",qtail[nn]);
362 // Collect query summaries. The query is represented by its head node.
363 query_name = query_plan[qhead]->get_node_name();
364 attributes = query_plan[qhead]->get_fields();
365 // TODO: The params and defines require a lot more thought.
366 parameters = query_plan[qhead]->get_param_tbl();
367 defines = query_plan[qhead]->get_definitions();
// Record an output specification (stream/file/zfile) to be materialized
// as an output operator when the HFTA is generated.
372 void stream_query::add_output_operator(ospec_str *o){
373 output_specs.push_back(o);
// Accumulate into libset the external libraries required by every plan
// node and every output operator.  Duplicates are absorbed by the set.
377 void stream_query::get_external_libs(set<string> &libset){
380 for(qn=0;qn<query_plan.size();++qn){
381 if(query_plan[qn] != NULL){
382 vector<string> op_libs = query_plan[qn]->external_libs();
383 for(i=0;i<op_libs.size();++i){
384 libset.insert(op_libs[i]);
389 for(qn=0;qn<output_operators.size();++qn){
390 if(output_operators[qn] != NULL){
391 vector<string> op_libs = output_operators[qn]->external_libs();
392 for(i=0;i<op_libs.size();++i){
393 libset.insert(op_libs[i]);
401 // Split into LFTA, HFTA components.
402 // Split this query into LFTA and HFTA queries.
403 // Four possible outcomes:
404 // 1) the query reads from a protocol, but does not need to
405 // split (can be evaluated as an LFTA).
406 // The lfta query is the only element in the return vector,
407 // and hfta_returned is false.
408 // 2) the query reads from no protocol, and therefore cannot be split.
409 // The hfta query is the only element in the return vector,
410 // and hfta_returned is true.
411 // 3) reads from at least one protocol, but cannot be split : failure.
412 // return vector is empty, the error conditions are written
413 // in err_str and error_code
414 // 4) The query splits into an hfta query and one or more LFTA queries.
415 // the return vector has two or more elements, and hfta_returned
416 // is true. The last element is the HFTA.
418 vector<stream_query *> stream_query::split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
419 vector<stream_query *> queries;
423 hfta_returned = false; // assume until proven otherwise
// Try to split each leaf (tail) node of the plan.
425 for(l=0;l<qtail.size();++l){
429 vector<qp_node *> qnodes = query_plan[leaf]->split_node_for_fta(Ext_fcns, Schema, qp_hfta, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
432 if(qnodes.size() == 0 || query_plan[leaf]->get_error_code()){ // error
434 error_code = query_plan[leaf]->get_error_code();
435 err_str = query_plan[leaf]->get_error_str();
436 vector<stream_query *> null_result;
439 if(qnodes.size() == 1 && qp_hfta){ // nothing happened
440 //printf("no change\n");
441 query_plan[leaf] = qnodes[0];
443 if(qnodes.size() == 1 && !qp_hfta){ // push to lfta
444 //printf("lfta only\n");
// The whole leaf becomes an LFTA query; detach it from its successors.
445 queries.push_back(new stream_query(qnodes[0], this));
446 vector<int> succ = query_plan[leaf]->get_successors();
447 for(s=0;s<succ.size();++s){
448 query_plan[succ[s]]->remove_predecessor(leaf);
450 query_plan[leaf] = NULL; // delete it?
452 if(qnodes.size() > 1){ // actual splitting occurred.
453 if(!qp_hfta){ // internal consistency check.
455 err_str = "INTERNAL ERROR: mulitple nodes returned by split_node_for_fta, but none are hfta nodes.\n";
456 vector<stream_query *> null_result;
460 for(q=0;q<qnodes.size()-qp_hfta;++q){ // process lfta nodes
461 //printf("creating lfta %d (%s)\n",q,qnodes[q]->get_node_name().c_str());
462 queries.push_back(new stream_query(qnodes[q], this));
// The last returned node replaces the leaf as the HFTA-side node.
465 query_plan[leaf] = qnodes[qnodes.size()-1];
466 // Add in any extra hfta nodes
467 for(q=qnodes.size()-qp_hfta;q<qnodes.size()-1;++q)
468 query_plan.push_back(qnodes[q]);
// Any node remaining in the plan means an HFTA is produced; also count
// unresolved interface-parameter references, which are illegal in an HFTA.
474 for(q=0;q<query_plan.size();++q){
475 if(query_plan[q] != NULL){
476 n_ifprefs += query_plan[q]->count_ifp_refs(ifps);
477 hfta_returned = true;
482 set<string>::iterator ssi;
483 err_str += "ERROR, unresolved interface parameters in HFTA:\n";
484 for(ssi=ifps.begin();ssi!=ifps.end();++ssi){
485 err_str += (*ssi)+" ";
489 vector<stream_query *> null_result;
// Relink the surviving plan nodes; this object (the HFTA) goes last.
495 if(generate_linkage()){
496 fprintf(stderr,"INTERNAL ERROR, generate_linkage failed in split_query.\n");
499 queries.push_back(this);
// Extract operator-view subqueries from every leaf node of the plan and
// return them; each leaf delegates to qp_node::extract_opview, passing the
// root output table name and silo suffix for naming.
507 vector<table_exp_t *> stream_query::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string silo_nm){
508 vector<table_exp_t *> subqueries;
511 string root_name = this->get_output_tabledef()->get_tbl_name();
514 for(l=0;l<qtail.size();++l){
516 vector<table_exp_t *> new_qnodes = query_plan[leaf]->extract_opview(Schema, qnodes, opviews, root_name, silo_nm);
518 for(q=0;q<new_qnodes.size();++q){ // process lfta nodes
519 subqueries.push_back( new_qnodes[q]);
// Convenience overload: emit the schema of the plan's head (output) node.
529 string stream_query::make_schema(){
530 return make_schema(qhead);
// Emit the FTA schema text for plan node q: its output fields, query name,
// defines, declared parameters (name + type), and the query text itself.
533 string stream_query::make_schema(int q){
534 string ret="FTA{\n\n";
536 ret += query_plan[q]->get_fields()->to_string();
539 ret += "\tquery_name '"+query_plan[q]->get_node_name()+"';\n";
541 map<string, string> defs = query_plan[q]->get_definitions();
542 map<string, string>::iterator dfi;
543 for(dfi=defs.begin(); dfi!=defs.end(); ++dfi){
544 ret += "\t"+ (*dfi).first + " '" + (*dfi).second + "';\n";
549 param_table *params = query_plan[q]->get_param_tbl();
550 vector<string> param_names = params->get_param_names();
552 for(p=0;p<param_names.size();p++){
553 data_type *dt = params->get_data_type( param_names[p] );
554 ret += "\t" + param_names[p] + " '" + dt->get_type_str() + "';\n";
558 ret += query_plan[q]->to_query_string();
// Gather the "_referenced_ifaces" define from every plan node into one
// comma-separated string.  No de-duplication is performed.
565 string stream_query::collect_refd_ifaces(){
568 for(q=0;q<query_plan.size();++q){
570 map<string, string> defs = query_plan[q]->get_definitions();
571 if(defs.count("_referenced_ifaces")){
572 if(ret != "") ret += ",";
573 ret += defs["_referenced_ifaces"];
// Return false iff any external input table of this query is a protocol
// (packet) source; true means all inputs are streams.
582 bool stream_query::stream_input_only(table_list *Schema){
583 vector<tablevar_t *> input_tbls = this->get_input_tables();
585 for(i=0;i<input_tbls.size();++i){
586 int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
587 if(Schema->get_schema_type(t) == PROTOCOL_SCHEMA) return(false);
592 // Return input tables. No duplicate removal performed.
// An input is "external" if its name does not match any plan node's name;
// only leaf (tail) nodes are inspected, since interior nodes read from
// other plan nodes.
593 vector<tablevar_t *> stream_query::get_input_tables(){
594 vector<tablevar_t *> retval;
596 // create name-to-index map
598 map<string, int> name_to_node;
599 for(n=0;n<query_plan.size();++n){
601 name_to_node[query_plan[n]->get_node_name()] = n;
606 for(l=0;l<qtail.size();++l){
608 vector<tablevar_t *> tmp_v = query_plan[leaf]->get_input_tbls();
610 for(i=0;i<tmp_v.size();++i){
611 if(name_to_node.count(tmp_v[i]->get_schema_name()) == 0)
612 retval.push_back(tmp_v[i]);
// Recursively determine whether plan node q emits tuples in network or host
// byte order (recording the answer in nfmt[q]).  A node's format depends on
// its inputs' formats (computed on demand via recursion through op_idx) and
// on whether the operator itself transforms its tuples to host format.
619 void stream_query::compute_node_format(int q, vector<int> &nfmt, map<string, int> &op_idx){
620 int netcnt = 0, hostcnt = 0;
623 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
624 for(i=0;i<itbls.size();++i){
625 string tname = itbls[i]->get_schema_name();
626 if(op_idx.count(tname)){
627 int o = op_idx[tname];
628 if(nfmt[o] == UNKNOWNFORMAT)
629 compute_node_format(o,nfmt,op_idx);
630 if(nfmt[o] == NETFORMAT) netcnt++;
636 if(query_plan[q]->makes_transform(})){
637 nfmt[q] = HOSTFORMAT;
640 nfmt[q] = HOSTFORMAT;
645 //printf("query plan %d (%s) is ",q,query_plan[q]->get_node_name().c_str());
646 //if(nfmt[q] == HOSTFORMAT) printf(" host format.\n");
647 //else printf("net format\n");
651 string stream_query::generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode){
652 int schref, ov_ix, i, q, param_sz;
653 bool dag_graph = false;
656 // Bind the SEs in all query plan nodes to this schema, and
657 // Add all tables used by this query to the schema.
658 // Question: Will I be polluting the global schema by adding all
659 // query node schemas?
661 // First ensure all nodes are in the schema.
663 for(qn=0;qn<query_plan.size();++qn){
664 if(query_plan[qn] != NULL){
665 Schema->add_table(query_plan[qn]->get_fields());
669 for(qn=0;qn<query_plan.size();++qn){
670 if(query_plan[qn] != NULL){
671 query_plan[qn]->bind_to_schema(Schema);
676 set<string> qsources;
678 for(n=0;n<query_plan.size();++n){
680 vector<tablevar_t *> tmp_v = query_plan[n]->get_input_tbls();
682 for(i=0;i<tmp_v.size();++i){
683 if(qsources.count(tmp_v[i]->get_schema_name()) > 0)
685 qsources.insert(tmp_v[i]->get_schema_name());
692 // Collect set of tables ref'd in this HFTA
694 for(qn=0;qn<query_plan.size();++qn){
696 // get names of the tables
697 vector<tablevar_t *> input_tbls = query_plan[qn]->get_input_tbls();
698 vector<tablevar_t *> output_tbls = query_plan[qn]->get_output_tbls();
699 // Convert to tblrefs, add to set of ref'd tables
701 for(i=0;i<input_tbls.size();i++){
702 // int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
703 int t = input_tbls[i]->get_schema_ref();
705 fprintf(stderr,"INTERNAL ERROR in generate_hfta. "
706 "query plan node %s references input table %s, which is not in schema.\n",
707 query_name.c_str(), input_tbls[i]->get_schema_name().c_str());
713 for(i=0;i<output_tbls.size();i++){
714 int t = Schema->get_table_ref(output_tbls[i]->get_schema_name());
716 fprintf(stderr,"INTERNAL ERROR in generate_hfta."
717 "query plan node %s references output table %s, which is not in schema.\n",
718 query_name.c_str(), output_tbls[i]->get_schema_name().c_str());
726 // Collect map of lftas, query nodes
727 map<string, int> op_idx;
728 for(q=0;q<query_plan.size();q++){
730 op_idx[query_plan[q]->get_node_name()] = q;
734 // map of input tables must include query id and input
735 // source (0,1) becuase several queries might reference the same source
736 vector<tablevar_t *> input_tbls = this->get_input_tables();
737 vector<bool> input_tbl_free;
738 for(i=0;i<input_tbls.size();++i){
739 input_tbl_free.push_back(true);
741 map<string, int> lfta_idx;
742 //fprintf(stderr,"%d input tables, %d query nodes\n",input_tbls.size(), query_plan.size());
743 for(q=0;q<query_plan.size();q++){
745 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
747 for(it=0;it<itbls.size();it++){
748 string tname = itbls[it]->get_schema_name()+"-"+int_to_string(q)+"-"+int_to_string(it);
749 string src_tblname = itbls[it]->get_schema_name();
750 bool src_is_external = false;
751 for(i=0;i<input_tbls.size();++i){
752 if(src_tblname == input_tbls[i]->get_schema_name()){
753 src_is_external = true;
754 if(input_tbl_free[i]){
756 input_tbl_free[i] = false;
757 //fprintf(stderr,"Adding %s (src_tblname=%s, q=%d, it=%d) to %d.\n",tname.c_str(), src_tblname.c_str(), q, it, i);
762 if(i==input_tbls.size() && src_is_external){
763 fprintf(stderr,"INTERNAL ERROR in stream_query::generate_hfta, can't find free entry in input_tbls for query %d, intput %d (%s)\n",q,it,src_tblname.c_str());
770 for(i=0;i<input_tbls.size();++i){
771 string src_tblname = input_tbls[i]->get_schema_name();
772 lfta_idx[src_tblname] = i;
776 // Compute the output formats of the operators.
777 vector<int> node_fmt(query_plan.size(),UNKNOWNFORMAT);
778 compute_node_format(qhead, node_fmt, op_idx);
781 // Generate the schema strings for the outputs.
783 for(i=0;i<query_plan.size();++i){
784 if(i != qhead && query_plan[i]){
785 string schema_tmpstr = this->make_schema(i);
786 schema_str += "gs_csp_t node"+int_to_string(i)+"_schema = "+make_C_embedded_string(schema_tmpstr)+";\n";
790 attributes = query_plan[qhead]->get_fields();
793 string schema_tmpstr = this->make_schema();
794 schema_str += "gs_csp_t "+generate_schema_string_name(query_name)+" = "+make_C_embedded_string(schema_tmpstr)+";\n";
796 // Generate the collection of tuple defs.
798 string tuple_defs = "\n/*\tDefine tuple structures \t*/\n\n";
799 set<int>::iterator si;
800 for(si=tbl_set.begin(); si!=tbl_set.end(); ++si){
801 tuple_defs += generate_host_tuple_struct( Schema->get_table( (*si) ));
802 tuple_defs += "\n\n";
805 // generate the finalize tuple function
806 string finalize_str = generate_hfta_finalize_tuple(attributes);
808 // Analyze and make the output operators
809 bool eat_input = false;
810 string src_op = query_name;
811 string pred_op = src_op;
813 if(output_specs.size()>0)
816 eat_input = false; // must create stream output for successor HFTAs.
817 int n_filestreams = 0;
818 for(i=0;i<output_specs.size();++i){
819 if(output_specs[i]->operator_type == "stream" ){
822 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile" ){
827 int filestream_id = 0;
828 for(i=0;i<output_specs.size();++i){
829 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile"){
831 int n_fstreams = output_specs[i]->n_partitions / n_parallel;
832 if(n_fstreams * n_parallel < output_specs[i]->n_partitions){
834 if(n_parallel == 1 || query_name.find("__copy1") != string::npos){
835 fprintf(stderr,"WARNING, in query %s, %d streams requested for %s output, but it must be a multiple of the hfta parallelism (%d), increasing number of output streams to %d.\n",query_name.c_str(), output_specs[i]->n_partitions, output_specs[i]->operator_type.c_str(), n_parallel, n_fstreams*n_parallel);
838 // output_file_qpn *new_ofq = new output_file_qpn();
839 string filestream_tag = "";
841 filestream_tag = "_fileoutput"+int_to_string(filestream_id);
844 output_file_qpn *new_ofq = new output_file_qpn(pred_op, src_op, filestream_tag, query_plan[qhead]->get_fields(), output_specs[i], (i==last_op ? eat_input : false) );
845 // if(n_fstreams > 1){
848 bool err_ret = new_ofq->set_splitting_params(n_parallel,parallel_idx,n_fstreams,output_specs[i]->partitioning_flds,err_str);
850 fprintf(stderr,"%s",err_str.c_str());
854 output_operators.push_back(new_ofq );
855 pred_op = output_operators.back()->get_node_name();
856 }else if(! (output_specs[i]->operator_type == "stream" || output_specs[i]->operator_type == "Stream" || output_specs[i]->operator_type == "STREAM") ){
857 fprintf(stderr,"WARNING, output operator type %s (on query %s) is not supported, ignoring\n",output_specs[i]->operator_type.c_str(),query_name.c_str() );
863 // Generate functors for the query nodes.
865 string functor_defs = "\n/*\tFunctor definitions\t*/\n\n";
866 for(qn=0;qn<query_plan.size();++qn){
867 if(query_plan[qn]!=NULL){
868 // Compute whether the input needs a ntoh xform.
869 vector<bool> needs_xform;
870 vector<tablevar_t *> itbls = query_plan[qn]->get_input_tbls();
871 for(i=0;i<itbls.size();++i){
872 string tname = itbls[i]->get_schema_name();
873 // if(query_plan[qn]->makes_transform()){
874 if(op_idx.count(tname)>0){
875 if(node_fmt[ op_idx[tname] ] == NETFORMAT){
876 needs_xform.push_back(true);
878 needs_xform.push_back(false);
881 needs_xform.push_back(true);
884 // if(op_idx.count(tname)>0){
885 // if(node_fmt[qn] != node_fmt[ op_idx[tname] ]){
886 // needs_xform.push_back(true);
888 // needs_xform.push_back(false);
891 // if(node_fmt[qn] == HOSTFORMAT){
892 // needs_xform.push_back(true);
894 // needs_xform.push_back(false);
900 functor_defs += query_plan[qn]->generate_functor(Schema, Ext_fcns, needs_xform);
904 // Generate output operator functors
906 vector<bool> needs_xform;
907 for(i=0;i<output_operators.size();++i)
908 functor_defs += output_operators[i]->generate_functor(Schema, Ext_fcns, needs_xform);
912 "#include <lapp.h>\n"
914 "#include <gshub.h>\n"
915 "#include <stdlib.h>\n"
916 "#include <stdio.h>\n"
917 "#include <limits.h>\n"
925 "#include <schemaparser.h>\n"
926 "#include<hfta_runtime_library.h>\n"
928 "#include <host_tuple.h>\n"
929 "#include <hfta.h>\n"
930 "#include <hfta_udaf.h>\n"
931 "#include <hfta_sfun.h>\n"
933 //"#define MAXSCHEMASZ 16384\n"
934 "#include <stdio.h>\n\n"
937 // Get include file for each of the operators.
938 // avoid duplicate inserts.
939 set<string> include_fls;
940 for(qn=0;qn<query_plan.size();++qn){
941 if(query_plan[qn] != NULL)
942 include_fls.insert(query_plan[qn]->get_include_file());
944 for(i=0;i<output_operators.size();++i)
945 include_fls.insert(output_operators[i]->get_include_file());
946 set<string>::iterator ssi;
947 for(ssi=include_fls.begin();ssi!=include_fls.end();++ssi)
950 // Add defines for hash functions
953 "#define hfta_BOOL_to_hash(x) (x)\n"
954 "#define hfta_USHORT_to_hash(x) (x)\n"
955 "#define hfta_UINT_to_hash(x) (x)\n"
956 "#define hfta_IP_to_hash(x) (x)\n"
957 "#define hfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
958 "#define hfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
959 "#define hfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
960 "#define hfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
961 "#define hfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
965 // ret += "#define SERIOUS_LFTA \""+input_tbls[0]->get_schema_name()+"\"\n";
966 ret += "#define OUTPUT_HFTA \""+query_name+"\"\n\n";
968 // HACK ALERT: I know for now that all query plans are
969 // single operator plans, but while SPX and SGAH can use the
970 // UNOP template, the merge operator must use the MULTOP template.
971 // WORSE HACK ALERT : merge does not translate its input,
972 // so don't apply finalize to the output.
973 // TODO: clean this up.
975 // string node_type = query_plan[0]->node_type();
980 // Need to work on the input, output xform logic.
981 // For now, just add it in.
982 // ret += finalize_str;
984 if(node_fmt[qhead] == NETFORMAT){
986 "void finalize_tuple(host_tuple &tup){\n"
996 // Parameter block management
997 // The proper parameter block must be transmitted to each
998 // external stream source.
999 // There is a 1-1 mapping between the param blocks returned
1000 // by this list and the registered data sources ...
1001 // TODO: make this more manageable, but for now
1002 // there is no parameter block manipulation so I just
1003 // need to have the same number.
1006 "int get_lfta_params(gs_int32_t sz, void * value,list<param_block>& lst){\n"
1007 " // for now every lfta receive the full copy of hfta parameters\n"
1008 " struct param_block pb;\n";
1010 set<string> lfta_seen;
1011 for(i=0;i<input_tbls.size();++i){
1012 string src_tblname = input_tbls[i]->get_schema_name();
1013 if(lfta_seen.count(src_tblname) == 0){
1014 lfta_seen.insert(src_tblname);
1015 schref = input_tbls[i]->get_schema_ref();
1016 if(Schema->get_schema_type(schref) == OPERATOR_VIEW_SCHEMA){
1017 ov_ix = input_tbls[i]->get_opview_idx();
1018 opview_entry *opv = opviews.get_entry(ov_ix);
1019 string op_param = "SUBQ:";
1021 for(q=0;q<opv->subq_names.size();++q){
1022 if(q>0) op_param+=",";
1023 op_param+=opv->subq_names[q];
1026 param_sz = op_param.size()-1;
1028 sprintf(tmpstr,"\t\tpb.block_length = %d;\n",param_sz); ret+=tmpstr;
1030 " pb.data = malloc(pb.block_length);\n";
1031 ret+="\t\tmemcpy(pb.data,\""+op_param+"\",pb.block_length);\n"
1032 " lst.push_back(pb);\n\n";
1035 " pb.block_length = sz;\n"
1036 " pb.data = malloc(pb.block_length);\n"
1037 " memcpy(pb.data, value, pb.block_length);\n"
1038 " lst.push_back(pb);\n\n";
1049 "struct FTA* alloc_hfta (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void * value ) {\n"
1051 " // find the lftas\n"
1052 " list<lfta_info*> *lfta_list = new list<lfta_info*>;\n"
1055 " char schemabuf[MAXSCHEMASZ];\n"
1056 " gs_schemahandle_t schema_handle;\n"
1059 // Register the input data sources.
1060 // Register a source only once.
1062 // vector<string> ext_reg_txt;
1063 map<string, int> input_tbl_srcid;
1064 for(i=0;i<input_tbls.size();++i){
1065 string src_tblname = input_tbls[i]->get_schema_name();
1066 // Use UDOP alias when in distributed mode.
1067 // the cluster manager will make the translation
1068 // using infr from qtree.xml
1069 if(distributed_mode && input_tbls[i]->get_udop_alias() != "")
1070 src_tblname = input_tbls[i]->get_udop_alias();
1071 if(input_tbl_srcid.count(src_tblname) == 0){
1072 int srcid = input_tbl_srcid.size();
1073 input_tbl_srcid[src_tblname] = srcid;
1075 "\n // find "+src_tblname+"\n"
1076 " if (fta_find(\""+src_tblname+"\",1,&f,schemabuf,MAXSCHEMASZ)!=0) {\n"
1077 " fprintf(stderr,\"HFTA::error:could not find LFTA \\n\");\n"
1080 " //fprintf(stderr,\"HFTA::FTA found at %u[%u]\\n\",ftamsgid,ftaindex);\n"
1082 " // parse the schema and get the schema handle\n"
1083 " schema_handle = ftaschema_parse_string(schemabuf);\n"
1084 " lfta_info* inf"+int_to_string(srcid)+" = new lfta_info();\n"
1085 " inf"+int_to_string(srcid)+"->f = f;\n"
1086 " inf"+int_to_string(srcid)+"->fta_name = strdup(\""+src_tblname+"\");\n"
1087 " inf"+int_to_string(srcid)+"->schema = strdup(schemabuf);\n"
1088 " inf"+int_to_string(srcid)+"->schema_handle = schema_handle;\n"
1089 " lfta_list->push_back(inf"+int_to_string(srcid)+");\n\n";
1090 // ext_reg_txt.push_back(tmp_s);
1096 ret += "\tgs_schemahandle_t root_schema_handle = ftaschema_parse_string("+generate_schema_string_name(query_name)+");\n";
1097 for(i=0;i<query_plan.size();++i){
1098 if(i != qhead && query_plan[i]){
1099 ret += "\tgs_schemahandle_t op"+int_to_string(i)+"_schema_handle = ftaschema_parse_string(node"+int_to_string(i)+"_schema);\n";
1104 // Create the operators.
1106 for(q=0;q<query_plan.size();q++){
1110 " // create an instance of operator "+int_to_string(q)+" ("+query_plan[q]->get_node_name()+") \n";
1112 // Create parameters for operator construction.
1114 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
1115 string tname = itbls[0]->get_schema_name();
1116 // string li_tname = tname +"-"+int_to_string(q)+"-0";
1117 // if(lfta_idx.count(li_tname)>0)
1118 if(input_tbl_srcid.count(tname)>0){
1119 // ret += ext_reg_txt[lfta_idx[li_tname]];
1120 // op_params += "inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
1121 op_params += "inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
1122 }else if(op_idx.count(tname)>0){
1123 op_params += "op"+int_to_string( op_idx[tname] )+"_schema_handle";
1125 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (3) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1129 string tname = itbls[1]->get_schema_name();
1130 // string li_tname = tname +"-"+int_to_string(q)+"-1";
1131 // if(lfta_idx.count(li_tname)>0)
1132 if(input_tbl_srcid.count(tname)>0){
1133 // ret += ext_reg_txt[lfta_idx[li_tname]];
1134 // op_params += ",inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
1135 op_params += ",inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
1136 }else if(op_idx.count(tname)>0){
1137 op_params += ",op"+int_to_string( op_idx[tname] )+"_schema_handle";
1139 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (4) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1143 ret += query_plan[q]->generate_operator(q,op_params);
1145 " operator_node* node"+int_to_string(q)+" = new operator_node(op"+int_to_string(q)+");\n";
1151 // Next for the output operators if any
1152 for(i=0;i<output_operators.size();++i){
1153 ret += output_operators[i]->generate_operator(n_basic_ops+i,"root_schema_handle");
1155 " operator_node* node"+int_to_string(n_basic_ops+i)+" = new operator_node(op"+int_to_string(n_basic_ops+i)+");\n";
1159 // Link up operators.
1160 for(q=0;q<query_plan.size();++q){
1162 // NOTE: this code assume that the operator has at most
1163 // two inputs. But the template code also makes
1164 // this assumption. Both will need to be changed.
1165 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
1166 string tname = itbls[0]->get_schema_name();
1167 // string li_tname = tname +"-"+int_to_string(q)+"-0";
1168 // if(lfta_idx.count(li_tname)>0)
1169 if(input_tbl_srcid.count(tname)>0){
1170 // ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
1171 ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
1172 }else if(op_idx.count(tname)>0){
1173 ret += "\tnode"+int_to_string(q)+"->set_left_child_node(node"+int_to_string( op_idx[tname] )+");\n";
1175 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (1) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1179 string tname = itbls[1]->get_schema_name();
1180 // string li_tname = tname +"-"+int_to_string(q)+"-1";
1181 // if(lfta_idx.count(li_tname)>0)
1182 if(input_tbl_srcid.count(tname)>0){
1183 // ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
1184 ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
1185 }else if(op_idx.count(tname)>0){
1186 ret += "\tnode"+int_to_string(q)+"->set_right_child_node(node"+int_to_string( op_idx[tname] )+");\n";
1188 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s (%s) when linking operator (2) %d (%s)\n",tname.c_str(), tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1194 for(i=0;i<output_operators.size();++i){
1196 ret += "\tnode"+int_to_string(n_basic_ops)+"->set_left_child_node(node"+int_to_string( qhead )+");\n";
1198 ret += "\tnode"+int_to_string(n_basic_ops+i)+"->set_left_child_node(node"+int_to_string( n_basic_ops+i-1 )+");\n";
1201 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
1203 bool fta_reusable = false;
1204 if (query_plan[qhead]->get_val_of_def("reusable") == "yes" ||
1205 query_plan[qhead]->get_param_tbl()->size() == 0) {
1209 int root_node = qhead;
1210 if(output_operators.size()>0)
1211 root_node = n_basic_ops+i-1;
1216 " MULTOP_HFTA* ftap = new MULTOP_HFTA(ftaid, OUTPUT_HFTA, command, sz, value, root_schema_handle, node"+int_to_string(root_node)+", lfta_list, " + (fta_reusable ?"true":"false") + ", reusable);\n"
1217 " if(ftap->init_failed()){ delete ftap; return 0;}\n"
1218 " return (FTA*)ftap;\n"
1224 string comm_bufsize = "16*1024*1024";
1225 if(defines.count("hfta_comm_buf")>0){
1226 comm_bufsize = defines["hfta_comm_buf"];
1231 "int main(int argc, char * argv[]) {\n"
1234 " /* parse the arguments */\n"
1236 " gs_int32_t tip1,tip2,tip3,tip4;\n"
1237 " endpoint gshub;\n"
1238 " gs_sp_t instance_name;\n"
1240 " gslog(LOG_EMERG,\"Wrong arguments at startup\");\n"
1244 " if ((sscanf(argv[1],\"%u.%u.%u.%u:%hu\",&tip1,&tip2,&tip3,&tip4,&(gshub.port)) != 5)) {\n"
1245 " gslog(LOG_EMERG,\"HUB IP NOT DEFINED\");\n"
1248 " gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\n"
1249 " gshub.port=htons(gshub.port);\n"
1250 " instance_name=strdup(argv[2]);\n"
1251 " if (set_hub(gshub)!=0) {\n"
1252 " gslog(LOG_EMERG,\"Could not set hub\");\n"
1255 " if (set_instance_name(instance_name)!=0) {\n"
1256 " gslog(LOG_EMERG,\"Could not set instance name\");\n"
1261 " /* initialize host library */\n"
1263 //" fprintf(stderr,\"Initializing gscp\\n\");\n"
1264 " gsopenlog(argv[0]);\n"
1266 " if (hostlib_init(HFTA, "+comm_bufsize+", DEFAULTDEV, 0, 0)!=0) {\n"
1267 " fprintf(stderr,\"%s::error:could not initialize gscp\\n\",\n"
1274 " FTAID ret = fta_register(OUTPUT_HFTA, " + (fta_reusable?"1":"0") + ", DEFAULTDEV, alloc_hfta, "+generate_schema_string_name(query_name)+", -1, 0ull);\n"
1275 " fta_start_service(-1);\n"
1281 ////////////////////
1286 // Checks if the node i is compatible with interface partitioning
1287 // (can be pushed below the merge that combines partitioned stream)
1288 // i - index of the query node in a query plan
// Partition-compatibility test for query node `index`: the node can be pushed
// below its merge predecessors only if every predecessor is a mrg_qpn whose
// successors are all identical (single logical parent) and all of those merges
// report the same partition definition.  Finally the operator type itself must
// tolerate partitioned execution.
1289 bool stream_query::is_partn_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1291 qp_node* node = query_plan[index];
1292 qp_node* child_node = NULL;
// a source node has no merge below it to push past
1294 if (node->predecessors.empty())
1297 // all the node predecessors must be partition merges with the same partition definition
1298 partn_def_t* partn_def = NULL;
1299 for (i = 0; i < node->predecessors.size(); ++i) {
1300 child_node = query_plan[node->predecessors[i]];
1301 if (child_node->node_type() != "mrg_qpn")
1304 // merge must have only one parent for this optimization to work
1305 // check that all its successors are the same
1306 for (int j = 1; j < child_node->successors.size(); ++j) {
1307 if (child_node->successors[j] != child_node->successors[0])
// partition definition of this merge; every predecessor merge must agree
1311 partn_def_t* new_partn_def = ((mrg_qpn*)child_node)->get_partn_definition(lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result);
1315 partn_def = new_partn_def;
1316 else if (new_partn_def != partn_def)
// the operator itself must be partitionable, by node type:
1321 if (node->node_type() == "spx_qpn") // spx nodes are always partition compatible
1323 else if (node->node_type() == "sgah_qpn") {
1324 gb_table gb_tbl = ((sgah_qpn*)node)->gb_tbl;
// NOTE(review): the group-by compatibility check is commented out here and
// the function returns true unconditionally for sgah nodes -- confirm this
// is intentional and not a debugging leftover.
1325 return true; //partn_def->is_compatible(&gb_tbl);
1327 else if (node->node_type() == "rsgah_qpn") {
1328 gb_table gb_tbl = ((rsgah_qpn*)node)->gb_tbl;
1329 return partn_def->is_compatible(&gb_tbl);
1331 else if (node->node_type() == "sgahcwcb_qpn") {
1332 gb_table gb_tbl = ((sgahcwcb_qpn*)node)->gb_tbl;
1333 return partn_def->is_compatible(&gb_tbl);
1335 else if (node->node_type() == "join_eq_hash_qpn") {
1342 // Push the operator below the merge that combines the partitioned input streams
// Push the operator at query_plan[index] below the merge(s) that combine the
// partitioned streams it reads.  A per-partition copy of the operator is
// created for each source stream and the merge is re-pointed at those copies,
// becoming the new root in the operator's place.  Joins (two merge children)
// and unary operators are handled separately.  The original node's slot in
// query_plan is set to NULL rather than erased, so other indexes stay valid.
1343 void stream_query::pushdown_partn_operator(int index) {
1344 qp_node* node = query_plan[index];
1346 char node_index[128];
1348 // fprintf(stderr, "Partn pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
1351 // HACK ALERT: When reordering merges we screw up slack computation
1352 // since slack should no longer be used, it is not an issue
1355 // we can safely reorder nodes that have one and only one temporal attribute in select list
1356 table_def* table_layout = node->get_fields();
1357 vector<field_entry*> fields = table_layout->get_fields();
// position of the single temporal output field; -1 means none found yet
1358 int merge_fieldpos = -1;
1361 for (i = 0; i < fields.size(); ++i) {
1362 data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
1363 if(dt.is_temporal()) {
1364 if (merge_fieldpos != -1) // more than one temporal field found
1370 if (merge_fieldpos == -1) // if no temporal field found
1373 std::vector<colref_t *> mvars; // the merge-by columns.
1375 // reordering procedure is different for unary operators and joins
1376 if (node->node_type() == "join_eq_hash_qpn") {
1377 vector<qp_node*> new_nodes;
1379 tablevar_t *left_table_name;
1380 tablevar_t *right_table_name;
1381 mrg_qpn* left_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
1382 mrg_qpn* right_mrg = (mrg_qpn*)query_plan[node->predecessors[1]];
1384 // for now we will only consider plans where both child merges
1385 // merge the same set of streams
1387 if (left_mrg->fm.size() != right_mrg->fm.size())
1390 // mapping of interface names to table definitions
// keyed by machine+interface so left/right streams can be paired up
1391 map<string, tablevar_t*> iface_map;
1392 for (i = 0; i < left_mrg->fm.size(); i++) {
1393 left_table_name = left_mrg->fm[i];
1394 iface_map[left_table_name->get_machine() + left_table_name->get_interface()] = left_table_name;
1397 for (i = 0; i < right_mrg->fm.size(); i++) {
1398 right_table_name = right_mrg->fm[i];
1400 // find corresponding left table
1401 if (!iface_map.count(right_table_name->get_machine() + right_table_name->get_interface()))
1404 left_table_name = iface_map[right_table_name->get_machine() + right_table_name->get_interface()];
1406 // create new join nodes
// one per-partition copy of the join, named <node>_<i>
1407 sprintf(node_index, "_%d", i);
1408 join_eq_hash_qpn* new_node = (join_eq_hash_qpn*)node->make_copy(node_index);
1410 // make a copy of right_table_name
1411 right_table_name = right_table_name->duplicate();
// carry over the original join's range variables onto the new inputs
1412 left_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[0]->get_var_name());
1413 right_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[1]->get_var_name());
1414 new_node->from[0] = left_table_name;
1415 new_node->from[1] = right_table_name;
1416 new_nodes.push_back(new_node);
1419 // make right_mrg a new root
// the merge takes over the pushed-down node's name and output layout
1420 right_mrg->set_node_name(node->get_node_name());
1421 right_mrg->table_layout = table_layout;
1422 right_mrg->merge_fieldpos = merge_fieldpos;
1424 for (i = 0; i < right_mrg->fm.size(); i++) {
1425 // make newly created joins children of merge
1426 sprintf(node_index, "_%d", i);
1427 right_mrg->fm[i] = new tablevar_t(right_mrg->fm[i]->get_machine().c_str(), right_mrg->fm[i]->get_interface().c_str(), (node->get_node_name() + string(node_index)).c_str());
1428 sprintf(node_index, "_m%d", i);
1429 right_mrg->fm[i]->set_range_var(node_index);
// merge-by column must refer to the (single) temporal field of the new layout
1430 right_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
1434 if (left_mrg != right_mrg)
1435 query_plan[node->predecessors[0]] = NULL; // remove left merge from the plan
1437 query_plan.insert(query_plan.end(), new_nodes.begin(), new_nodes.end());
1439 } else { // unary operator
1440 // get the child merge node
1441 mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
// the merge takes over the pushed-down node's name and output layout
1443 child_mrg->set_node_name(node->get_node_name());
1444 child_mrg->table_layout = table_layout;
1445 child_mrg->merge_fieldpos = merge_fieldpos;
1447 // create new nodes for every source stream
1448 for (i = 0; i < child_mrg->fm.size(); i++) {
1449 tablevar_t *table_name = child_mrg->fm[i];
1450 sprintf(node_index, "_%d", i);
1451 qp_node* new_node = node->make_copy(node_index);
// attach the source stream to the per-partition copy; cast by node type
1453 if (node->node_type() == "spx_qpn")
1454 ((spx_qpn*)new_node)->table_name = table_name;
1455 else if (node->node_type() == "sgah_qpn")
1456 ((sgah_qpn*)new_node)->table_name = table_name;
1457 else if (node->node_type() == "rsgah_qpn")
1458 ((rsgah_qpn*)new_node)->table_name = table_name;
1459 else if (node->node_type() == "sgahcwcb_qpn")
1460 ((sgahcwcb_qpn*)new_node)->table_name = table_name;
1461 table_name->set_range_var("_t0");
// re-point the merge input at the new per-partition operator
1463 child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
1464 sprintf(node_index, "_m%d", i);
1465 child_mrg->fm[i]->set_range_var(node_index);
1466 child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
1468 // add new node to query plan
1469 query_plan.push_back(new_node);
// the original operator has been replaced by the merge; NULL its slot
1472 query_plan[index] = NULL;
1477 // Checks if the node i can be pushed below the merge
// Regular (non-partition) pushdown test for query node `index`: the node must
// have exactly one predecessor, that predecessor must be a multihost merge
// whose successors are all identical, and the node must be a selection (spx)
// or an aggregation (sgah) -- aggregations can be split into a low/high pair.
1478 bool stream_query::is_pushdown_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names) {
1481 qp_node* node = query_plan[index];
1482 qp_node* child_node = NULL;
// only unary operators qualify
1484 if (node->predecessors.size() != 1)
1487 // node predecessor must be a merge that combines streams from multiple hosts
1488 child_node = query_plan[node->predecessors[0]];
1489 if (child_node->node_type() != "mrg_qpn")
1492 if (!((mrg_qpn*)child_node)->is_multihost_merge())
1495 // merge must have only one parent for this optimization to work
1496 // check that all its successors are the same
1497 for (int j = 1; j < child_node->successors.size(); ++j) {
1498 if (child_node->successors[j] != child_node->successors[0])
1502 // selections can always be pushed down, aggregations can always be split into selection/aggr or aggr/aggr pair
1504 if (node->node_type() == "spx_qpn")
1506 else if (node->node_type() == "sgah_qpn")
1513 // Push the operator below the merge
// Push the operator at query_plan[index] below its (multihost) child merge.
// Selections are simply cloned per source stream.  Aggregations are first
// split into a low-level and a high-level part (split_node_for_hfta); the
// low-level part is cloned per host group (with a new per-host merge) and the
// high-level aggregation is placed above the original merge.  The original
// node's slot in query_plan is NULLed rather than erased.
1514 void stream_query::pushdown_operator(int index, ext_fcn_list *Ext_fcns, table_list *Schema) {
1515 qp_node* node = query_plan[index];
1517 char node_suffix[128];
1519 // we can only safely push down queries that have one and only one temporal attribute in select list
1520 table_def* table_layout = node->get_fields();
1521 vector<field_entry*> fields = table_layout->get_fields();
// position of the single temporal output field; -1 means none found yet
1522 int merge_fieldpos = -1;
1525 for (i = 0; i < fields.size(); ++i) {
1526 data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
1527 if(dt.is_temporal()) {
1528 if (merge_fieldpos != -1) // more than one temporal field found
1534 if (merge_fieldpos == -1) // if no temporal field found
1537 std::vector<colref_t *> mvars; // the merge-by columns.
1539 fprintf(stderr, "Regular pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
1541 // get the child merge node
1542 mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
1544 tablevar_t *table_name = NULL;
// --- selection: clone the operator once per source stream of the merge ---
1547 if (node->node_type() == "spx_qpn") {
1548 // get the child merge node
1550 // create new nodes for every source stream
1551 for (i = 0; i < child_mrg->fm.size(); i++) {
1552 table_name = child_mrg->fm[i];
1553 sprintf(node_suffix, "_%d", i);
1554 qp_node* new_node = node->make_copy(node_suffix);
1556 ((spx_qpn*)new_node)->table_name = table_name;
1557 table_name->set_range_var("_t0");
// re-point the merge input at the new per-stream selection
1559 child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
1560 sprintf(node_suffix, "_m%d", i);
1561 child_mrg->fm[i]->set_range_var(node_suffix);
1562 child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
1564 // add new node to query plan
1565 query_plan.push_back(new_node);
// the merge inherits the pushed-down node's output layout
1567 child_mrg->table_layout = table_layout;
1568 child_mrg->merge_fieldpos = merge_fieldpos;
1570 } else { // aggregation node
1572 vector<qp_node*> new_nodes;
1574 // split aggregations into high and low-level part
1575 vector<qp_node *> split_nodes = ((sgah_qpn*)node)->split_node_for_hfta(Ext_fcns, Schema);
// expect exactly [low-level, high-level]; anything else aborts the pushdown
1576 if (split_nodes.size() != 2)
1579 sgah_qpn* super_aggr = (sgah_qpn*)split_nodes[1];
1580 super_aggr->table_name = ((sgah_qpn*)node)->table_name;
1582 // group all the sources by host
// host_map: machine name -> indexes into child_mrg->fm for that host
1583 map<string, vector<int> > host_map;
1584 for (i = 0; i < child_mrg->fm.size(); i++) {
1585 tablevar_t *table_name = child_mrg->fm[i];
1586 if (host_map.count(table_name->get_machine()))
1587 host_map[table_name->get_machine()].push_back(i);
1590 tables.push_back(i);
1591 host_map[table_name->get_machine()] = tables;
1595 // create a new merge and low-level aggregation for each host
1596 map<string, vector<int> >::iterator iter;
1597 for (iter = host_map.begin(); iter != host_map.end(); iter++) {
1598 string host_name = (*iter).first;
1599 vector<int> tables = (*iter).second;
1601 sprintf(node_suffix, "_%s", host_name.c_str());
1602 string suffix(node_suffix);
// per-host merge over that host's subset of source streams
1604 mrg_qpn *new_mrg = (mrg_qpn *)child_mrg->make_copy(suffix);
1605 for (i = 0; i < tables.size(); ++i) {
1606 sprintf(node_suffix, "_m%d", i);
1607 new_mrg->fm.push_back(child_mrg->fm[tables[i]]);
1608 new_mrg->mvars.push_back(child_mrg->mvars[i]);
1609 new_mrg->fm[i]->set_range_var(node_suffix);
// per-host copy of the low-level operator, reading from the per-host merge
1611 qp_node* new_node = split_nodes[0]->make_copy(suffix);
1613 if (new_node->node_type() == "spx_qpn") {
1614 ((spx_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
1615 ((spx_qpn*)new_node)->table_name->set_range_var("_t0");
1617 ((sgah_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
1618 ((sgah_qpn*)new_node)->table_name->set_range_var("_t0");
1620 query_plan.push_back(new_mrg);
1621 new_nodes.push_back(new_node);
// the original merge's layout now reflects the low-level operator's output
1624 child_mrg->merge_fieldpos = merge_fieldpos;
1625 if (split_nodes[0]->node_type() == "spx_qpn")
1626 child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((spx_qpn*)split_nodes[0])->select_list);
1628 child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((sgah_qpn*)split_nodes[0])->select_list);
1631 // connect newly created nodes with parent multihost merge
1632 for (i = 0; i < new_nodes.size(); ++i) {
1633 if (new_nodes[i]->node_type() == "spx_qpn")
1634 child_mrg->fm[i] = new tablevar_t(((spx_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
1636 child_mrg->fm[i] = new tablevar_t(((sgah_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
1639 child_mrg->mvars[i]->set_field(child_mrg->table_layout->get_field_name(merge_fieldpos));
1641 sprintf(node_suffix, "_m%d", i);
1642 child_mrg->fm[i]->set_range_var(node_suffix);
1643 query_plan.push_back(new_nodes[i]);
// the merge now has one input per host group instead of one per stream
1645 child_mrg->fm.resize(new_nodes.size());
1646 child_mrg->mvars.resize(new_nodes.size());
1648 // push the new high-level aggregation
1649 query_plan.push_back(super_aggr);
// the original operator has been replaced; NULL its slot in the plan
1652 query_plan[index] = NULL;
1657 // Extract subtree rooted at node i into separate hfta
// Extract the subtree rooted at query_plan[index] into a separate stream_query
// (hfta).  Breadth-first worklist walk: every node moved into the new query is
// NULLed out of this plan's query_plan vector (slots are kept so indexes of
// the remaining nodes stay valid).
1658 stream_query* stream_query::extract_subtree(int index) {
1661 stream_query* new_query = new stream_query(query_plan[index], this);
// worklist of plan indexes still to move; grows as children are discovered
1663 nodes.push_back(index);
1664 for (int i = 0; i < nodes.size(); ++i) {
1665 qp_node* node = query_plan[nodes[i]];
1669 // add all children to nodes list
1670 for (int j = 0; j < node->predecessors.size(); ++j)
1671 nodes.push_back(node->predecessors[j]);
// move the node into the new query and vacate its slot here
1673 new_query->query_plan.push_back(node);
1675 query_plan[nodes[i]] = NULL;
1681 // Splits query that combines data from multiple hosts into separate hftas.
// Split a query that merges data from multiple hosts into separate hftas.
// For every multihost merge: group its sources by host, create a per-host
// merge for each group, re-point the top merge at those per-host merges, and
// then extract each of the top merge's input subtrees into its own
// stream_query (returned to the caller).
1682 vector<stream_query*> stream_query::split_multihost_query() {
1684 vector<stream_query*> ret;
1685 char node_suffix[128];
1688 // find merges combining multiple hosts into per-host groups
// snapshot the size: query_plan grows inside the loop as merges are added
1689 int plan_size = query_plan.size();
1690 vector<mrg_qpn*> new_nodes;
1692 for (i = 0; i < plan_size; ++i) {
1693 qp_node* node = query_plan[i];
1694 if (node && node->node_type() == "mrg_qpn") {
1695 mrg_qpn* mrg = (mrg_qpn*)node;
1696 if (mrg->is_multihost_merge()) {
1698 // group all the sources by host
// host_map: machine name -> indexes into mrg->fm for that host
1699 map<string, vector<int> > host_map;
1700 for (int j = 0; j < mrg->fm.size(); j++) {
1701 tablevar_t *table_name = mrg->fm[j];
1702 if (host_map.count(table_name->get_machine()))
1703 host_map[table_name->get_machine()].push_back(j);
1706 tables.push_back(j);
1707 host_map[table_name->get_machine()] = tables;
1711 // create a new merge for each host
1712 map<string, vector<int> >::iterator iter;
1713 for (iter = host_map.begin(); iter != host_map.end(); iter++) {
1714 string host_name = (*iter).first;
1715 vector<int> tables = (*iter).second;
// a single-stream host needs no extra merge level
1717 if (tables.size() == 1)
1720 sprintf(node_suffix, "_%s", host_name.c_str());
1721 string suffix(node_suffix);
1723 mrg_qpn *new_mrg = (mrg_qpn *)mrg->make_copy(suffix);
1724 for (int j = 0; j < tables.size(); ++j) {
1725 new_mrg->fm.push_back(mrg->fm[tables[j]]);
1726 new_mrg->mvars.push_back(mrg->mvars[j]);
1727 sprintf(node_suffix, "m_%d", j);
1728 new_mrg->fm[j]->set_range_var(node_suffix);
1730 new_nodes.push_back(new_mrg);
1733 if (!new_nodes.empty()) {
1734 // connect newly created merge nodes with parent multihost merge
1735 for (int j = 0; j < new_nodes.size(); ++j) {
1736 mrg->fm[j] = new tablevar_t(new_nodes[j]->fm[0]->get_machine().c_str(), "IFACE", new_nodes[j]->get_node_name().c_str());
1737 query_plan.push_back(new_nodes[j]);
// the top merge now has one input per host group
1739 mrg->fm.resize(new_nodes.size());
1740 mrg->mvars.resize(new_nodes.size());
1745 // now externalize the sources
1746 for (int j = 0; j < node->predecessors.size(); ++j) {
1747 // Extract subtree rooted at node i into separate hfta
1748 stream_query* q = extract_subtree(node->predecessors[j]);
// rebuild predecessor/successor links inside the extracted query
1750 q->generate_linkage();
1764 // Perform local FTA optimizations
// Perform local FTA optimizations on this query plan:
//  1. topologically sort the plan nodes (leaf-first),
//  2. annotate input tables with their machine/interface from lfta_names,
//  3. push eligible operators below merges (partition-aware pushdown first,
//     then regular pushdown),
//  4. split multihost queries into separate hftas (appended to hfta_list),
//  5. split multi-way merges into two-way merges.
1765 void stream_query::optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result){
1767 // Topologically sort the nodes in query plan (leaf-first)
1769 vector<int> sorted_nodes;
1771 int num_nodes = query_plan.size();
// leaf_flags[i] marks nodes already emitted into sorted_nodes
1772 bool* leaf_flags = new bool[num_nodes];
1773 memset(leaf_flags, 0, num_nodes * sizeof(bool));
1775 // run topological sort
1778 // add all leafs to sorted_nodes
1781 for (i = 0; i < num_nodes; ++i) {
1785 if (!leaf_flags[i] && query_plan[i]->predecessors.empty()) {
1786 leaf_flags[i] = true;
1787 sorted_nodes.push_back(i);
1790 // remove the node from its parents predecessor lists
1791 // since we only care about number of predecessors, it is sufficient just to remove
1792 // one element from the parent's predecessors list
1793 for (int j = query_plan[i]->successors.size() - 1; j >= 0; --j)
1794 query_plan[query_plan[i]->successors[j]]->predecessors.pop_back();
1798 delete[] leaf_flags;
1799 num_nodes = sorted_nodes.size();
1800 generate_linkage(); // rebuild the recently destroyed predecessor lists.
1802 // collect the information about interfaces nodes read from
1803 for (i = 0; i < num_nodes; ++i) {
1804 qp_node* node = query_plan[sorted_nodes[i]];
1805 vector<tablevar_t *> input_tables = node->get_input_tbls();
1806 for (j = 0; j < input_tables.size(); ++j) {
1807 tablevar_t * table = input_tables[j];
// lfta inputs get tagged with the machine/interface they are bound to
1808 if (lfta_names.count(table->schema_name)) {
1809 int index = lfta_names[table->schema_name];
1810 table->set_machine(machine_names[index]);
1811 table->set_interface(interface_names[index]);
1818 // push eligible operators down in the query plan
// partition-aware pushdown is preferred; regular pushdown is the fallback
1819 for (i = 0; i < num_nodes; ++i) {
1820 if (partn_parse_result && is_partn_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result)) {
1821 pushdown_partn_operator(sorted_nodes[i]);
1822 } else if (is_pushdown_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names)) {
1823 pushdown_operator(sorted_nodes[i], Ext_fcns, Schema);
1827 // split the query into multiple hftas if it combines the data from multiple hosts
1828 vector<stream_query*> hftas = split_multihost_query();
1829 hfta_list.insert(hfta_list.end(), hftas.begin(), hftas.end());
// re-read the size: pushdowns/splits above may have grown the plan
1832 num_nodes = query_plan.size();
1833 // also split multi-way merges into two-way merges
1834 for (i = 0; i < num_nodes; ++i) {
1835 qp_node* node = query_plan[i];
1836 if (node && node->node_type() == "mrg_qpn") {
1837 vector<mrg_qpn *> split_merge = ((mrg_qpn *)node)->split_sources();
1839 query_plan.insert(query_plan.end(), split_merge.begin(), split_merge.end());
1840 // delete query_plan[sorted_nodes[i]];
// vacate the slot of the merge that was split; indexes stay valid
1841 query_plan[i] = NULL;
// Return the output table definition (field layout) of the query's head node.
1850 table_def *stream_query::get_output_tabledef(){
1851 return( query_plan[qhead]->get_fields() );
// Delegate to the head node: return the table keys, given the partial keys.
1854 vector<string> stream_query::get_tbl_keys(vector<string> &partial_keys){
1855 return query_plan[qhead]->get_tbl_keys(partial_keys);
1860 //////////////////////////////////////////////////////////
1861 //// De-siloing. TO BE REMOVED
// De-siloing: for every tail-node input that is an lfta, replace the single
// lfta reference with a merge over the per-silo variants of that lfta (the
// lfta name minus the silo suffix, combined with each name in silo_names).
// The new merge nodes are appended to the query plan and registered in Schema.
1863 void stream_query::desilo_lftas(map<string, int> &lfta_names,vector<string> &silo_names,table_list *Schema){
// all silo suffixes are assumed to have the same length as the last one
1866 int suffix_len = silo_names.back().size();
1868 for(i=0;i<qtail.size();++i){
1869 vector<tablevar_t *> itbls = query_plan[qtail[i]]->get_input_tbls();
1870 for(t=0;t<itbls.size();++t){
1871 string itbl_name = itbls[t]->get_schema_name();
// only inputs that name a known lfta are de-siloed
1872 if(lfta_names.count(itbl_name)>0){
1873 //printf("Query %s input %d references lfta input %s\n",query_plan[qtail[i]]->get_node_name().c_str(),t,itbl_name.c_str());
1874 vector<string> src_names;
// strip the silo suffix, then rebuild one source name per silo
1875 string lfta_base = itbl_name.substr(0,itbl_name.size()-suffix_len);
1876 for(s=0;s<silo_names.size();++s){
1877 string lfta_subsilo = lfta_base + silo_names[s];
1878 //printf("\t%s\n",lfta_subsilo.c_str());
1879 src_names.push_back(lfta_subsilo);
1881 string merge_node_name = "desilo_"+query_plan[qtail[i]]->get_node_name()+
1882 "_input_"+int_to_string(t);
1883 mrg_qpn *merge_node = new mrg_qpn(merge_node_name,src_names,Schema);
// register the merge's layout in the schema and re-point the input at it
1884 int m_op_pos = Schema->add_table(merge_node->table_layout);
1885 itbls[t]->set_schema(merge_node_name);
1886 itbls[t]->set_schema_ref(m_op_pos);
1887 query_plan.push_back(merge_node);
1894 ////////////////////////////////////////
1899 // Given a collection of LFTA stream queries,
1900 // extract their WHERE predicates
1901 // and pass them to an analysis routine which will extract
// Given a collection of LFTA stream queries, gather each query's WHERE
// predicate (CNF clause list) and pass the collection to find_common_filter,
// which extracts the shared prefilter predicates and their ids.
1904 void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, vector<cnf_set *> &prefilter_preds, set<unsigned int> &pred_ids){
1906 std::vector< std::vector<cnf_elem *> > where_list;
1908 // still safe to assume that LFTA queries have a single
1909 // query node, which is at position 0.
1910 for(s=0;s<lfta_list.size();++s){
1911 vector<cnf_elem *> cnf_list = lfta_list[s]->query_plan[0]->get_filter_clause();
// for aggregation lftas, substitute group-by definitions into the predicates
1912 if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
1913 gb_table *gtbl = ((sgah_qpn *)(lfta_list[s]->query_plan[0]))->get_gb_tbl();
1915 for(c=0;c<cnf_list.size();++c){
1916 insert_gb_def_pr(cnf_list[c]->pr,gtbl);
1919 where_list.push_back(lfta_list[s]->query_plan[0]->get_filter_clause());
1922 find_common_filter(where_list,Schema,Ext_fcns,prefilter_preds, pred_ids);
1929 // Given a collection of LFTA stream queries,
1930 // extract the union of all temporal attributes referenced in select clauses
1931 // those attributes will need to be unpacked in prefilter
1933 void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids){
1935 vector<scalarexp_t *> sl_list;
1936 gb_table *gb_tbl = NULL;
1939 // still safe to assume that LFTA queries have a single
1940 // query node, which is at position 0.
1941 for(s=0;s<lfta_list.size();++s){
1943 if(lfta_list[s]->query_plan[0]->node_type() == "spx_qpn"){
1944 spx_qpn *spx_node = (spx_qpn *)lfta_list[s]->query_plan[0];
1945 sl_list = spx_node->get_select_se_list();
1947 if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
1948 sgah_qpn *sgah_node = (sgah_qpn *)lfta_list[s]->query_plan[0];
1949 sl_list = sgah_node->get_select_se_list();
1950 gb_tbl = sgah_node->get_gb_tbl();
1953 if(lfta_list[s]->query_plan[0]->node_type() == "filter_join"){
1954 filter_join_qpn *fj_node = (filter_join_qpn *)lfta_list[s]->query_plan[0];
1955 sl_list = fj_node->get_select_se_list();
1956 col_id ci; // also get the temporal var in case not in select list
1957 ci.load_from_colref(fj_node->temporal_var);
1958 temp_cids.insert(ci);
1960 if(lfta_list[s]->query_plan[0]->node_type() == "watch_join"){
1961 watch_join_qpn *wj_node = (watch_join_qpn *)lfta_list[s]->query_plan[0];
1962 sl_list = wj_node->get_select_se_list();
1965 for(sl=0;sl<sl_list.size();sl++){
1966 data_type *sdt = sl_list[sl]->get_data_type();
1967 if (sdt->is_temporal()) {
1968 gather_se_col_ids(sl_list[sl],temp_cids, gb_tbl);