/* ------------------------------------------------
Copyright 2014 AT&T Intellectual Property
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
------------------------------------------- */
\r
15 #include"stream_query.h"
\r
16 #include"generate_utils.h"
\r
17 #include"analyze_fta.h"
\r
22 static char tmpstr[500];
\r
24 using namespace std;
\r
27 // Create a query plan from a query node and an existing
\r
28 // query plan. Use for lfta queries, the parent query plan provides the defines.
\r
30 stream_query::stream_query(qp_node *qnode, stream_query *parent){
\r
31 query_plan.push_back(qnode);
\r
34 attributes = qnode->get_fields();
\r
35 parameters = qnode->get_param_tbl();
\r
36 defines = parent->defines;
\r
37 query_name = qnode->get_node_name();
\r
40 // Copy the query plan.
\r
41 stream_query::stream_query(stream_query &src){
\r
42 query_plan = src.query_plan;
\r
45 attributes = src.attributes;
\r
46 parameters = src.parameters;
\r
47 defines = src.defines;
\r
48 query_name = src.query_name;
\r
53 // Create a query plan from an analyzed parse tree.
\r
54 // Perform analyses to find the output node, input nodes, etc.
\r
56 stream_query::stream_query(query_summary_class *qs,table_list *Schema){
\r
57 // Generate the query plan nodes from the analyzed parse tree.
\r
58 // There is only one for now, so just assign the return value
\r
59 // of create_query_nodes to query_plan
\r
61 query_plan = create_query_nodes(qs,Schema);
\r
63 if(query_plan.size() == 0){
\r
64 fprintf(stderr,"INTERNAL ERROR, zero-size query plan in stream_query::stream_query\n");
\r
67 for(i=0;i<query_plan.size();++i){
\r
69 if(query_plan[i]->get_error_code() != 0){
\r
70 error_code = query_plan[i]->get_error_code();
\r
71 err_str += query_plan[i]->get_error_str();
\r
75 qhead = query_plan.size()-1;
\r
81 stream_query * stream_query::add_query(query_summary_class *qs,table_list *Schema){
\r
82 // Add another query block to the query plan
\r
84 vector<qp_node *> new_nodes = create_query_nodes(qs, Schema);
\r
85 query_plan.insert(query_plan.end(),new_nodes.begin(), new_nodes.end());
\r
89 stream_query * stream_query::add_query(stream_query &src){
\r
90 // Add another query block to the query plan
\r
92 query_plan.insert(query_plan.end(),src.query_plan.begin(), src.query_plan.end());
\r
97 void stream_query::generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema){
\r
100 // Mapping fields to protocol fields requires schema binding.
\r
101 // First ensure all nodes are in the schema.
\r
102 for(n=0;n<query_plan.size();++n){
\r
103 if(query_plan[n] != NULL){
\r
104 Schema->add_table(query_plan[n]->get_fields());
\r
107 // Now do schema binding
\r
108 for(n=0;n<query_plan.size();++n){
\r
109 if(query_plan[n] != NULL){
\r
110 query_plan[n]->bind_to_schema(Schema);
\r
114 // create name-to-index map
\r
115 map<string, int> name_to_node;
\r
116 for(n=0;n<query_plan.size();++n){
\r
118 name_to_node[query_plan[n]->get_node_name()] = n;
\r
122 // Create a list of the nodes to process, in order.
\r
123 // Search from the root down.
\r
124 // ASSUME tree plan.
\r
126 list<int> search_q;
\r
127 list<int> work_list;
\r
129 search_q.push_back(qhead);
\r
130 while(! search_q.empty()){
\r
131 int the_q = search_q.front();
\r
132 search_q.pop_front(); work_list.push_front(the_q);
\r
133 vector<int> the_pred = query_plan[the_q]->get_predecessors();
\r
134 for(i=0;i<the_pred.size();i++)
\r
135 search_q.push_back(the_pred[i]);
\r
138 // Scan through the work list, from the front,
\r
139 // gather the qp_node's ref'd, and call the
\r
140 // protocol se generator. Pass NULL for
\r
141 // sources not found - presumably user-defined operator
\r
143 while(! work_list.empty()){
\r
144 int the_q = work_list.front();
\r
145 work_list.pop_front();
\r
146 vector<qp_node *> q_sources;
\r
147 vector<tablevar_t *> q_input_tbls = query_plan[the_q]->get_input_tbls();
\r
148 for(i=0;i<q_input_tbls.size();++i){
\r
149 string itbl_nm = q_input_tbls[i]->get_schema_name();
\r
150 if(name_to_node.count(itbl_nm)>0){
\r
151 q_sources.push_back(query_plan[name_to_node[itbl_nm]]);
\r
152 }else if(sq_map.count(itbl_nm)>0){
\r
153 q_sources.push_back(sq_map[itbl_nm]->get_query_head());
\r
155 q_sources.push_back(NULL);
\r
158 query_plan[the_q]->create_protocol_se(q_sources, Schema);
\r
161 //////////////////////////////////////////////////////////
\r
162 // trust but verify
\r
166 for(i=0;i<query_plan.size();++i){
\r
168 printf("query node %s, type=%s:\n",query_plan[i]->get_node_name().c_str(),
\r
169 query_plan[i]->node_type().c_str());
\r
170 map<std::string, scalarexp_t *> *pse_map = query_plan[i]->get_protocol_se();
\r
171 map<std::string, scalarexp_t *>::iterator mssi;
\r
172 for(mssi=pse_map->begin();mssi!=pse_map->end();++mssi){
\r
174 printf("\t%s : %s\n",(*mssi).first.c_str(), (*mssi).second->to_string().c_str());
\r
176 printf("\t%s : NULL\n",(*mssi).first.c_str());
\r
178 if(query_plan[i]->node_type() == "filter_join" || query_plan[i]->node_type() == "join_eq_hash_qpn"){
\r
179 vector<scalarexp_t *> pse_l;
\r
180 vector<scalarexp_t *> pse_r;
\r
181 if(query_plan[i]->node_type() == "filter_join"){
\r
182 pse_l = ((filter_join_qpn *)query_plan[i])->get_hash_l();
\r
183 pse_r = ((filter_join_qpn *)query_plan[i])->get_hash_r();
\r
185 if(query_plan[i]->node_type() == "join_eq_hash_qpn"){
\r
186 pse_l = ((join_eq_hash_qpn *)query_plan[i])->get_hash_l();
\r
187 pse_r = ((join_eq_hash_qpn *)query_plan[i])->get_hash_r();
\r
190 for(p=0;p<pse_l.size();++p){
\r
191 if(pse_l[p] != NULL)
\r
192 printf("\t\t%s = ",pse_l[p]->to_string().c_str());
\r
194 printf("\t\tNULL = ");
\r
195 if(pse_r[p] != NULL)
\r
196 printf("%s\n",pse_r[p]->to_string().c_str());
\r
201 if(query_plan[i]->node_type() == "sgah_qpn" || query_plan[i]->node_type() == "rsgah_qpn" || query_plan[i]->node_type() == "sgahcwcb_qpn"){
\r
202 vector<scalarexp_t *> pseg;
\r
203 if(query_plan[i]->node_type() == "sgah_qpn")
\r
204 pseg = ((sgah_qpn *)query_plan[i])->get_gb_sources();
\r
205 if(query_plan[i]->node_type() == "rsgah_qpn")
\r
206 pseg = ((rsgah_qpn *)query_plan[i])->get_gb_sources();
\r
207 if(query_plan[i]->node_type() == "sgahcwcb_qpn")
\r
208 pseg = ((sgahcwcb_qpn *)query_plan[i])->get_gb_sources();
\r
210 for(g=0;g<pseg.size();g++){
\r
211 if(pseg[g] != NULL)
\r
212 printf("\t\tgb %d = %s\n",g,pseg[g]->to_string().c_str());
\r
214 printf("\t\tgb %d = NULL\n",g);
\r
223 bool stream_query::generate_linkage(){
\r
224 bool create_failed = false;
\r
227 // Clear any leftover linkages
\r
228 for(n=0;n<query_plan.size();++n){
\r
230 query_plan[n]->clear_predecessors();
\r
231 query_plan[n]->clear_successors();
\r
236 // create name-to-index map
\r
237 map<string, int> name_to_node;
\r
238 for(n=0;n<query_plan.size();++n){
\r
240 name_to_node[query_plan[n]->get_node_name()] = n;
\r
244 // Do the 2-way linkage.
\r
245 for(n=0;n<query_plan.size();++n){
\r
247 vector<tablevar_t *> fm = query_plan[n]->get_input_tbls();
\r
248 for(f=0;f<fm.size();++f){
\r
249 string src_tbl = fm[f]->get_schema_name();
\r
250 if(name_to_node.count(src_tbl)>0){
\r
251 int s = name_to_node[src_tbl];
\r
252 query_plan[n]->add_predecessor(s);
\r
253 query_plan[s]->add_successor(n);
\r
259 // Find the head (no successors) and the tails (at least one
\r
260 // predecessor is external).
\r
261 // Verify that there is only one head,
\r
262 // and that all other nodes have one successor (because
\r
263 // right now I can only handle trees).
\r
265 qhead = -1; // no head yet found.
\r
266 for(n=0;n<query_plan.size();++n){
\r
268 vector<int> succ = query_plan[n]->get_successors();
\r
270 if(succ.size() > 1){
\r
271 fprintf(stderr,"ERROR, query node %s supplies data to multiple nodes, but currently only tree query plans are supported:\n",query_plan[n]->get_node_name().c_str());
\r
272 for(s=0;s<succ.size();++s){
\r
273 fprintf(stderr,"%s ",query_plan[succ[s]]->get_node_name().c_str());
\r
275 fprintf(stderr,"\n");
\r
276 create_failed = true;
\r
279 if(succ.size() == 0){
\r
281 fprintf(stderr,"ERROR, query nodes %s and %s both have zero successors.\n",query_plan[n]->get_node_name().c_str(), query_plan[qhead]->get_node_name().c_str());
\r
282 create_failed = true;
\r
287 if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size()){
\r
288 qtail.push_back(n);
\r
293 return create_failed;
\r
296 // After the collection of query plan nodes is generated,
\r
297 // analyze their structure to link them up into a tree (or dag?).
\r
298 // Verify that the structure is acceptable.
\r
299 // Do some other analysis and verification tasks (?)
\r
300 // then gather summary information.
\r
301 int stream_query::generate_plan(table_list *Schema){
\r
303 // The first thing to do is verify that the query plan
\r
304 // nodes were successfully created.
\r
305 bool create_failed = false;
\r
307 for(n=0;n<query_plan.size();++n){
\r
308 if(query_plan[n]!=NULL && query_plan[n]->get_error_code()){
\r
309 fprintf(stderr,"%s",query_plan[n]->get_error_str().c_str());
\r
310 create_failed = true;
\r
314 for(n=0;n<query_plan.size();++n){
\r
315 if(query_plan[n] != NULL){
\r
316 string nstr = query_plan[n]->get_node_name();
\r
317 printf("In generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
\r
318 vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
\r
320 for(nn=0;nn<inv.size();nn++){
\r
321 printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
\r
328 if(create_failed) return(1);
\r
330 // Here, link up the query nodes, then verify that the
\r
331 // structure is acceptable (single root, no cycles, no stranded
\r
333 create_failed = generate_linkage();
\r
334 if(create_failed) return -1;
\r
337 // Here, do optimizations such as predicate pushing,
\r
338 // join rearranging, etc.
\r
339 // Nothing to do yet.
\r
342 for(n=0;n<query_plan.size();++n){
\r
343 if(query_plan[n] != NULL){
\r
344 string nstr = query_plan[n]->get_node_name();
\r
345 printf("B generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
\r
346 vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
\r
348 for(nn=0;nn<inv.size();nn++){
\r
349 printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
\r
354 printf("qhead=%d, qtail = ",qhead);
\r
356 for(nn=0;nn<qtail.size();++nn)
\r
357 printf("%d ",qtail[nn]);
\r
361 // Collect query summaries. The query is represented by its head node.
\r
362 query_name = query_plan[qhead]->get_node_name();
\r
363 attributes = query_plan[qhead]->get_fields();
\r
364 // TODO: The params and defines require a lot more thought.
\r
365 parameters = query_plan[qhead]->get_param_tbl();
\r
366 defines = query_plan[qhead]->get_definitions();
\r
371 void stream_query::add_output_operator(ospec_str *o){
\r
372 output_specs.push_back(o);
\r
376 void stream_query::get_external_libs(set<string> &libset){
\r
379 for(qn=0;qn<query_plan.size();++qn){
\r
380 if(query_plan[qn] != NULL){
\r
381 vector<string> op_libs = query_plan[qn]->external_libs();
\r
382 for(i=0;i<op_libs.size();++i){
\r
383 libset.insert(op_libs[i]);
\r
388 for(qn=0;qn<output_operators.size();++qn){
\r
389 if(output_operators[qn] != NULL){
\r
390 vector<string> op_libs = output_operators[qn]->external_libs();
\r
391 for(i=0;i<op_libs.size();++i){
\r
392 libset.insert(op_libs[i]);
\r
400 // Split into LFTA, HFTA components.
\r
401 // Split this query into LFTA and HFTA queries.
\r
402 // Four possible outcomes:
\r
403 // 1) the query reads from a protocol, but does not need to
\r
404 // split (can be evaluated as an LFTA).
\r
405 // The lfta query is the only element in the return vector,
\r
406 // and hfta_returned is false.
\r
407 // 2) the query reads from no protocol, and therefore cannot be split.
\r
408 // The hfta query is the only element in the return vector,
\r
409 // and hfta_returned is true.
\r
410 // 3) reads from at least one protocol, but cannot be split : failure.
\r
411 // return vector is empty, the error conditions are written
\r
412 // in err_str and error_code
\r
413 // 4) The query splits into an hfta query and one or more LFTA queries.
\r
414 // the return vector has two or more elements, and hfta_returned
\r
415 // is true. The last element is the HFTA.
\r
417 vector<stream_query *> stream_query::split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
\r
418 vector<stream_query *> queries;
\r
422 hfta_returned = false; // assume until proven otherwise
\r
424 for(l=0;l<qtail.size();++l){
\r
425 int leaf = qtail[l];
\r
428 vector<qp_node *> qnodes = query_plan[leaf]->split_node_for_fta(Ext_fcns, Schema, qp_hfta, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);
\r
431 if(qnodes.size() == 0 || query_plan[leaf]->get_error_code()){ // error
\r
432 //printf("error\n");
\r
433 error_code = query_plan[leaf]->get_error_code();
\r
434 err_str = query_plan[leaf]->get_error_str();
\r
435 vector<stream_query *> null_result;
\r
436 return(null_result);
\r
438 if(qnodes.size() == 1 && qp_hfta){ // nothing happened
\r
439 //printf("no change\n");
\r
440 query_plan[leaf] = qnodes[0];
\r
442 if(qnodes.size() == 1 && !qp_hfta){ // push to lfta
\r
443 //printf("lfta only\n");
\r
444 queries.push_back(new stream_query(qnodes[0], this));
\r
445 vector<int> succ = query_plan[leaf]->get_successors();
\r
446 for(s=0;s<succ.size();++s){
\r
447 query_plan[succ[s]]->remove_predecessor(leaf);
\r
449 query_plan[leaf] = NULL; // delete it?
\r
451 if(qnodes.size() > 1){ // actual splitting occurred.
\r
452 if(!qp_hfta){ // internal consistency check.
\r
454 err_str = "INTERNAL ERROR: mulitple nodes returned by split_node_for_fta, but none are hfta nodes.\n";
\r
455 vector<stream_query *> null_result;
\r
456 return(null_result);
\r
459 for(q=0;q<qnodes.size()-qp_hfta;++q){ // process lfta nodes
\r
460 //printf("creating lfta %d (%s)\n",q,qnodes[q]->get_node_name().c_str());
\r
461 queries.push_back(new stream_query(qnodes[q], this));
\r
463 // Use new hfta node
\r
464 query_plan[leaf] = qnodes[qnodes.size()-1];
\r
465 // Add in any extra hfta nodes
\r
466 for(q=qnodes.size()-qp_hfta;q<qnodes.size()-1;++q)
\r
467 query_plan.push_back(qnodes[q]);
\r
473 for(q=0;q<query_plan.size();++q){
\r
474 if(query_plan[q] != NULL){
\r
475 n_ifprefs += query_plan[q]->count_ifp_refs(ifps);
\r
476 hfta_returned = true;
\r
481 set<string>::iterator ssi;
\r
482 err_str += "ERROR, unresolved interface parameters in HFTA:\n";
\r
483 for(ssi=ifps.begin();ssi!=ifps.end();++ssi){
\r
484 err_str += (*ssi)+" ";
\r
488 vector<stream_query *> null_result;
\r
489 return(null_result);
\r
494 if(generate_linkage()){
\r
495 fprintf(stderr,"INTERNAL ERROR, generate_linkage failed in split_query.\n");
\r
498 queries.push_back(this);
\r
506 vector<table_exp_t *> stream_query::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string silo_nm){
\r
507 vector<table_exp_t *> subqueries;
\r
510 string root_name = this->get_output_tabledef()->get_tbl_name();
\r
513 for(l=0;l<qtail.size();++l){
\r
514 int leaf = qtail[l];
\r
515 vector<table_exp_t *> new_qnodes = query_plan[leaf]->extract_opview(Schema, qnodes, opviews, root_name, silo_nm);
\r
517 for(q=0;q<new_qnodes.size();++q){ // process lfta nodes
\r
518 subqueries.push_back( new_qnodes[q]);
\r
522 return(subqueries);
\r
528 string stream_query::make_schema(){
\r
529 return make_schema(qhead);
\r
532 string stream_query::make_schema(int q){
\r
533 string ret="FTA{\n\n";
\r
535 ret += query_plan[q]->get_fields()->to_string();
\r
537 ret += "DEFINE{\n";
\r
538 ret += "\tquery_name '"+query_plan[q]->get_node_name()+"';\n";
\r
540 map<string, string> defs = query_plan[q]->get_definitions();
\r
541 map<string, string>::iterator dfi;
\r
542 for(dfi=defs.begin(); dfi!=defs.end(); ++dfi){
\r
543 ret += "\t"+ (*dfi).first + " '" + (*dfi).second + "';\n";
\r
548 param_table *params = query_plan[q]->get_param_tbl();
\r
549 vector<string> param_names = params->get_param_names();
\r
551 for(p=0;p<param_names.size();p++){
\r
552 data_type *dt = params->get_data_type( param_names[p] );
\r
553 ret += "\t" + param_names[p] + " '" + dt->get_type_str() + "';\n";
\r
557 ret += query_plan[q]->to_query_string();
\r
564 string stream_query::collect_refd_ifaces(){
\r
567 for(q=0;q<query_plan.size();++q){
\r
569 map<string, string> defs = query_plan[q]->get_definitions();
\r
570 if(defs.count("_referenced_ifaces")){
\r
571 if(ret != "") ret += ",";
\r
572 ret += defs["_referenced_ifaces"];
\r
581 bool stream_query::stream_input_only(table_list *Schema){
\r
582 vector<tablevar_t *> input_tbls = this->get_input_tables();
\r
584 for(i=0;i<input_tbls.size();++i){
\r
585 int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
\r
586 if(Schema->get_schema_type(t) == PROTOCOL_SCHEMA) return(false);
\r
591 // Return input tables. No duplicate removal performed.
\r
592 vector<tablevar_t *> stream_query::get_input_tables(){
\r
593 vector<tablevar_t *> retval;
\r
595 // create name-to-index map
\r
597 map<string, int> name_to_node;
\r
598 for(n=0;n<query_plan.size();++n){
\r
600 name_to_node[query_plan[n]->get_node_name()] = n;
\r
605 for(l=0;l<qtail.size();++l){
\r
606 int leaf = qtail[l];
\r
607 vector<tablevar_t *> tmp_v = query_plan[leaf]->get_input_tbls();
\r
609 for(i=0;i<tmp_v.size();++i){
\r
610 if(name_to_node.count(tmp_v[i]->get_schema_name()) == 0)
\r
611 retval.push_back(tmp_v[i]);
\r
618 void stream_query::compute_node_format(int q, vector<int> &nfmt, map<string, int> &op_idx){
\r
619 int netcnt = 0, hostcnt = 0;
\r
622 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
\r
623 for(i=0;i<itbls.size();++i){
\r
624 string tname = itbls[i]->get_schema_name();
\r
625 if(op_idx.count(tname)){
\r
626 int o = op_idx[tname];
\r
627 if(nfmt[o] == UNKNOWNFORMAT)
\r
628 compute_node_format(o,nfmt,op_idx);
\r
629 if(nfmt[o] == NETFORMAT) netcnt++;
\r
635 if(query_plan[q]->makes_transform()){
\r
636 nfmt[q] = HOSTFORMAT;
\r
639 nfmt[q] = HOSTFORMAT;
\r
641 nfmt[q] = NETFORMAT;
\r
644 //printf("query plan %d (%s) is ",q,query_plan[q]->get_node_name().c_str());
\r
645 //if(nfmt[q] == HOSTFORMAT) printf(" host format.\n");
\r
646 //else printf("net format\n");
\r
650 string stream_query::generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode){
\r
651 int schref, ov_ix, i, q, param_sz;
\r
652 bool dag_graph = false;
\r
655 // Bind the SEs in all query plan nodes to this schema, and
\r
656 // Add all tables used by this query to the schema.
\r
657 // Question: Will I be polluting the global schema by adding all
\r
658 // query node schemas?
\r
660 // First ensure all nodes are in the schema.
\r
662 for(qn=0;qn<query_plan.size();++qn){
\r
663 if(query_plan[qn] != NULL){
\r
664 Schema->add_table(query_plan[qn]->get_fields());
\r
668 for(qn=0;qn<query_plan.size();++qn){
\r
669 if(query_plan[qn] != NULL){
\r
670 query_plan[qn]->bind_to_schema(Schema);
\r
674 // Is it a DAG plan?
\r
675 set<string> qsources;
\r
677 for(n=0;n<query_plan.size();++n){
\r
679 vector<tablevar_t *> tmp_v = query_plan[n]->get_input_tbls();
\r
681 for(i=0;i<tmp_v.size();++i){
\r
682 if(qsources.count(tmp_v[i]->get_schema_name()) > 0)
\r
684 qsources.insert(tmp_v[i]->get_schema_name());
\r
691 // Collect set of tables ref'd in this HFTA
\r
693 for(qn=0;qn<query_plan.size();++qn){
\r
694 if(query_plan[qn]){
\r
695 // get names of the tables
\r
696 vector<tablevar_t *> input_tbls = query_plan[qn]->get_input_tbls();
\r
697 vector<tablevar_t *> output_tbls = query_plan[qn]->get_output_tbls();
\r
698 // Convert to tblrefs, add to set of ref'd tables
\r
700 for(i=0;i<input_tbls.size();i++){
\r
701 // int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
\r
702 int t = input_tbls[i]->get_schema_ref();
\r
704 fprintf(stderr,"INTERNAL ERROR in generate_hfta. "
\r
705 "query plan node %s references input table %s, which is not in schema.\n",
\r
706 query_name.c_str(), input_tbls[i]->get_schema_name().c_str());
\r
712 for(i=0;i<output_tbls.size();i++){
\r
713 int t = Schema->get_table_ref(output_tbls[i]->get_schema_name());
\r
715 fprintf(stderr,"INTERNAL ERROR in generate_hfta."
\r
716 "query plan node %s references output table %s, which is not in schema.\n",
\r
717 query_name.c_str(), output_tbls[i]->get_schema_name().c_str());
\r
725 // Collect map of lftas, query nodes
\r
726 map<string, int> op_idx;
\r
727 for(q=0;q<query_plan.size();q++){
\r
729 op_idx[query_plan[q]->get_node_name()] = q;
\r
733 // map of input tables must include query id and input
\r
734 // source (0,1) because several queries might reference the same source
\r
735 vector<tablevar_t *> input_tbls = this->get_input_tables();
\r
736 vector<bool> input_tbl_free;
\r
737 for(i=0;i<input_tbls.size();++i){
\r
738 input_tbl_free.push_back(true);
\r
740 map<string, int> lfta_idx;
\r
741 //fprintf(stderr,"%d input tables, %d query nodes\n",input_tbls.size(), query_plan.size());
\r
742 for(q=0;q<query_plan.size();q++){
\r
744 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
\r
746 for(it=0;it<itbls.size();it++){
\r
747 string tname = itbls[it]->get_schema_name()+"-"+int_to_string(q)+"-"+int_to_string(it);
\r
748 string src_tblname = itbls[it]->get_schema_name();
\r
749 bool src_is_external = false;
\r
750 for(i=0;i<input_tbls.size();++i){
\r
751 if(src_tblname == input_tbls[i]->get_schema_name()){
\r
752 src_is_external = true;
\r
753 if(input_tbl_free[i]){
\r
754 lfta_idx[tname] = i;
\r
755 input_tbl_free[i] = false;
\r
756 //fprintf(stderr,"Adding %s (src_tblname=%s, q=%d, it=%d) to %d.\n",tname.c_str(), src_tblname.c_str(), q, it, i);
\r
761 if(i==input_tbls.size() && src_is_external){
\r
762 fprintf(stderr,"INTERNAL ERROR in stream_query::generate_hfta, can't find free entry in input_tbls for query %d, intput %d (%s)\n",q,it,src_tblname.c_str());
\r
769 for(i=0;i<input_tbls.size();++i){
\r
770 string src_tblname = input_tbls[i]->get_schema_name();
\r
771 lfta_idx[src_tblname] = i;
\r
775 // Compute the output formats of the operators.
\r
776 vector<int> node_fmt(query_plan.size(),UNKNOWNFORMAT);
\r
777 compute_node_format(qhead, node_fmt, op_idx);
\r
780 // Generate the schema strings for the outputs.
\r
782 for(i=0;i<query_plan.size();++i){
\r
783 if(i != qhead && query_plan[i]){
\r
784 string schema_tmpstr = this->make_schema(i);
\r
785 schema_str += "gs_csp_t node"+int_to_string(i)+"_schema = "+make_C_embedded_string(schema_tmpstr)+";\n";
\r
789 attributes = query_plan[qhead]->get_fields();
\r
792 string schema_tmpstr = this->make_schema();
\r
793 schema_str += "gs_csp_t "+generate_schema_string_name(query_name)+" = "+make_C_embedded_string(schema_tmpstr)+";\n";
\r
795 // Generate the collection of tuple defs.
\r
797 string tuple_defs = "\n/*\tDefine tuple structures \t*/\n\n";
\r
798 set<int>::iterator si;
\r
799 for(si=tbl_set.begin(); si!=tbl_set.end(); ++si){
\r
800 tuple_defs += generate_host_tuple_struct( Schema->get_table( (*si) ));
\r
801 tuple_defs += "\n\n";
\r
804 // generate the finalize tuple function
\r
805 string finalize_str = generate_hfta_finalize_tuple(attributes);
\r
807 // Analyze and make the output operators
\r
808 bool eat_input = false;
\r
809 string src_op = query_name;
\r
810 string pred_op = src_op;
\r
812 if(output_specs.size()>0)
\r
815 eat_input = false; // must create stream output for successor HFTAs.
\r
816 int n_filestreams = 0;
\r
817 for(i=0;i<output_specs.size();++i){
\r
818 if(output_specs[i]->operator_type == "stream" ){
\r
821 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile" ){
\r
826 int filestream_id = 0;
\r
827 for(i=0;i<output_specs.size();++i){
\r
828 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile"){
\r
830 int n_fstreams = output_specs[i]->n_partitions / n_parallel;
\r
831 if(n_fstreams * n_parallel < output_specs[i]->n_partitions){
\r
833 if(n_parallel == 1 || query_name.find("__copy1") != string::npos){
\r
834 fprintf(stderr,"WARNING, in query %s, %d streams requested for %s output, but it must be a multiple of the hfta parallelism (%d), increasing number of output streams to %d.\n",query_name.c_str(), output_specs[i]->n_partitions, output_specs[i]->operator_type.c_str(), n_parallel, n_fstreams*n_parallel);
\r
837 // output_file_qpn *new_ofq = new output_file_qpn();
\r
838 string filestream_tag = "";
\r
839 if(n_filestreams>1){
\r
840 filestream_tag = "_fileoutput"+int_to_string(filestream_id);
\r
843 output_file_qpn *new_ofq = new output_file_qpn(pred_op, src_op, filestream_tag, query_plan[qhead]->get_fields(), output_specs[i], (i==last_op ? eat_input : false) );
\r
844 // if(n_fstreams > 1){
\r
845 if(n_fstreams > 0){
\r
847 bool err_ret = new_ofq->set_splitting_params(n_parallel,parallel_idx,n_fstreams,output_specs[i]->partitioning_flds,err_str);
\r
849 fprintf(stderr,"%s",err_str.c_str());
\r
853 output_operators.push_back(new_ofq );
\r
854 pred_op = output_operators.back()->get_node_name();
\r
855 }else if(! (output_specs[i]->operator_type == "stream" || output_specs[i]->operator_type == "Stream" || output_specs[i]->operator_type == "STREAM") ){
\r
856 fprintf(stderr,"WARNING, output operator type %s (on query %s) is not supported, ignoring\n",output_specs[i]->operator_type.c_str(),query_name.c_str() );
\r
862 // Generate functors for the query nodes.
\r
864 string functor_defs = "\n/*\tFunctor definitions\t*/\n\n";
\r
865 for(qn=0;qn<query_plan.size();++qn){
\r
866 if(query_plan[qn]!=NULL){
\r
867 // Compute whether the input needs a ntoh xform.
\r
868 vector<bool> needs_xform;
\r
869 vector<tablevar_t *> itbls = query_plan[qn]->get_input_tbls();
\r
870 for(i=0;i<itbls.size();++i){
\r
871 string tname = itbls[i]->get_schema_name();
\r
872 // if(query_plan[qn]->makes_transform()){
\r
873 if(op_idx.count(tname)>0){
\r
874 if(node_fmt[ op_idx[tname] ] == NETFORMAT){
\r
875 needs_xform.push_back(true);
\r
877 needs_xform.push_back(false);
\r
880 needs_xform.push_back(true);
\r
883 // if(op_idx.count(tname)>0){
\r
884 // if(node_fmt[qn] != node_fmt[ op_idx[tname] ]){
\r
885 // needs_xform.push_back(true);
\r
887 // needs_xform.push_back(false);
\r
890 // if(node_fmt[qn] == HOSTFORMAT){
\r
891 // needs_xform.push_back(true);
\r
893 // needs_xform.push_back(false);
\r
899 functor_defs += query_plan[qn]->generate_functor(Schema, Ext_fcns, needs_xform);
\r
903 // Generate output operator functors
\r
905 vector<bool> needs_xform;
\r
906 for(i=0;i<output_operators.size();++i)
\r
907 functor_defs += output_operators[i]->generate_functor(Schema, Ext_fcns, needs_xform);
\r
911 "#include <lapp.h>\n"
\r
912 "#include <fta.h>\n"
\r
913 "#include <gshub.h>\n"
\r
914 "#include <stdlib.h>\n"
\r
915 "#include <stdio.h>\n"
\r
916 "#include <limits.h>\n"
\r
921 "#define PLAN_DAG\n"
\r
924 "#include <schemaparser.h>\n"
\r
925 "#include<hfta_runtime_library.h>\n"
\r
927 "#include <host_tuple.h>\n"
\r
928 "#include <hfta.h>\n"
\r
929 "#include <hfta_udaf.h>\n"
\r
930 "#include <hfta_sfun.h>\n"
\r
932 //"#define MAXSCHEMASZ 16384\n"
\r
933 "#include <stdio.h>\n\n"
\r
936 // Get include file for each of the operators.
\r
937 // avoid duplicate inserts.
\r
938 set<string> include_fls;
\r
939 for(qn=0;qn<query_plan.size();++qn){
\r
940 if(query_plan[qn] != NULL)
\r
941 include_fls.insert(query_plan[qn]->get_include_file());
\r
943 for(i=0;i<output_operators.size();++i)
\r
944 include_fls.insert(output_operators[i]->get_include_file());
\r
945 set<string>::iterator ssi;
\r
946 for(ssi=include_fls.begin();ssi!=include_fls.end();++ssi)
\r
949 // Add defines for hash functions
\r
952 "#define hfta_BOOL_to_hash(x) (x)\n"
\r
953 "#define hfta_USHORT_to_hash(x) (x)\n"
\r
954 "#define hfta_UINT_to_hash(x) (x)\n"
\r
955 "#define hfta_IP_to_hash(x) (x)\n"
\r
956 "#define hfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
\r
957 "#define hfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
\r
958 "#define hfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
\r
959 "#define hfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
\r
960 "#define hfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
\r
964 // ret += "#define SERIOUS_LFTA \""+input_tbls[0]->get_schema_name()+"\"\n";
\r
965 ret += "#define OUTPUT_HFTA \""+query_name+"\"\n\n";
\r
967 // HACK ALERT: I know for now that all query plans are
\r
968 // single operator plans, but while SPX and SGAH can use the
\r
969 // UNOP template, the merge operator must use the MULTOP template.
\r
970 // WORSE HACK ALERT : merge does not translate its input,
\r
971 // so don't apply finalize to the output.
\r
972 // TODO: clean this up.
\r
974 // string node_type = query_plan[0]->node_type();
\r
979 // Need to work on the input, output xform logic.
\r
980 // For now, just add it in.
\r
981 // ret += finalize_str;
\r
983 if(node_fmt[qhead] == NETFORMAT){
\r
985 "void finalize_tuple(host_tuple &tup){\n"
\r
990 ret += finalize_str;
\r
993 ret += functor_defs;
\r
995 // Parameter block management
\r
996 // The proper parameter block must be transmitted to each
\r
997 // external stream source.
\r
998 // There is a 1-1 mapping between the param blocks returned
\r
999 // by this list and the registered data sources ...
\r
1000 // TODO: make this more manageable, but for now
\r
1001 // there is no parameter block manipulation so I just
\r
1002 // need to have the same number.
\r
1005 "int get_lfta_params(gs_int32_t sz, void * value,list<param_block>& lst){\n"
\r
1006 " // for now every lfta receive the full copy of hfta parameters\n"
\r
1007 " struct param_block pb;\n";
\r
1009 set<string> lfta_seen;
\r
1010 for(i=0;i<input_tbls.size();++i){
\r
1011 string src_tblname = input_tbls[i]->get_schema_name();
\r
1012 if(lfta_seen.count(src_tblname) == 0){
\r
1013 lfta_seen.insert(src_tblname);
\r
1014 schref = input_tbls[i]->get_schema_ref();
\r
1015 if(Schema->get_schema_type(schref) == OPERATOR_VIEW_SCHEMA){
\r
1016 ov_ix = input_tbls[i]->get_opview_idx();
\r
1017 opview_entry *opv = opviews.get_entry(ov_ix);
\r
1018 string op_param = "SUBQ:";
\r
1020 for(q=0;q<opv->subq_names.size();++q){
\r
1021 if(q>0) op_param+=",";
\r
1022 op_param+=opv->subq_names[q];
\r
1025 param_sz = op_param.size()-1;
\r
1027 sprintf(tmpstr,"\t\tpb.block_length = %d;\n",param_sz); ret+=tmpstr;
\r
1029 " pb.data = malloc(pb.block_length);\n";
\r
1030 ret+="\t\tmemcpy(pb.data,\""+op_param+"\",pb.block_length);\n"
\r
1031 " lst.push_back(pb);\n\n";
\r
1034 " pb.block_length = sz;\n"
\r
1035 " pb.data = malloc(pb.block_length);\n"
\r
1036 " memcpy(pb.data, value, pb.block_length);\n"
\r
1037 " lst.push_back(pb);\n\n";
\r
1048 "struct FTA* alloc_hfta (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void * value ) {\n"
\r
1050 " // find the lftas\n"
\r
1051 " list<lfta_info*> *lfta_list = new list<lfta_info*>;\n"
\r
1054 " char schemabuf[MAXSCHEMASZ];\n"
\r
1055 " gs_schemahandle_t schema_handle;\n"
\r
1058 // Register the input data sources.
\r
1059 // Register a source only once.
\r
1061 // vector<string> ext_reg_txt;
\r
1062 map<string, int> input_tbl_srcid;
\r
1063 for(i=0;i<input_tbls.size();++i){
\r
1064 string src_tblname = input_tbls[i]->get_schema_name();
\r
1065 // Use UDOP alias when in distributed mode.
\r
1066 // the cluster manager will make the translation
\r
1067 // using infr from qtree.xml
\r
1068 if(distributed_mode && input_tbls[i]->get_udop_alias() != "")
\r
1069 src_tblname = input_tbls[i]->get_udop_alias();
\r
1070 if(input_tbl_srcid.count(src_tblname) == 0){
\r
1071 int srcid = input_tbl_srcid.size();
\r
1072 input_tbl_srcid[src_tblname] = srcid;
\r
1074 "\n // find "+src_tblname+"\n"
\r
1075 " if (fta_find(\""+src_tblname+"\",1,&f,schemabuf,MAXSCHEMASZ)!=0) {\n"
\r
1076 " fprintf(stderr,\"HFTA::error:could not find LFTA \\n\");\n"
\r
1079 " //fprintf(stderr,\"HFTA::FTA found at %u[%u]\\n\",ftamsgid,ftaindex);\n"
\r
1081 " // parse the schema and get the schema handle\n"
\r
1082 " schema_handle = ftaschema_parse_string(schemabuf);\n"
\r
1083 " lfta_info* inf"+int_to_string(srcid)+" = new lfta_info();\n"
\r
1084 " inf"+int_to_string(srcid)+"->f = f;\n"
\r
1085 " inf"+int_to_string(srcid)+"->fta_name = strdup(\""+src_tblname+"\");\n"
\r
1086 " inf"+int_to_string(srcid)+"->schema = strdup(schemabuf);\n"
\r
1087 " inf"+int_to_string(srcid)+"->schema_handle = schema_handle;\n"
\r
1088 " lfta_list->push_back(inf"+int_to_string(srcid)+");\n\n";
\r
1089 // ext_reg_txt.push_back(tmp_s);
\r
1095 ret += "\tgs_schemahandle_t root_schema_handle = ftaschema_parse_string("+generate_schema_string_name(query_name)+");\n";
\r
1096 for(i=0;i<query_plan.size();++i){
\r
1097 if(i != qhead && query_plan[i]){
\r
1098 ret += "\tgs_schemahandle_t op"+int_to_string(i)+"_schema_handle = ftaschema_parse_string(node"+int_to_string(i)+"_schema);\n";
\r
1103 // Create the operators.
\r
1105 for(q=0;q<query_plan.size();q++){
\r
1106 if(query_plan[q]){
\r
1109 " // create an instance of operator "+int_to_string(q)+" ("+query_plan[q]->get_node_name()+") \n";
\r
1111 // Create parameters for operator construction.
\r
1113 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
\r
1114 string tname = itbls[0]->get_schema_name();
\r
1115 // string li_tname = tname +"-"+int_to_string(q)+"-0";
\r
1116 // if(lfta_idx.count(li_tname)>0)
\r
1117 if(input_tbl_srcid.count(tname)>0){
\r
1118 // ret += ext_reg_txt[lfta_idx[li_tname]];
\r
1119 // op_params += "inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
\r
1120 op_params += "inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
\r
1121 }else if(op_idx.count(tname)>0){
\r
1122 op_params += "op"+int_to_string( op_idx[tname] )+"_schema_handle";
\r
1124 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (3) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
\r
1127 if(itbls.size()>1){
\r
1128 string tname = itbls[1]->get_schema_name();
\r
1129 // string li_tname = tname +"-"+int_to_string(q)+"-1";
\r
1130 // if(lfta_idx.count(li_tname)>0)
\r
1131 if(input_tbl_srcid.count(tname)>0){
\r
1132 // ret += ext_reg_txt[lfta_idx[li_tname]];
\r
1133 // op_params += ",inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
\r
1134 op_params += ",inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
\r
1135 }else if(op_idx.count(tname)>0){
\r
1136 op_params += ",op"+int_to_string( op_idx[tname] )+"_schema_handle";
\r
1138 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (4) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
\r
1142 ret += query_plan[q]->generate_operator(q,op_params);
\r
1144 " operator_node* node"+int_to_string(q)+" = new operator_node(op"+int_to_string(q)+");\n";
\r
1150 // Next for the output operators if any
\r
1151 for(i=0;i<output_operators.size();++i){
\r
1152 ret += output_operators[i]->generate_operator(n_basic_ops+i,"root_schema_handle");
\r
1154 " operator_node* node"+int_to_string(n_basic_ops+i)+" = new operator_node(op"+int_to_string(n_basic_ops+i)+");\n";
\r
1158 // Link up operators.
\r
1159 for(q=0;q<query_plan.size();++q){
\r
1160 if(query_plan[q]){
\r
1161 // NOTE: this code assume that the operator has at most
\r
1162 // two inputs. But the template code also makes
\r
1163 // this assumption. Both will need to be changed.
\r
1164 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
\r
1165 string tname = itbls[0]->get_schema_name();
\r
1166 // string li_tname = tname +"-"+int_to_string(q)+"-0";
\r
1167 // if(lfta_idx.count(li_tname)>0)
\r
1168 if(input_tbl_srcid.count(tname)>0){
\r
1169 // ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
\r
1170 ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
\r
1171 }else if(op_idx.count(tname)>0){
\r
1172 ret += "\tnode"+int_to_string(q)+"->set_left_child_node(node"+int_to_string( op_idx[tname] )+");\n";
\r
1174 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (1) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
\r
1177 if(itbls.size()>1){
\r
1178 string tname = itbls[1]->get_schema_name();
\r
1179 // string li_tname = tname +"-"+int_to_string(q)+"-1";
\r
1180 // if(lfta_idx.count(li_tname)>0)
\r
1181 if(input_tbl_srcid.count(tname)>0){
\r
1182 // ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
\r
1183 ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
\r
1184 }else if(op_idx.count(tname)>0){
\r
1185 ret += "\tnode"+int_to_string(q)+"->set_right_child_node(node"+int_to_string( op_idx[tname] )+");\n";
\r
1187 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s (%s) when linking operator (2) %d (%s)\n",tname.c_str(), tname.c_str(),q,query_plan[q]->get_node_name().c_str());
\r
1193 for(i=0;i<output_operators.size();++i){
\r
1195 ret += "\tnode"+int_to_string(n_basic_ops)+"->set_left_child_node(node"+int_to_string( qhead )+");\n";
\r
1197 ret += "\tnode"+int_to_string(n_basic_ops+i)+"->set_left_child_node(node"+int_to_string( n_basic_ops+i-1 )+");\n";
\r
1200 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
\r
1202 bool fta_reusable = false;
\r
1203 if (query_plan[qhead]->get_val_of_def("reusable") == "yes" ||
\r
1204 query_plan[qhead]->get_param_tbl()->size() == 0) {
\r
1208 int root_node = qhead;
\r
1209 if(output_operators.size()>0)
\r
1210 root_node = n_basic_ops+i-1;
\r
1215 " MULTOP_HFTA* ftap = new MULTOP_HFTA(ftaid, OUTPUT_HFTA, command, sz, value, root_schema_handle, node"+int_to_string(root_node)+", lfta_list, " + (fta_reusable ?"true":"false") + ", reusable);\n"
\r
1216 " if(ftap->init_failed()){ delete ftap; return 0;}\n"
\r
1217 " return (FTA*)ftap;\n"
\r
1223 string comm_bufsize = "16*1024*1024";
\r
1224 if(defines.count("hfta_comm_buf")>0){
\r
1225 comm_bufsize = defines["hfta_comm_buf"];
\r
1230 "int main(int argc, char * argv[]) {\n"
\r
1233 " /* parse the arguments */\n"
\r
1235 " gs_int32_t tip1,tip2,tip3,tip4;\n"
\r
1236 " endpoint gshub;\n"
\r
1237 " gs_sp_t instance_name;\n"
\r
1238 " if (argc<3) {\n"
\r
1239 " gslog(LOG_EMERG,\"Wrong arguments at startup\");\n"
\r
1243 " if ((sscanf(argv[1],\"%u.%u.%u.%u:%hu\",&tip1,&tip2,&tip3,&tip4,&(gshub.port)) != 5)) {\n"
\r
1244 " gslog(LOG_EMERG,\"HUB IP NOT DEFINED\");\n"
\r
1247 " gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\n"
\r
1248 " gshub.port=htons(gshub.port);\n"
\r
1249 " instance_name=strdup(argv[2]);\n"
\r
1250 " if (set_hub(gshub)!=0) {\n"
\r
1251 " gslog(LOG_EMERG,\"Could not set hub\");\n"
\r
1254 " if (set_instance_name(instance_name)!=0) {\n"
\r
1255 " gslog(LOG_EMERG,\"Could not set instance name\");\n"
\r
1260 " /* initialize host library */\n"
\r
1262 //" fprintf(stderr,\"Initializing gscp\\n\");\n"
\r
1263 " gsopenlog(argv[0]);\n"
\r
1265 " if (hostlib_init(HFTA, "+comm_bufsize+", DEFAULTDEV, 0, 0)!=0) {\n"
\r
1266 " fprintf(stderr,\"%s::error:could not initialize gscp\\n\",\n"
\r
1273 " FTAID ret = fta_register(OUTPUT_HFTA, " + (fta_reusable?"1":"0") + ", DEFAULTDEV, alloc_hfta, "+generate_schema_string_name(query_name)+", -1, 0ull);\n"
\r
1274 " fta_start_service(-1);\n"
\r
1280 ////////////////////
\r
1285 // Checks if the node i is compatible with interface partitioning
\r
1286 // (can be pushed below the merge that combines partitioned stream)
\r
1287 // i - index of the query node in a query plan
\r
1288 bool stream_query::is_partn_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
\r
// NOTE(review): several original source lines (the early-return bodies of the
// guard conditions below, and loop/brace closers) are elided in this excerpt.
// Comments describe only what the visible code shows.
1290 qp_node* node = query_plan[index];
\r
1291 qp_node* child_node = NULL;
\r
// A node with no inputs cannot sit above a partition merge.
1293 if (node->predecessors.empty())
\r
1296 // all the node predecessors must be partition merges with the same partition definition
\r
1297 partn_def_t* partn_def = NULL;
\r
1298 for (i = 0; i < node->predecessors.size(); ++i) {
\r
1299 child_node = query_plan[node->predecessors[i]];
\r
// Only merge nodes (mrg_qpn) can carry a partition definition.
1300 if (child_node->node_type() != "mrg_qpn")
\r
1303 // merge must have only one parent for this optimization to work
\r
1304 // check that all its successors are the same
\r
1305 for (int j = 1; j < child_node->successors.size(); ++j) {
\r
1306 if (child_node->successors[j] != child_node->successors[0])
\r
// Ask the merge for its partition definition; every child merge must agree.
1310 partn_def_t* new_partn_def = ((mrg_qpn*)child_node)->get_partn_definition(lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result);
\r
1311 if (!new_partn_def)
\r
1314 partn_def = new_partn_def;
\r
1315 else if (new_partn_def != partn_def)
\r
// Per-operator-type decision on whether it may be pushed below the partition merge.
1320 if (node->node_type() == "spx_qpn") // spx nodes are always partition compatible
\r
1322 else if (node->node_type() == "sgah_qpn") {
\r
1323 gb_table gb_tbl = ((sgah_qpn*)node)->gb_tbl;
\r
// NOTE(review): for sgah nodes the real group-by compatibility check is
// commented out and hard-coded to "true" — confirm this is intentional.
1324 return true; //partn_def->is_compatible(&gb_tbl);
\r
1326 else if (node->node_type() == "rsgah_qpn") {
\r
1327 gb_table gb_tbl = ((rsgah_qpn*)node)->gb_tbl;
\r
1328 return partn_def->is_compatible(&gb_tbl);
\r
1330 else if (node->node_type() == "sgahcwcb_qpn") {
\r
1331 gb_table gb_tbl = ((sgahcwcb_qpn*)node)->gb_tbl;
\r
1332 return partn_def->is_compatible(&gb_tbl);
\r
1334 else if (node->node_type() == "join_eq_hash_qpn") {
\r
1341 // Push the operator below the merges that combine the partitioned input streams
\r
1342 void stream_query::pushdown_partn_operator(int index) {
\r
// NOTE(review): this excerpt elides some original lines (early returns after
// guards, loop/brace closers); comments reflect only the visible code.
1343 qp_node* node = query_plan[index];
\r
// scratch buffer for building per-copy node-name suffixes ("_0", "_m0", ...)
1345 char node_index[128];
\r
1347 // fprintf(stderr, "Partn pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
\r
1350 // HACK ALERT: When reordering merges we screw up slack computation
\r
1351 // since slack should no longer be used, it is not an issue
\r
1354 // we can safely reorder nodes that have one and only one temporal attribute in select list
\r
1355 table_def* table_layout = node->get_fields();
\r
1356 vector<field_entry*> fields = table_layout->get_fields();
\r
// position of the single temporal field used as the merge-by column; -1 = none found
1357 int merge_fieldpos = -1;
\r
1360 for (i = 0; i < fields.size(); ++i) {
\r
1361 data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
\r
1362 if(dt.is_temporal()) {
\r
1363 if (merge_fieldpos != -1) // more than one temporal field found
\r
1365 merge_fieldpos = i;
\r
1369 if (merge_fieldpos == -1) // if no temporal field found
\r
1372 std::vector<colref_t *> mvars; // the merge-by columns.
\r
1374 // reordering procedure is different for unary operators and joins
\r
1375 if (node->node_type() == "join_eq_hash_qpn") {
\r
// one per-interface copy of the join will be created for each matched stream pair
1376 vector<qp_node*> new_nodes;
\r
1378 tablevar_t *left_table_name;
\r
1379 tablevar_t *right_table_name;
\r
1380 mrg_qpn* left_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
\r
1381 mrg_qpn* right_mrg = (mrg_qpn*)query_plan[node->predecessors[1]];
\r
1383 // for now we will only consider plans where both child merges
\r
1384 // merge the same set of streams
\r
1386 if (left_mrg->fm.size() != right_mrg->fm.size())
\r
1389 // mapping of (machine + interface) names to table definitions
\r
1390 map<string, tablevar_t*> iface_map;
\r
1391 for (i = 0; i < left_mrg->fm.size(); i++) {
\r
1392 left_table_name = left_mrg->fm[i];
\r
1393 iface_map[left_table_name->get_machine() + left_table_name->get_interface()] = left_table_name;
\r
1396 for (i = 0; i < right_mrg->fm.size(); i++) {
\r
1397 right_table_name = right_mrg->fm[i];
\r
1399 // find corresponding left table
\r
1400 if (!iface_map.count(right_table_name->get_machine() + right_table_name->get_interface()))
\r
1403 left_table_name = iface_map[right_table_name->get_machine() + right_table_name->get_interface()];
\r
1405 // create new join nodes
\r
1406 sprintf(node_index, "_%d", i);
\r
1407 join_eq_hash_qpn* new_node = (join_eq_hash_qpn*)node->make_copy(node_index);
\r
1409 // make a copy of right_table_name
\r
1410 right_table_name = right_table_name->duplicate();
\r
// the copies inherit the original join's range variables
1411 left_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[0]->get_var_name());
\r
1412 right_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[1]->get_var_name());
\r
1413 new_node->from[0] = left_table_name;
\r
1414 new_node->from[1] = right_table_name;
\r
1415 new_nodes.push_back(new_node);
\r
1418 // make right_mrg a new root
\r
// the merge takes over the pushed-down node's name and output layout
1419 right_mrg->set_node_name(node->get_node_name());
\r
1420 right_mrg->table_layout = table_layout;
\r
1421 right_mrg->merge_fieldpos = merge_fieldpos;
\r
1423 for (i = 0; i < right_mrg->fm.size(); i++) {
\r
1424 // make newly created joins children of merge
\r
1425 sprintf(node_index, "_%d", i);
\r
1426 right_mrg->fm[i] = new tablevar_t(right_mrg->fm[i]->get_machine().c_str(), right_mrg->fm[i]->get_interface().c_str(), (node->get_node_name() + string(node_index)).c_str());
\r
1427 sprintf(node_index, "_m%d", i);
\r
1428 right_mrg->fm[i]->set_range_var(node_index);
\r
// merge-by column is the single temporal field identified above
1429 right_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
\r
1433 if (left_mrg != right_mrg)
\r
1434 query_plan[node->predecessors[0]] = NULL; // remove left merge from the plan
\r
1436 query_plan.insert(query_plan.end(), new_nodes.begin(), new_nodes.end());
\r
1438 } else { // unary operator
\r
1439 // get the child merge node
\r
1440 mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
\r
// merge becomes the new root: takes the operator's name and output layout
1442 child_mrg->set_node_name(node->get_node_name());
\r
1443 child_mrg->table_layout = table_layout;
\r
1444 child_mrg->merge_fieldpos = merge_fieldpos;
\r
1446 // create new nodes for every source stream
\r
1447 for (i = 0; i < child_mrg->fm.size(); i++) {
\r
1448 tablevar_t *table_name = child_mrg->fm[i];
\r
1449 sprintf(node_index, "_%d", i);
\r
1450 qp_node* new_node = node->make_copy(node_index);
\r
// rebind the copy's input to this source stream; cast per concrete node type
1452 if (node->node_type() == "spx_qpn")
\r
1453 ((spx_qpn*)new_node)->table_name = table_name;
\r
1454 else if (node->node_type() == "sgah_qpn")
\r
1455 ((sgah_qpn*)new_node)->table_name = table_name;
\r
1456 else if (node->node_type() == "rsgah_qpn")
\r
1457 ((rsgah_qpn*)new_node)->table_name = table_name;
\r
1458 else if (node->node_type() == "sgahcwcb_qpn")
\r
1459 ((sgahcwcb_qpn*)new_node)->table_name = table_name;
\r
1460 table_name->set_range_var("_t0");
\r
// the merge now reads from the newly created per-source operator copy
1462 child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
\r
1463 sprintf(node_index, "_m%d", i);
\r
1464 child_mrg->fm[i]->set_range_var(node_index);
\r
1465 child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
\r
1467 // add new node to query plan
\r
1468 query_plan.push_back(new_node);
\r
// the original operator is replaced by its per-source copies; rebuild links
1471 query_plan[index] = NULL;
\r
1472 generate_linkage();
\r
1476 // Checks if the node i can be pushed below the merge
\r
1477 bool stream_query::is_pushdown_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names) {
\r
// NOTE(review): the early-return bodies of the guards below are elided in
// this excerpt; comments describe only the visible code.
1480 qp_node* node = query_plan[index];
\r
1481 qp_node* child_node = NULL;
\r
// pushdown only considered for nodes with exactly one input
1483 if (node->predecessors.size() != 1)
\r
1486 // node predecessor must be merge that combines streams from multiple hosts
\r
1487 child_node = query_plan[node->predecessors[0]];
\r
1488 if (child_node->node_type() != "mrg_qpn")
\r
1491 if (!((mrg_qpn*)child_node)->is_multihost_merge())
\r
1494 // merge must have only one parent for this optimization to work
\r
1495 // check that all its successors are the same
\r
1496 for (int j = 1; j < child_node->successors.size(); ++j) {
\r
1497 if (child_node->successors[j] != child_node->successors[0])
\r
1501 // selections can always be pushed down, aggregations can always be split into selection/aggr or aggr/aggr pair
\r
1502 // and pushed down
\r
1503 if (node->node_type() == "spx_qpn")
\r
1505 else if (node->node_type() == "sgah_qpn")
\r
1512 // Push the operator below the merge
\r
1513 void stream_query::pushdown_operator(int index, ext_fcn_list *Ext_fcns, table_list *Schema) {
\r
// NOTE(review): this excerpt elides some original lines (guard returns,
// loop/brace closers); comments reflect only the visible code.
1514 qp_node* node = query_plan[index];
\r
// scratch buffer for per-copy node-name suffixes
1516 char node_suffix[128];
\r
1518 // we can only safely push down queries that have one and only one temporal attribute in select list
\r
1519 table_def* table_layout = node->get_fields();
\r
1520 vector<field_entry*> fields = table_layout->get_fields();
\r
// position of the single temporal merge-by field; -1 = none found
1521 int merge_fieldpos = -1;
\r
1524 for (i = 0; i < fields.size(); ++i) {
\r
1525 data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
\r
1526 if(dt.is_temporal()) {
\r
1527 if (merge_fieldpos != -1) // more than one temporal field found
\r
1529 merge_fieldpos = i;
\r
1533 if (merge_fieldpos == -1) // if no temporal field found
\r
1536 std::vector<colref_t *> mvars; // the merge-by columns.
\r
1538 fprintf(stderr, "Regular pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
\r
1540 // get the child merge node
\r
1541 mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
\r
1543 tablevar_t *table_name = NULL;
\r
// selection (spx) case: replicate the selection below the merge
1546 if (node->node_type() == "spx_qpn") {
\r
1547 // get the child merge node
\r
1549 // create new nodes for every source stream
\r
1550 for (i = 0; i < child_mrg->fm.size(); i++) {
\r
1551 table_name = child_mrg->fm[i];
\r
1552 sprintf(node_suffix, "_%d", i);
\r
1553 qp_node* new_node = node->make_copy(node_suffix);
\r
1555 ((spx_qpn*)new_node)->table_name = table_name;
\r
1556 table_name->set_range_var("_t0");
\r
// merge now reads from the per-source selection copy
1558 child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
\r
1559 sprintf(node_suffix, "_m%d", i);
\r
1560 child_mrg->fm[i]->set_range_var(node_suffix);
\r
1561 child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
\r
1563 // add new node to query plan
\r
1564 query_plan.push_back(new_node);
\r
// merge adopts the pushed-down operator's output layout
1566 child_mrg->table_layout = table_layout;
\r
1567 child_mrg->merge_fieldpos = merge_fieldpos;
\r
1569 } else { // aggregation node
\r
1571 vector<qp_node*> new_nodes;
\r
1573 // split aggregations into high and low-level part
\r
1574 vector<qp_node *> split_nodes = ((sgah_qpn*)node)->split_node_for_hfta(Ext_fcns, Schema);
\r
// expect exactly [low-level, high-level]; anything else aborts the pushdown
1575 if (split_nodes.size() != 2)
\r
1578 sgah_qpn* super_aggr = (sgah_qpn*)split_nodes[1];
\r
1579 super_aggr->table_name = ((sgah_qpn*)node)->table_name;
\r
1581 // group all the sources by host
\r
// host/machine name -> indices into child_mrg->fm of that host's sources
1582 map<string, vector<int> > host_map;
\r
1583 for (i = 0; i < child_mrg->fm.size(); i++) {
\r
1584 tablevar_t *table_name = child_mrg->fm[i];
\r
1585 if (host_map.count(table_name->get_machine()))
\r
1586 host_map[table_name->get_machine()].push_back(i);
\r
1588 vector<int> tables;
\r
1589 tables.push_back(i);
\r
1590 host_map[table_name->get_machine()] = tables;
\r
1594 // create a new merge and low-level aggregation for each host
\r
1595 map<string, vector<int> >::iterator iter;
\r
1596 for (iter = host_map.begin(); iter != host_map.end(); iter++) {
\r
1597 string host_name = (*iter).first;
\r
1598 vector<int> tables = (*iter).second;
\r
// per-host copies are suffixed with the host name
1600 sprintf(node_suffix, "_%s", host_name.c_str());
\r
1601 string suffix(node_suffix);
\r
1603 mrg_qpn *new_mrg = (mrg_qpn *)child_mrg->make_copy(suffix);
\r
1604 for (i = 0; i < tables.size(); ++i) {
\r
1605 sprintf(node_suffix, "_m%d", i);
\r
1606 new_mrg->fm.push_back(child_mrg->fm[tables[i]]);
\r
// NOTE(review): mvars is indexed by i here, not tables[i] — verify this
// pairing of merge-by columns with the reassigned sources is intended.
1607 new_mrg->mvars.push_back(child_mrg->mvars[i]);
\r
1608 new_mrg->fm[i]->set_range_var(node_suffix);
\r
// low-level half of the split aggregation, one copy per host
1610 qp_node* new_node = split_nodes[0]->make_copy(suffix);
\r
1612 if (new_node->node_type() == "spx_qpn") {
\r
1613 ((spx_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
\r
1614 ((spx_qpn*)new_node)->table_name->set_range_var("_t0");
\r
1616 ((sgah_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
\r
1617 ((sgah_qpn*)new_node)->table_name->set_range_var("_t0");
\r
1619 query_plan.push_back(new_mrg);
\r
1620 new_nodes.push_back(new_node);
\r
// parent merge's layout is derived from the low-level node's select list
1623 child_mrg->merge_fieldpos = merge_fieldpos;
\r
1624 if (split_nodes[0]->node_type() == "spx_qpn")
\r
1625 child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((spx_qpn*)split_nodes[0])->select_list);
\r
1627 child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((sgah_qpn*)split_nodes[0])->select_list);
\r
1630 // connect newly created nodes with parent multihost merge
\r
1631 for (i = 0; i < new_nodes.size(); ++i) {
\r
1632 if (new_nodes[i]->node_type() == "spx_qpn")
\r
1633 child_mrg->fm[i] = new tablevar_t(((spx_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
\r
1635 child_mrg->fm[i] = new tablevar_t(((sgah_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
\r
1638 child_mrg->mvars[i]->set_field(child_mrg->table_layout->get_field_name(merge_fieldpos));
\r
1640 sprintf(node_suffix, "_m%d", i);
\r
1641 child_mrg->fm[i]->set_range_var(node_suffix);
\r
1642 query_plan.push_back(new_nodes[i]);
\r
// shrink the parent merge to one input per host
1644 child_mrg->fm.resize(new_nodes.size());
\r
1645 child_mrg->mvars.resize(new_nodes.size());
\r
1647 // push the new high-level aggregation
\r
1648 query_plan.push_back(super_aggr);
\r
// original operator replaced by split pieces; rebuild plan links
1651 query_plan[index] = NULL;
\r
1652 generate_linkage();
\r
1656 // Extract subtree rooted at node i into separate hfta
\r
1657 stream_query* stream_query::extract_subtree(int index) {
\r
// worklist of plan indices belonging to the subtree rooted at `index`
1659 vector<int> nodes;
\r
// new query takes its name/attributes/params from the subtree root
1660 stream_query* new_query = new stream_query(query_plan[index], this);
\r
1662 nodes.push_back(index);
\r
// breadth-style walk: nodes grows while we iterate over it
1663 for (int i = 0; i < nodes.size(); ++i) {
\r
1664 qp_node* node = query_plan[nodes[i]];
\r
1668 // add all children to nodes list
\r
1669 for (int j = 0; j < node->predecessors.size(); ++j)
\r
1670 nodes.push_back(node->predecessors[j]);
\r
// move the node into the extracted query and vacate its old plan slot
1672 new_query->query_plan.push_back(node);
\r
1674 query_plan[nodes[i]] = NULL;
\r
// NOTE(review): the trailing `return new_query;` is elided from this excerpt.
\r
1680 // Splits query that combines data from multiple hosts into separate hftas.
\r
1681 vector<stream_query*> stream_query::split_multihost_query() {
\r
// NOTE(review): some original lines are elided in this excerpt (loop/brace
// closers, the code that appends extracted queries to `ret`, final return).
1683 vector<stream_query*> ret;
\r
1684 char node_suffix[128];
\r
1687 // find merges combining multiple hosts into per-host groups
\r
// snapshot the size: query_plan grows below while we iterate by index
1688 int plan_size = query_plan.size();
\r
1689 vector<mrg_qpn*> new_nodes;
\r
1691 for (i = 0; i < plan_size; ++i) {
\r
1692 qp_node* node = query_plan[i];
\r
1693 if (node && node->node_type() == "mrg_qpn") {
\r
1694 mrg_qpn* mrg = (mrg_qpn*)node;
\r
1695 if (mrg->is_multihost_merge()) {
\r
1697 // group all the sources by host
\r
// host/machine name -> indices into mrg->fm of that host's sources
1698 map<string, vector<int> > host_map;
\r
1699 for (int j = 0; j < mrg->fm.size(); j++) {
\r
1700 tablevar_t *table_name = mrg->fm[j];
\r
1701 if (host_map.count(table_name->get_machine()))
\r
1702 host_map[table_name->get_machine()].push_back(j);
\r
1704 vector<int> tables;
\r
1705 tables.push_back(j);
\r
1706 host_map[table_name->get_machine()] = tables;
\r
1710 // create a new merge for each host
\r
1711 map<string, vector<int> >::iterator iter;
\r
1712 for (iter = host_map.begin(); iter != host_map.end(); iter++) {
\r
1713 string host_name = (*iter).first;
\r
1714 vector<int> tables = (*iter).second;
\r
// single-source hosts need no sub-merge
1716 if (tables.size() == 1)
\r
1719 sprintf(node_suffix, "_%s", host_name.c_str());
\r
1720 string suffix(node_suffix);
\r
1722 mrg_qpn *new_mrg = (mrg_qpn *)mrg->make_copy(suffix);
\r
1723 for (int j = 0; j < tables.size(); ++j) {
\r
1724 new_mrg->fm.push_back(mrg->fm[tables[j]]);
\r
// NOTE(review): mvars indexed by j rather than tables[j] — verify the
// merge-by column pairing matches the reassigned sources.
1725 new_mrg->mvars.push_back(mrg->mvars[j]);
\r
1726 sprintf(node_suffix, "m_%d", j);
\r
1727 new_mrg->fm[j]->set_range_var(node_suffix);
\r
1729 new_nodes.push_back(new_mrg);
\r
1732 if (!new_nodes.empty()) {
\r
1733 // connect newly created merge nodes with parent multihost merge
\r
1734 for (int j = 0; j < new_nodes.size(); ++j) {
\r
1735 mrg->fm[j] = new tablevar_t(new_nodes[j]->fm[0]->get_machine().c_str(), "IFACE", new_nodes[j]->get_node_name().c_str());
\r
1736 query_plan.push_back(new_nodes[j]);
\r
// parent merge now has one input per host
1738 mrg->fm.resize(new_nodes.size());
\r
1739 mrg->mvars.resize(new_nodes.size());
\r
1740 generate_linkage();
\r
1744 // now externalize the sources
\r
1745 for (int j = 0; j < node->predecessors.size(); ++j) {
\r
1746 // Extract subtree rooted at this predecessor into separate hfta
\r
1747 stream_query* q = extract_subtree(node->predecessors[j]);
\r
1749 q->generate_linkage();
\r
// relink the remaining plan after the splits
1754 generate_linkage();
\r
1763 // Perform local FTA optimizations
\r
1764 void stream_query::optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result){
\r
// NOTE(review): some original lines are elided in this excerpt (outer loop of
// the topological sort, `continue`s, brace closers); comments reflect only
// the visible code.
1766 // Topologically sort the nodes in query plan (leaf-first)
\r
1768 vector<int> sorted_nodes;
\r
1770 int num_nodes = query_plan.size();
\r
// tracks which nodes have already been emitted as leaves
1771 bool* leaf_flags = new bool[num_nodes];
\r
1772 memset(leaf_flags, 0, num_nodes * sizeof(bool));
\r
1774 // run topological sort
\r
1775 bool done = false;
\r
1777 // add all leafs to sorted_nodes
\r
1780 for (i = 0; i < num_nodes; ++i) {
\r
// skip slots vacated by earlier plan rewrites
1781 if (!query_plan[i])
\r
1784 if (!leaf_flags[i] && query_plan[i]->predecessors.empty()) {
\r
1785 leaf_flags[i] = true;
\r
1786 sorted_nodes.push_back(i);
\r
1789 // remove the node from its parents predecessor lists
\r
1790 // since we only care about number of predecessors, it is sufficient just to remove
\r
1791 // one element from the parent's predecessors list
\r
1792 for (int j = query_plan[i]->successors.size() - 1; j >= 0; --j)
\r
1793 query_plan[query_plan[i]->successors[j]]->predecessors.pop_back();
\r
1797 delete[] leaf_flags;
\r
1798 num_nodes = sorted_nodes.size();
\r
1799 generate_linkage(); // rebuild the recently destroyed predecessor lists.
\r
1801 // collect the information about interfaces nodes read from
\r
1802 for (i = 0; i < num_nodes; ++i) {
\r
1803 qp_node* node = query_plan[sorted_nodes[i]];
\r
1804 vector<tablevar_t *> input_tables = node->get_input_tbls();
\r
1805 for (j = 0; j < input_tables.size(); ++j) {
\r
1806 tablevar_t * table = input_tables[j];
\r
// inputs that are lftas get machine/interface tags from the parallel arrays
1807 if (lfta_names.count(table->schema_name)) {
\r
1808 int index = lfta_names[table->schema_name];
\r
1809 table->set_machine(machine_names[index]);
\r
1810 table->set_interface(interface_names[index]);
\r
1817 // push eligible operators down in the query plan
\r
// leaf-first order guarantees children are processed before their parents
1818 for (i = 0; i < num_nodes; ++i) {
\r
1819 if (partn_parse_result && is_partn_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result)) {
\r
1820 pushdown_partn_operator(sorted_nodes[i]);
\r
1821 } else if (is_pushdown_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names)) {
\r
1822 pushdown_operator(sorted_nodes[i], Ext_fcns, Schema);
\r
1826 // split the query into multiple hftas if it combines the data from multiple hosts
\r
1827 vector<stream_query*> hftas = split_multihost_query();
\r
1828 hfta_list.insert(hfta_list.end(), hftas.begin(), hftas.end());
\r
// re-read the size: pushdowns/splits above may have grown the plan
1831 num_nodes = query_plan.size();
\r
1832 // also split multi-way merges into two-way merges
\r
1833 for (i = 0; i < num_nodes; ++i) {
\r
1834 qp_node* node = query_plan[i];
\r
1835 if (node && node->node_type() == "mrg_qpn") {
\r
1836 vector<mrg_qpn *> split_merge = ((mrg_qpn *)node)->split_sources();
\r
1838 query_plan.insert(query_plan.end(), split_merge.begin(), split_merge.end());
\r
1839 // delete query_plan[sorted_nodes[i]];
\r
1840 query_plan[i] = NULL;
\r
1844 generate_linkage();
\r
// Return the output table definition of this query, i.e. the field layout
// of the plan's head (output) node.
1849 table_def *stream_query::get_output_tabledef(){
\r
1850 return( query_plan[qhead]->get_fields() );
\r
1855 //////////////////////////////////////////////////////////
\r
1856 //// De-siloing. TO BE REMOVED
\r
1858 void stream_query::desilo_lftas(map<string, int> &lfta_names,vector<string> &silo_names,table_list *Schema){
\r
// NOTE(review): loop-variable declarations (i, t, s) and brace closers are
// elided in this excerpt.
// Silo suffix length; assumes all silo names share the length of the last one.
1861 int suffix_len = silo_names.back().size();
\r
// examine the inputs of every tail (input-side) node of the plan
1863 for(i=0;i<qtail.size();++i){
\r
1864 vector<tablevar_t *> itbls = query_plan[qtail[i]]->get_input_tbls();
\r
1865 for(t=0;t<itbls.size();++t){
\r
1866 string itbl_name = itbls[t]->get_schema_name();
\r
1867 if(lfta_names.count(itbl_name)>0){
\r
1868 //printf("Query %s input %d references lfta input %s\n",query_plan[qtail[i]]->get_node_name().c_str(),t,itbl_name.c_str());
\r
1869 vector<string> src_names;
\r
// strip the silo suffix, then enumerate the per-silo lfta variants
1870 string lfta_base = itbl_name.substr(0,itbl_name.size()-suffix_len);
\r
1871 for(s=0;s<silo_names.size();++s){
\r
1872 string lfta_subsilo = lfta_base + silo_names[s];
\r
1873 //printf("\t%s\n",lfta_subsilo.c_str());
\r
1874 src_names.push_back(lfta_subsilo);
\r
// insert a merge that unions the per-silo lftas, and retarget the input
1876 string merge_node_name = "desilo_"+query_plan[qtail[i]]->get_node_name()+
\r
1877 "_input_"+int_to_string(t);
\r
1878 mrg_qpn *merge_node = new mrg_qpn(merge_node_name,src_names,Schema);
\r
1879 int m_op_pos = Schema->add_table(merge_node->table_layout);
\r
1880 itbls[t]->set_schema(merge_node_name);
\r
1881 itbls[t]->set_schema_ref(m_op_pos);
\r
1882 query_plan.push_back(merge_node);
\r
// rebuild predecessor/successor links after inserting the merges
1887 generate_linkage();
\r
1889 ////////////////////////////////////////
\r
1890 /// End de-siloing
\r
1894 // Given a collection of LFTA stream queries,
\r
1895 // extract their WHERE predicates
\r
1896 // and pass them to an analysis routine which will extract
\r
1897 // common elements
\r
1899 void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, vector<cnf_set *> &prefilter_preds, set<unsigned int> &pred_ids){
\r
// NOTE(review): loop-variable declarations (s, c) and brace closers are
// elided in this excerpt.
// One WHERE-clause (CNF) list per lfta query, fed to the common-filter analysis.
1901 std::vector< std::vector<cnf_elem *> > where_list;
\r
1903 // still safe to assume that LFTA queries have a single
\r
1904 // query node, which is at position 0.
\r
1905 for(s=0;s<lfta_list.size();++s){
\r
1906 vector<cnf_elem *> cnf_list = lfta_list[s]->query_plan[0]->get_filter_clause();
\r
// for aggregation queries, expand group-by references inside the predicates
1907 if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
\r
1908 gb_table *gtbl = ((sgah_qpn *)(lfta_list[s]->query_plan[0]))->get_gb_tbl();
\r
1910 for(c=0;c<cnf_list.size();++c){
\r
1911 insert_gb_def_pr(cnf_list[c]->pr,gtbl);
\r
1914 where_list.push_back(lfta_list[s]->query_plan[0]->get_filter_clause());
\r
// results are returned through prefilter_preds and pred_ids (out-params)
1917 find_common_filter(where_list,Schema,Ext_fcns,prefilter_preds, pred_ids);
\r
1924 // Given a collection of LFTA stream queries,
\r
1925 // extract the union of all temporal attributes referenced in select clauses
\r
1926 // those attributes will need to be unpacked in prefilter
\r
1928 void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids){
\r
1930 vector<scalarexp_t *> sl_list;
\r
1931 gb_table *gb_tbl = NULL;
\r
1934 // still safe to assume that LFTA queries have a single
\r
1935 // query node, which is at position 0.
\r
1936 for(s=0;s<lfta_list.size();++s){
\r
1938 if(lfta_list[s]->query_plan[0]->node_type() == "spx_qpn"){
\r
1939 spx_qpn *spx_node = (spx_qpn *)lfta_list[s]->query_plan[0];
\r
1940 sl_list = spx_node->get_select_se_list();
\r
1942 if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
\r
1943 sgah_qpn *sgah_node = (sgah_qpn *)lfta_list[s]->query_plan[0];
\r
1944 sl_list = sgah_node->get_select_se_list();
\r
1945 gb_tbl = sgah_node->get_gb_tbl();
\r
1948 for(sl=0;sl<sl_list.size();sl++){
\r
1949 data_type *sdt = sl_list[sl]->get_data_type();
\r
1950 if (sdt->is_temporal()) {
\r
1951 gather_se_col_ids(sl_list[sl],temp_cids, gb_tbl);
\r