Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / src / ftacmp / stream_query.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15 #include"stream_query.h"
16 #include"generate_utils.h"
17 #include"analyze_fta.h"
18 #include<algorithm>
19 #include<utility>
20 #include <list>
21
22 static char tmpstr[500];
23
24 using namespace std;
25
26
27 //              Create a query plan from a query node and an existing
28 //              query plan.  Use for lfta queries, the parent query plan provides
29 //              the annotations.
30 stream_query::stream_query(qp_node *qnode, stream_query *parent){
31         query_plan.push_back(qnode);
32         qhead = 0;
33         qtail.push_back(0);
34         attributes = qnode->get_fields();
35         parameters = qnode->get_param_tbl();
36         defines = parent->defines;
37         query_name = qnode->get_node_name();
38 }
39
40 //              Copy the query plan.
41 stream_query::stream_query(stream_query &src){
42         query_plan = src.query_plan;
43         qhead = src.qhead;
44         qtail = src.qtail;
45         attributes = src.attributes;
46         parameters = src.parameters;
47         defines = src.defines;
48         query_name = src.query_name;
49         gid = src.gid;
50 }
51
52
53 //              Create a query plan from an analyzed parse tree.
54 //              Perform analyses to find the output node, input nodes, etc.
55
56 stream_query::stream_query(query_summary_class *qs,table_list *Schema){
57 //              Generate the query plan nodes from the analyzed parse tree.
58 //              There is only one for now, so just assign the return value
59 //              of create_query_nodes to query_plan
60         error_code = 0;
61         query_plan = create_query_nodes(qs,Schema);
62     int i;
63 if(query_plan.size() == 0){
64   fprintf(stderr,"INTERNAL ERROR, zero-size query plan in stream_query::stream_query\n");
65   exit(1);
66 }
67     for(i=0;i<query_plan.size();++i){
68                 if(query_plan[i]){
69                         if(query_plan[i]->get_error_code() != 0){
70                                 error_code = query_plan[i]->get_error_code();
71                                 err_str +=  query_plan[i]->get_error_str();
72                         }
73                 }
74         }
75         qhead = query_plan.size()-1;
76         gid = -1;
77
78 }
79
80
81 stream_query * stream_query::add_query(query_summary_class *qs,table_list *Schema){
82 //              Add another query block to the query plan
83         error_code = 0;
84         vector<qp_node *> new_nodes = create_query_nodes(qs, Schema);
85         query_plan.insert(query_plan.end(),new_nodes.begin(), new_nodes.end());
86         return this;
87 }
88
89 stream_query * stream_query::add_query(stream_query &src){
90 //              Add another query block to the query plan
91         error_code = 0;
92         query_plan.insert(query_plan.end(),src.query_plan.begin(), src.query_plan.end());
93         return this;
94 }
95
96
// Purpose: for every node of this plan, compute the scalar expressions
// over protocol (base-table) fields which produce the node's output
// fields.  sq_map maps external query names to their plans so inputs
// produced by other queries can be traced through as well.
void stream_query::generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema){
	int i,n;

//		Mapping fields to protocol fields requires schema binding.
//	First ensure all nodes are in the schema.
    for(n=0;n<query_plan.size();++n){
        if(query_plan[n] != NULL){
            Schema->add_table(query_plan[n]->get_fields());
        }
    }
//		Now do schema binding
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n] != NULL){
			query_plan[n]->bind_to_schema(Schema);
		}
	}

//			create name-to-index map
	map<string, int> name_to_node;
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n]){
			name_to_node[query_plan[n]->get_node_name()] = n;
		}
	}

//		Create a list of the nodes to process, in order.
//		Search from the root down.
//			ASSUME tree plan.
//		BFS from qhead pushes each visited node onto the FRONT of
//		work_list, so work_list ends up ordered sources-first: every
//		node is processed only after all of its predecessors.

	list<int> search_q;
	list<int> work_list;

	search_q.push_back(qhead);
	while(! search_q.empty()){
		int the_q = search_q.front();
		search_q.pop_front(); work_list.push_front(the_q);
		vector<int> the_pred = query_plan[the_q]->get_predecessors();
		for(i=0;i<the_pred.size();i++)
			search_q.push_back(the_pred[i]);
	}

//		Scan through the work list, from the front,
//		gather the qp_node's ref'd, and call the
//		protocol se generator.  Pass NULL for
//		sources not found - presumably user-defined operator

	while(! work_list.empty()){
		int the_q = work_list.front();
		work_list.pop_front();
		vector<qp_node *> q_sources;	// one entry per input table, aligned by position
		vector<tablevar_t *> q_input_tbls = query_plan[the_q]->get_input_tbls();
		for(i=0;i<q_input_tbls.size();++i){
			 string itbl_nm = q_input_tbls[i]->get_schema_name();
			 if(name_to_node.count(itbl_nm)>0){
				 q_sources.push_back(query_plan[name_to_node[itbl_nm]]);	// produced inside this plan
			 }else if(sq_map.count(itbl_nm)>0){
				 q_sources.push_back(sq_map[itbl_nm]->get_query_head());	// produced by another query
			 }else{
				 q_sources.push_back(NULL);	// unknown source, e.g. user-defined operator
			 }
		}
		query_plan[the_q]->create_protocol_se(q_sources, Schema);
	}

//////////////////////////////////////////////////////////
//			trust but verify
//			(debugging dump of the computed protocol SEs, kept for reference)


/*
	for(i=0;i<query_plan.size();++i){
		if(query_plan[i]){
			printf("query node %s, type=%s:\n",query_plan[i]->get_node_name().c_str(),
					query_plan[i]->node_type().c_str());
			map<std::string, scalarexp_t *> *pse_map = query_plan[i]->get_protocol_se();
			map<std::string, scalarexp_t *>::iterator mssi;
			for(mssi=pse_map->begin();mssi!=pse_map->end();++mssi){
				if((*mssi).second)
					printf("\t%s : %s\n",(*mssi).first.c_str(), (*mssi).second->to_string().c_str());
				else
					printf("\t%s : NULL\n",(*mssi).first.c_str());
			}
			if(query_plan[i]->node_type() == "filter_join" || query_plan[i]->node_type() == "join_eq_hash_qpn"){
				vector<scalarexp_t *> pse_l;
				vector<scalarexp_t *> pse_r;
				if(query_plan[i]->node_type() == "filter_join"){
					pse_l = ((filter_join_qpn *)query_plan[i])->get_hash_l();
					pse_r = ((filter_join_qpn *)query_plan[i])->get_hash_r();
				}
				if(query_plan[i]->node_type() == "join_eq_hash_qpn"){
					pse_l = ((join_eq_hash_qpn *)query_plan[i])->get_hash_l();
					pse_r = ((join_eq_hash_qpn *)query_plan[i])->get_hash_r();
				}
				int p;
				for(p=0;p<pse_l.size();++p){
					if(pse_l[p] != NULL)
						printf("\t\t%s = ",pse_l[p]->to_string().c_str());
					else
						printf("\t\tNULL = ");
					if(pse_r[p] != NULL)
						printf("%s\n",pse_r[p]->to_string().c_str());
					else
						printf("NULL\n");
				}
			}
			if(query_plan[i]->node_type() == "sgah_qpn" || query_plan[i]->node_type() == "rsgah_qpn" || query_plan[i]->node_type() == "sgahcwcb_qpn"){
				vector<scalarexp_t *> pseg;
                if(query_plan[i]->node_type() == "sgah_qpn")
					pseg = ((sgah_qpn *)query_plan[i])->get_gb_sources();
                if(query_plan[i]->node_type() == "rsgah_qpn")
					pseg = ((rsgah_qpn *)query_plan[i])->get_gb_sources();
                if(query_plan[i]->node_type() == "sgahcwcb_qpn")
					pseg = ((sgahcwcb_qpn *)query_plan[i])->get_gb_sources();
				int g;
				for(g=0;g<pseg.size();g++){
					if(pseg[g] != NULL)
						printf("\t\tgb %d = %s\n",g,pseg[g]->to_string().c_str());
					else
						printf("\t\tgb %d = NULL\n",g);
				}
			}
		}
	}
*/

}
222
223 bool stream_query::generate_linkage(){
224         bool create_failed = false;
225         int n, f,s;
226
227 //                      Clear any leftover linkages
228         for(n=0;n<query_plan.size();++n){
229                 if(query_plan[n]){
230                         query_plan[n]->clear_predecessors();
231                         query_plan[n]->clear_successors();
232                 }
233         }
234         qtail.clear();
235
236 //                      create name-to-index map
237         map<string, int> name_to_node;
238         for(n=0;n<query_plan.size();++n){
239                 if(query_plan[n]){
240                         name_to_node[query_plan[n]->get_node_name()] = n;
241                 }
242         }
243
244 //              Do the 2-way linkage.
245         for(n=0;n<query_plan.size();++n){
246                 if(query_plan[n]){
247                   vector<tablevar_t *> fm  = query_plan[n]->get_input_tbls();
248                   for(f=0;f<fm.size();++f){
249                         string src_tbl = fm[f]->get_schema_name();
250                         if(name_to_node.count(src_tbl)>0){
251                                 int s = name_to_node[src_tbl];
252                                 query_plan[n]->add_predecessor(s);
253                                 query_plan[s]->add_successor(n);
254                         }
255                   }
256                 }
257         }
258
259 //              Find the head (no successors) and the tails (at least one
260 //              predecessor is external).
261 //              Verify that there is only one head,
262 //              and that all other nodes have one successor (because
263 //               right now I can only handle trees).
264
265         qhead = -1;             // no head yet found.
266         for(n=0;n<query_plan.size();++n){
267                 if(query_plan[n]){
268                   vector<int> succ = query_plan[n]->get_successors();
269 /*
270                   if(succ.size() > 1){
271                         fprintf(stderr,"ERROR, query node %s supplies data to multiple nodes, but currently only tree query plans are supported:\n",query_plan[n]->get_node_name().c_str());
272                         for(s=0;s<succ.size();++s){
273                                 fprintf(stderr,"%s ",query_plan[succ[s]]->get_node_name().c_str());
274                         }
275                         fprintf(stderr,"\n");
276                         create_failed = true;
277                   }
278 */
279                   if(succ.size() == 0){
280                         if(qhead >= 0){
281                                 fprintf(stderr,"ERROR, query nodes %s and %s both have zero successors.\n",query_plan[n]->get_node_name().c_str(), query_plan[qhead]->get_node_name().c_str());
282                                 create_failed = true;
283                         }else{
284                                 qhead = n;
285                         }
286                   }
287 // does the query node read a source, or is it a source?
288                   if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size() || query_plan[n]->get_input_tbls().size() == 0){
289                         qtail.push_back(n);
290                   }
291                 }
292         }
293
294         return create_failed;
295 }
296
297 //              After the collection of query plan nodes is generated,
298 //              analyze their structure to link them up into a tree (or dag?).
299 //              Verify that the structure is acceptable.
300 //              Do some other analysis and verification tasks (?)
301 //              then gather summar information.
// Purpose: validate the generated plan nodes, link them into a tree
// (via generate_linkage), and pull the summary info (name, fields,
// params, defines) from the root node.  Returns 0 on success,
// 1 if any node failed to build, -1 if linkage failed.
int stream_query::generate_plan(table_list *Schema){

//		The first thing to do is verify that the query plan
//		nodes were successfully created.
	bool create_failed = false;
	int n,f,s;
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n]!=NULL && query_plan[n]->get_error_code()){
			fprintf(stderr,"%s",query_plan[n]->get_error_str().c_str());
			create_failed = true;
		}
	}
//		(debugging dump, kept for reference)
/*
for(n=0;n<query_plan.size();++n){
if(query_plan[n] != NULL){
string nstr = query_plan[n]->get_node_name();
printf("In generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
int nn;
for(nn=0;nn<inv.size();nn++){
printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
}
printf("\n");
}
}
*/

	if(create_failed) return(1);

//		Here, link up the query nodes, then verify that the
//		structure is acceptable (single root, no cycles, no stranded
//		nodes, etc.)
	create_failed = generate_linkage();
	if(create_failed) return -1;


//		Here, do optimizations such as predicate pushing,
//		join rearranging, etc.
//			Nothing to do yet.

//		(debugging dump, kept for reference)
/*
for(n=0;n<query_plan.size();++n){
if(query_plan[n] != NULL){
string nstr = query_plan[n]->get_node_name();
printf("B generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
int nn;
for(nn=0;nn<inv.size();nn++){
printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
}
printf("\n");
}
}
printf("qhead=%d, qtail = ",qhead);
int nn;
for(nn=0;nn<qtail.size();++nn)
printf("%d ",qtail[nn]);
printf("\n");
*/

//		Collect query summaries.  The query is reperesented by its head node.
	query_name = query_plan[qhead]->get_node_name();
	attributes = query_plan[qhead]->get_fields();
//		TODO: The params and defines require a lot more thought.
	parameters = query_plan[qhead]->get_param_tbl();
	defines = query_plan[qhead]->get_definitions();

	return(0);
  };
371
372 void stream_query::add_output_operator(ospec_str *o){
373         output_specs.push_back(o);
374 }
375
376
377 void stream_query::get_external_libs(set<string> &libset){
378
379         int qn,i;
380         for(qn=0;qn<query_plan.size();++qn){
381                 if(query_plan[qn] != NULL){
382                         vector<string> op_libs = query_plan[qn]->external_libs();
383                         for(i=0;i<op_libs.size();++i){
384                                 libset.insert(op_libs[i]);
385                         }
386                 }
387         }
388
389         for(qn=0;qn<output_operators.size();++qn){
390                 if(output_operators[qn] != NULL){
391                         vector<string> op_libs = output_operators[qn]->external_libs();
392                         for(i=0;i<op_libs.size();++i){
393                                 libset.insert(op_libs[i]);
394                         }
395                 }
396         }
397 }
398
399
400
401 //      Split into LFTA, HFTA components.
402 //              Split this query into LFTA and HFTA queries.
403 //              Four possible outcomes:
404 //              1) the query reads from a protocol, but does not need to
405 //                      split (can be evaluated as an LFTA).
406 //                      The lfta query is the only element in the return vector,
407 //                      and hfta_returned is false.
408 //              2) the query reads from no protocol, and therefore cannot be split.
409 //                      THe hfta query is the only element in the return vector,
410 //                      and hfta_returned is true.
411 //              3) reads from at least one protocol, but cannot be split : failure.
412 //                      return vector is empty, the error conditions are written
413 //                      in err_str and error_code
414 //              4) The query splits into an hfta query and one or more LFTA queries.
415 //                      the return vector has two or more elements, and hfta_returned
416 //                      is true.  The last element is the HFTA.
417
// Split this query into LFTA and HFTA parts (the four possible
// outcomes are described in the comment block above).  Returns the
// lfta queries (if any) followed by this query as the hfta (if any);
// an empty vector with error_code/err_str set indicates failure.
// hfta_returned reports whether the last returned element is the hfta.
vector<stream_query *> stream_query::split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
	vector<stream_query *> queries;
	int l,q,s;
	int qp_hfta;	// out-param of split_node_for_fta: # of hfta nodes returned

	hfta_returned = false;	// assume until proven otherwise

//		Only leaf (source) nodes can be pushed down to an lfta.
	for(l=0;l<qtail.size();++l){
		int leaf = qtail[l];


		vector<qp_node *> qnodes = query_plan[leaf]->split_node_for_fta(Ext_fcns, Schema, qp_hfta, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);


		if(qnodes.size() == 0 || query_plan[leaf]->get_error_code()){	// error
//printf("error\n");
			error_code = query_plan[leaf]->get_error_code();
			err_str = query_plan[leaf]->get_error_str();
			vector<stream_query *> null_result;
			return(null_result);
		}
		if(qnodes.size() == 1 && qp_hfta){  // nothing happened
//printf("no change\n");
			query_plan[leaf] = qnodes[0];
		}
		if(qnodes.size() == 1 && !qp_hfta){	// push to lfta
//printf("lfta only\n");
//		The whole leaf becomes an lfta query; unlink it from its
//		consumers and remove it from this (hfta) plan.
			queries.push_back(new stream_query(qnodes[0], this));
			vector<int> succ = query_plan[leaf]->get_successors();
			for(s=0;s<succ.size();++s){
				query_plan[succ[s]]->remove_predecessor(leaf);
			}
			query_plan[leaf] = NULL;	// delete it?
		}
		if(qnodes.size() > 1){	// actual splitting occurred.
			if(!qp_hfta){		// internal consistency check.
				error_code = 1;
				err_str = "INTERNAL ERROR: mulitple nodes returned by split_node_for_fta, but none are hfta nodes.\n";
				vector<stream_query *> null_result;
				return(null_result);
			}

//		qnodes layout: lfta nodes first, then qp_hfta hfta nodes.
			for(q=0;q<qnodes.size()-qp_hfta;++q){  // process lfta nodes
//printf("creating lfta %d (%s)\n",q,qnodes[q]->get_node_name().c_str());
				queries.push_back(new stream_query(qnodes[q], this));
			}
//					Use new hfta node
			query_plan[leaf] = qnodes[qnodes.size()-1];
//					Add in any extra hfta nodes
			for(q=qnodes.size()-qp_hfta;q<qnodes.size()-1;++q)
				query_plan.push_back(qnodes[q]);
		}
	}

//		Any node still in query_plan belongs to the hfta; while
//		scanning, collect interface parameters left unresolved
//		(they can only be resolved at the lfta level).
    int n_ifprefs = 0;
	set<string> ifps;
	for(q=0;q<query_plan.size();++q){
		if(query_plan[q] != NULL){
			 n_ifprefs += query_plan[q]->count_ifp_refs(ifps);
			 hfta_returned = true;
		}
	}

	if(n_ifprefs){
		set<string>::iterator ssi;
		err_str += "ERROR, unresolved interface parameters in HFTA:\n";
		for(ssi=ifps.begin();ssi!=ifps.end();++ssi){
			err_str += (*ssi)+" ";
		}
		err_str += "\n";
		error_code = 3;
		vector<stream_query *> null_result;
		return(null_result);
	}


	if(hfta_returned){
//		Re-link what is left of this plan before returning it as the hfta.
		if(generate_linkage()){
			fprintf(stderr,"INTERNAL ERROR, generate_linkage failed in split_query.\n");
			exit(1);
		}
		queries.push_back(this);
	}

	return(queries);
}
504
505
506
507 vector<table_exp_t *> stream_query::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string silo_nm){
508         vector<table_exp_t *> subqueries;
509         int l,q;
510
511         string root_name = this->get_output_tabledef()->get_tbl_name();
512
513
514         for(l=0;l<qtail.size();++l){
515                 int leaf = qtail[l];
516                 vector<table_exp_t *> new_qnodes = query_plan[leaf]->extract_opview(Schema, qnodes, opviews, root_name, silo_nm);
517
518                 for(q=0;q<new_qnodes.size();++q){  // process lfta nodes
519                         subqueries.push_back( new_qnodes[q]);
520                 }
521         }
522
523         return(subqueries);
524 }
525
526
527
528
// Schema of the query as a whole = schema of its root (head) node.
string stream_query::make_schema(){
	return make_schema(qhead);
}
532
533 string stream_query::make_schema(int q){
534         string ret="FTA{\n\n";
535
536         ret += query_plan[q]->get_fields()->to_string();
537
538         ret += "DEFINE{\n";
539         ret += "\tquery_name '"+query_plan[q]->get_node_name()+"';\n";
540
541         map<string, string> defs = query_plan[q]->get_definitions();
542         map<string, string>::iterator dfi;
543         for(dfi=defs.begin(); dfi!=defs.end(); ++dfi){
544                 ret += "\t"+ (*dfi).first + " '" + (*dfi).second + "';\n";
545         }
546         ret += "}\n\n";
547
548         ret += "PARAM{\n";
549         param_table *params = query_plan[q]->get_param_tbl();
550         vector<string> param_names = params->get_param_names();
551         int p;
552         for(p=0;p<param_names.size();p++){
553                 data_type *dt = params->get_data_type( param_names[p] );
554                 ret += "\t" + param_names[p] + " '" + dt->get_type_str() + "';\n";
555         }
556         ret += "}\n";
557
558         ret += query_plan[q]->to_query_string();
559         ret += "\n}\n\n";
560
561         return(ret);
562
563 }
564
565 string stream_query::collect_refd_ifaces(){
566         string ret="";
567         int q;
568         for(q=0;q<query_plan.size();++q){
569                 if(query_plan[q]){
570                         map<string, string> defs = query_plan[q]->get_definitions();
571                         if(defs.count("_referenced_ifaces")){
572                                 if(ret != "") ret += ",";
573                                 ret += defs["_referenced_ifaces"];
574                         }
575                 }
576         }
577
578         return ret;
579 }
580
581
582 bool stream_query::stream_input_only(table_list *Schema){
583         vector<tablevar_t *> input_tbls = this->get_input_tables();
584         int i;
585         for(i=0;i<input_tbls.size();++i){
586                 int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
587                 if(Schema->get_schema_type(t) == PROTOCOL_SCHEMA) return(false);
588         }
589         return(true);
590 }
591
592 //              Return input tables.  No duplicate removal performed.
593 vector<tablevar_t *> stream_query::get_input_tables(){
594         vector<tablevar_t *> retval;
595
596 //                      create name-to-index map
597         int n;
598         map<string, int> name_to_node;
599         for(n=0;n<query_plan.size();++n){
600           if(query_plan[n]){
601                 name_to_node[query_plan[n]->get_node_name()] = n;
602           }
603         }
604
605         int l;
606         for(l=0;l<qtail.size();++l){
607                 int leaf = qtail[l];
608                 vector<tablevar_t *> tmp_v = query_plan[leaf]->get_input_tbls();
609                 int i;
610                 for(i=0;i<tmp_v.size();++i){
611                         if(name_to_node.count(tmp_v[i]->get_schema_name()) == 0)
612                                 retval.push_back(tmp_v[i]);
613                 }
614         }
615         return(retval);
616 }
617
618
619 void stream_query::compute_node_format(int q, vector<int> &nfmt, map<string, int> &op_idx){
620         int netcnt = 0, hostcnt = 0;
621         int i;
622
623         vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
624         for(i=0;i<itbls.size();++i){
625                 string tname = itbls[i]->get_schema_name();
626                 if(op_idx.count(tname)){
627                         int o = op_idx[tname];
628                         if(nfmt[o] == UNKNOWNFORMAT)
629                                 compute_node_format(o,nfmt,op_idx);
630                         if(nfmt[o] == NETFORMAT) netcnt++;
631                         else    hostcnt++;
632                 }else{
633                         netcnt++;
634                 }
635         }
636         if(query_plan[q]->makes_transform()){
637                 nfmt[q] = HOSTFORMAT;
638         }else{
639                 if(hostcnt>0){
640                         nfmt[q] = HOSTFORMAT;
641                 }else{
642                         nfmt[q] = NETFORMAT;
643                 }
644         }
645 //printf("query plan %d (%s) is ",q,query_plan[q]->get_node_name().c_str());
646 //if(nfmt[q] == HOSTFORMAT) printf(" host format.\n");
647 //else printf("net format\n");
648 }
649
650
651 string stream_query::generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode){
652         int schref, ov_ix, i, q, param_sz;
653         bool dag_graph = false;
654
655
656 //              Bind the SEs in all query plan nodes to this schema, and
657 //                      Add all tables used by this query to the schema.
658 //                      Question: Will I be polluting the global schema by adding all
659 //                      query node schemas?
660
661 //              First ensure all nodes are in the schema.
662         int qn;
663         for(qn=0;qn<query_plan.size();++qn){
664                 if(query_plan[qn] != NULL){
665                         Schema->add_table(query_plan[qn]->get_fields());
666                 }
667         }
668 //              Now do binding.
669         for(qn=0;qn<query_plan.size();++qn){
670                 if(query_plan[qn] != NULL){
671                         query_plan[qn]->bind_to_schema(Schema);
672                 }
673         }
674
675 //              Is it a DAG plan?
676         set<string> qsources;
677         int n;
678         for(n=0;n<query_plan.size();++n){
679           if(query_plan[n]){
680                 vector<tablevar_t *> tmp_v = query_plan[n]->get_input_tbls();
681                 int i;
682                 for(i=0;i<tmp_v.size();++i){
683                         if(qsources.count(tmp_v[i]->get_schema_name()) > 0)
684                                 dag_graph = true;
685                         qsources.insert(tmp_v[i]->get_schema_name());
686                 }
687           }
688         }
689
690
691
692 //              Collect set of tables ref'd in this HFTA
693         set<int> tbl_set;
694         for(qn=0;qn<query_plan.size();++qn){
695           if(query_plan[qn]){
696 //                      get names of the tables
697                 vector<tablevar_t *> input_tbls = query_plan[qn]->get_input_tbls();
698                 vector<tablevar_t *> output_tbls = query_plan[qn]->get_output_tbls();
699 //                      Convert to tblrefs, add to set of ref'd tables
700                 int i;
701                 for(i=0;i<input_tbls.size();i++){
702 //                      int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
703                         int t = input_tbls[i]->get_schema_ref();
704                         if(t < 0){
705                                 fprintf(stderr,"INTERNAL ERROR in generate_hfta. "
706                                                                 "query plan node %s references input table %s, which is not in schema.\n",
707                                                                 query_name.c_str(), input_tbls[i]->get_schema_name().c_str());
708                                 exit(1);
709                         }
710                         tbl_set.insert(t);
711                 }
712
713                 for(i=0;i<output_tbls.size();i++){
714                         int t = Schema->get_table_ref(output_tbls[i]->get_schema_name());
715                         if(t < 0){
716                                 fprintf(stderr,"INTERNAL ERROR in generate_hfta."
717                                                                 "query plan node %s references output table %s, which is not in schema.\n",
718                                                                 query_name.c_str(), output_tbls[i]->get_schema_name().c_str());
719                                 exit(1);
720                         }
721                         tbl_set.insert(t);
722                 }
723           }
724         }
725
726 //                      Collect map of lftas, query nodes
727         map<string, int> op_idx;
728         for(q=0;q<query_plan.size();q++){
729           if(query_plan[q]){
730                 op_idx[query_plan[q]->get_node_name()] = q;
731           }
732         }
733
734 //              map of input tables must include query id and input
735 //              source (0,1) becuase several queries might reference the same source
736         vector<tablevar_t *> input_tbls = this->get_input_tables();
737         vector<bool> input_tbl_free;
738         for(i=0;i<input_tbls.size();++i){
739                 input_tbl_free.push_back(true);
740         }
741         map<string, int> lfta_idx;
742 //fprintf(stderr,"%d input tables, %d query nodes\n",input_tbls.size(), query_plan.size());
743         for(q=0;q<query_plan.size();q++){
744           if(query_plan[q]){
745                 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
746                 int it;
747                 for(it=0;it<itbls.size();it++){
748                         string tname = itbls[it]->get_schema_name()+"-"+int_to_string(q)+"-"+int_to_string(it);
749                         string src_tblname = itbls[it]->get_schema_name();
750                         bool src_is_external = false;
751                         for(i=0;i<input_tbls.size();++i){
752                                 if(src_tblname == input_tbls[i]->get_schema_name()){
753                                   src_is_external = true;
754                                   if(input_tbl_free[i]){
755                                         lfta_idx[tname] = i;
756                                         input_tbl_free[i] = false;
757 //fprintf(stderr,"Adding %s (src_tblname=%s, q=%d, it=%d) to %d.\n",tname.c_str(), src_tblname.c_str(), q, it, i);
758                                         break;
759                                   }
760                                 }
761                         }
762                         if(i==input_tbls.size() && src_is_external){
763                                 fprintf(stderr,"INTERNAL ERROR in stream_query::generate_hfta, can't find free entry in input_tbls for query %d, intput %d (%s)\n",q,it,src_tblname.c_str());
764                                 exit(1);
765                         }
766                 }
767           }
768         }
769 /*
770         for(i=0;i<input_tbls.size();++i){
771                 string src_tblname = input_tbls[i]->get_schema_name();
772                 lfta_idx[src_tblname] = i;
773         }
774 */
775
776 //              Compute the output formats of the operators.
777         vector<int> node_fmt(query_plan.size(),UNKNOWNFORMAT);
778         compute_node_format(qhead, node_fmt, op_idx);
779
780
781 //              Generate the schema strings for the outputs.
782         string schema_str;
783         for(i=0;i<query_plan.size();++i){
784                 if(i != qhead && query_plan[i]){
785                         string schema_tmpstr = this->make_schema(i);
786                         schema_str += "gs_csp_t node"+int_to_string(i)+"_schema = "+make_C_embedded_string(schema_tmpstr)+";\n";
787                 }
788         }
789
790         attributes = query_plan[qhead]->get_fields();
791
792
793         string schema_tmpstr = this->make_schema();
794         schema_str += "gs_csp_t "+generate_schema_string_name(query_name)+" = "+make_C_embedded_string(schema_tmpstr)+";\n";
795
796 //              Generate the collection of tuple defs.
797
798         string tuple_defs = "\n/*\tDefine tuple structures \t*/\n\n";
799         set<int>::iterator si;
800         for(si=tbl_set.begin(); si!=tbl_set.end(); ++si){
801                 tuple_defs += generate_host_tuple_struct( Schema->get_table( (*si) ));
802                 tuple_defs += "\n\n";
803         }
804
805 //              generate the finalize tuple function
806         string finalize_str = generate_hfta_finalize_tuple(attributes);
807
808 //              Analyze and make the output operators
809         bool eat_input = false;
810         string src_op = query_name;
811         string pred_op = src_op;
812         int last_op = -1;
813         if(output_specs.size()>0)
814                 eat_input = true;
815         if(n_successors>0)
816                 eat_input = false;      // must create stream output for successor HFTAs.
817         int n_filestreams = 0;
818         for(i=0;i<output_specs.size();++i){
819                 if(output_specs[i]->operator_type == "stream" ){
820                         eat_input = false;
821                 }
822                 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile" ){
823                         last_op = i;
824                         n_filestreams++;
825                 }
826         }
827         int filestream_id = 0;
828         for(i=0;i<output_specs.size();++i){
829                 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile"){
830
831                         int n_fstreams = output_specs[i]->n_partitions / n_parallel;
832                         if(n_fstreams * n_parallel < output_specs[i]->n_partitions){
833                                 n_fstreams++;
834                                 if(n_parallel == 1 || query_name.find("__copy1") != string::npos){
835                                         fprintf(stderr,"WARNING, in query %s, %d streams requested for %s output, but it must be a multiple of the hfta parallelism (%d), increasing number of output streams to %d.\n",query_name.c_str(), output_specs[i]->n_partitions,  output_specs[i]->operator_type.c_str(), n_parallel, n_fstreams*n_parallel);
836                                 }
837                         }
838 //                      output_file_qpn *new_ofq = new output_file_qpn();
839                         string filestream_tag = "";
840                         if(n_filestreams>1){
841                                 filestream_tag = "_fileoutput"+int_to_string(filestream_id);
842                                 filestream_id++;
843                         }
844                         output_file_qpn *new_ofq = new output_file_qpn(pred_op, src_op, filestream_tag, query_plan[qhead]->get_fields(), output_specs[i], (i==last_op ? eat_input : false) );
845 //                      if(n_fstreams > 1){
846                         if(n_fstreams > 0){
847                                 string err_str;
848                                 bool err_ret = new_ofq->set_splitting_params(n_parallel,parallel_idx,n_fstreams,output_specs[i]->partitioning_flds,err_str);
849                                 if(err_ret){
850                                         fprintf(stderr,"%s",err_str.c_str());
851                                         exit(1);
852                                 }
853                         }
854                         output_operators.push_back(new_ofq );
855                         pred_op = output_operators.back()->get_node_name();
856                 }else if(! (output_specs[i]->operator_type == "stream" || output_specs[i]->operator_type == "Stream" || output_specs[i]->operator_type == "STREAM") ){
857                         fprintf(stderr,"WARNING, output operator type %s (on query %s) is not supported, ignoring\n",output_specs[i]->operator_type.c_str(),query_name.c_str() );
858                 }
859         }
860
861
862
863 //              Generate functors for the query nodes.
864
865         string functor_defs = "\n/*\tFunctor definitions\t*/\n\n";
866         for(qn=0;qn<query_plan.size();++qn){
867                 if(query_plan[qn]!=NULL){
868 //                              Compute whether the input needs a ntoh xform.
869                         vector<bool> needs_xform;
870                         vector<tablevar_t *> itbls = query_plan[qn]->get_input_tbls();
871                         for(i=0;i<itbls.size();++i){
872                                 string tname = itbls[i]->get_schema_name();
873 //                              if(query_plan[qn]->makes_transform()){
874                                         if(op_idx.count(tname)>0){
875                                                 if(node_fmt[ op_idx[tname] ] == NETFORMAT){
876                                                         needs_xform.push_back(true);
877                                                 }else{
878                                                         needs_xform.push_back(false);
879                                                 }
880                                         }else{
881                                                 needs_xform.push_back(true);
882                                         }
883 //                              }else{
884 //                                      if(op_idx.count(tname)>0){
885 //                                              if(node_fmt[qn] != node_fmt[ op_idx[tname] ]){
886 //                                                      needs_xform.push_back(true);
887 //                                              }else{
888 //                                                      needs_xform.push_back(false);
889 //                                              }
890 //                                      }else{
891 //                                              if(node_fmt[qn] == HOSTFORMAT){
892 //                                                      needs_xform.push_back(true);
893 //                                              }else{
894 //                                                      needs_xform.push_back(false);
895 //                                              }
896 //                                      }
897 //                              }
898                         }
899
900                         functor_defs += query_plan[qn]->generate_functor(Schema, Ext_fcns, needs_xform);
901                 }
902         }
903
904 //                      Generate output operator functors
905
906         vector<bool> needs_xform;
907         for(i=0;i<output_operators.size();++i)
908                 functor_defs += output_operators[i]->generate_functor(Schema, Ext_fcns, needs_xform);
909
910         string ret =
911 "extern \"C\" {\n"
912 "#include <lapp.h>\n"
913 "#include <fta.h>\n"
914 "#include <gshub.h>\n"
915 "#include <stdlib.h>\n"
916 "#include <stdio.h>\n"
917 "#include <limits.h>\n"
918 "}\n"
919 ;
920         if(dag_graph)
921                 ret +=
922 "#define PLAN_DAG\n"
923 ;
924         ret +=
925 "#include <schemaparser.h>\n"
926 "#include<hfta_runtime_library.h>\n"
927 "\n"
928 "#include <host_tuple.h>\n"
929 "#include <hfta.h>\n"
930 "#include <hfta_udaf.h>\n"
931 "#include <hfta_sfun.h>\n"
932 "\n"
933 //"#define MAXSCHEMASZ 16384\n"
934 "#include <stdio.h>\n\n"
935 ;
936
937 //              Get include file for each of the operators.
938 //              avoid duplicate inserts.
939         set<string> include_fls;
940         for(qn=0;qn<query_plan.size();++qn){
941                 if(query_plan[qn] != NULL)
942                         include_fls.insert(query_plan[qn]->get_include_file());
943         }
944         for(i=0;i<output_operators.size();++i)
945                         include_fls.insert(output_operators[i]->get_include_file());
946         set<string>::iterator ssi;
947         for(ssi=include_fls.begin();ssi!=include_fls.end();++ssi)
948                 ret += (*ssi);
949
950 //              Add defines for hash functions
951         ret +=
952 "\n"
953 "#define hfta_BOOL_to_hash(x) (x)\n"
954 "#define hfta_USHORT_to_hash(x) (x)\n"
955 "#define hfta_UINT_to_hash(x) (x)\n"
956 "#define hfta_IP_to_hash(x) (x)\n"
957 "#define hfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
958 "#define hfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
959 "#define hfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
960 "#define hfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
961 "#define hfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
962 "\n"
963 ;
964
965 //      ret += "#define SERIOUS_LFTA \""+input_tbls[0]->get_schema_name()+"\"\n";
966         ret += "#define OUTPUT_HFTA \""+query_name+"\"\n\n";
967
968 //              HACK ALERT: I know for now that all query plans are
969 //              single operator plans, but while SPX and SGAH can use the
970 //              UNOP template, the merge operator must use the MULTOP template.
971 //              WORSE HACK ALERT : merge does not translate its input,
972 //              so don't apply finalize to the output.
973 //              TODO: clean this up.
974
975 //      string node_type = query_plan[0]->node_type();
976
977         ret += schema_str;
978         ret += tuple_defs;
979
980 //                      Need to work on the input, output xform logic.
981 //                      For now, just add it in.
982 //      ret += finalize_str;
983
984         if(node_fmt[qhead] == NETFORMAT){
985                 ret +=
986 "void finalize_tuple(host_tuple &tup){\n"
987 "return;\n"
988 "}\n"
989 "\n";
990         }else{
991                 ret += finalize_str;
992         }
993
994         ret += functor_defs;
995
996 //                      Parameter block management
997 //                      The proper parameter block must be transmitted to each
998 //                      external stream source.
999 //                      There is a 1-1 mapping between the param blocks returned
1000 //                      by this list and the registered data sources ...
1001 //                      TODO: make this more manageable, but for now
1002 //                      there is no parameter block manipulation so I just
1003 //                      need to have the same number.
1004
1005         ret +=
1006 "int get_lfta_params(gs_int32_t sz, void * value,list<param_block>& lst){\n"
1007 "               // for now every  lfta receive the full copy of hfta parameters\n"
1008 "        struct param_block pb;\n";
1009
1010         set<string> lfta_seen;
1011         for(i=0;i<input_tbls.size();++i){
1012                 string src_tblname = input_tbls[i]->get_schema_name();
1013                 if(lfta_seen.count(src_tblname) == 0){
1014                   lfta_seen.insert(src_tblname);
1015                   schref = input_tbls[i]->get_schema_ref();
1016                   if(Schema->get_schema_type(schref) == OPERATOR_VIEW_SCHEMA){
1017                         ov_ix = input_tbls[i]->get_opview_idx();
1018                         opview_entry *opv = opviews.get_entry(ov_ix);
1019                         string op_param = "SUBQ:";
1020                         int q;
1021                         for(q=0;q<opv->subq_names.size();++q){
1022                                 if(q>0) op_param+=",";
1023                                 op_param+=opv->subq_names[q];
1024                         }
1025                         op_param+="\\n";
1026                         param_sz = op_param.size()-1;
1027
1028                         sprintf(tmpstr,"\t\tpb.block_length = %d;\n",param_sz); ret+=tmpstr;
1029                         ret+=
1030 "        pb.data = malloc(pb.block_length);\n";
1031                         ret+="\t\tmemcpy(pb.data,\""+op_param+"\",pb.block_length);\n"
1032 "        lst.push_back(pb);\n\n";
1033                   }else{
1034                         ret+=
1035 "        pb.block_length = sz;\n"
1036 "        pb.data = malloc(pb.block_length);\n"
1037 "        memcpy(pb.data, value, pb.block_length);\n"
1038 "        lst.push_back(pb);\n\n";
1039                   }
1040                 }
1041         }
1042
1043         ret +=
1044 "        return 0;\n"
1045 "}\n"
1046 "\n";
1047
1048                 ret+=
1049 "struct FTA* alloc_hfta (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void * value ) {\n"
1050 "\n"
1051 "       // find the lftas\n"
1052 "       list<lfta_info*> *lfta_list = new list<lfta_info*>;\n"
1053 "\n"
1054 "       FTAID f;\n"
1055 "       char schemabuf[MAXSCHEMASZ];\n"
1056 "       gs_schemahandle_t schema_handle;\n"
1057 "\n";
1058
1059 //              Register the input data sources.
1060 //              Register a source only once.
1061
1062 //      vector<string> ext_reg_txt;
1063         map<string, int> input_tbl_srcid;
1064         for(i=0;i<input_tbls.size();++i){
1065                 string src_tblname = input_tbls[i]->get_schema_name();
1066 //                      Use UDOP alias when in distributed mode.
1067 //                      the cluster manager will make the translation
1068 //                      using infr from qtree.xml
1069                 if(distributed_mode && input_tbls[i]->get_udop_alias() != "")
1070                         src_tblname = input_tbls[i]->get_udop_alias();
1071         if(input_tbl_srcid.count(src_tblname) == 0){
1072                 int srcid = input_tbl_srcid.size();
1073                 input_tbl_srcid[src_tblname] = srcid;
1074                 string tmp_s=
1075 "\n     // find "+src_tblname+"\n"
1076 "       if (fta_find(\""+src_tblname+"\",1,&f,schemabuf,MAXSCHEMASZ)!=0) {\n"
1077 "                           fprintf(stderr,\"HFTA::error:could not find LFTA \\n\");\n"
1078 "                           return 0;\n"
1079 "       }\n"
1080 "       //fprintf(stderr,\"HFTA::FTA found at %u[%u]\\n\",ftamsgid,ftaindex);\n"
1081 "\n"
1082 "       // parse the schema and get the schema handle\n"
1083 "       schema_handle = ftaschema_parse_string(schemabuf);\n"
1084 "       lfta_info* inf"+int_to_string(srcid)+" = new lfta_info();\n"
1085 "       inf"+int_to_string(srcid)+"->f = f;\n"
1086 "       inf"+int_to_string(srcid)+"->fta_name = strdup(\""+src_tblname+"\");\n"
1087 "       inf"+int_to_string(srcid)+"->schema = strdup(schemabuf);\n"
1088 "       inf"+int_to_string(srcid)+"->schema_handle = schema_handle;\n"
1089 "       lfta_list->push_back(inf"+int_to_string(srcid)+");\n\n";
1090 //              ext_reg_txt.push_back(tmp_s);
1091                 ret += tmp_s;
1092           }
1093         }
1094
1095         ret+="\n";
1096         ret += "\tgs_schemahandle_t root_schema_handle = ftaschema_parse_string("+generate_schema_string_name(query_name)+");\n";
1097         for(i=0;i<query_plan.size();++i){
1098                 if(i != qhead && query_plan[i]){
1099                         ret += "\tgs_schemahandle_t op"+int_to_string(i)+"_schema_handle = ftaschema_parse_string(node"+int_to_string(i)+"_schema);\n";
1100                 }
1101         }
1102         ret+="\n";
1103
1104 //              Create the operators.
1105         int n_basic_ops;
1106         for(q=0;q<query_plan.size();q++){
1107           if(query_plan[q]){
1108                 ret+=
1109 "\n"
1110 "       // create an instance of operator "+int_to_string(q)+" ("+query_plan[q]->get_node_name()+")     \n";
1111
1112 //                      Create parameters for operator construction.
1113                 string op_params;
1114                 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
1115                 string tname = itbls[0]->get_schema_name();
1116 //              string li_tname = tname +"-"+int_to_string(q)+"-0";
1117 //              if(lfta_idx.count(li_tname)>0)
1118                 if(input_tbl_srcid.count(tname)>0){
1119 //                      ret += ext_reg_txt[lfta_idx[li_tname]];
1120 //                      op_params += "inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
1121                         op_params += "inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
1122                 }else if(op_idx.count(tname)>0){
1123                         op_params += "op"+int_to_string( op_idx[tname] )+"_schema_handle";
1124                 }else{
1125                         fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (3) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1126                         exit(1);
1127                 }
1128                 if(itbls.size()>1){
1129                         string tname = itbls[1]->get_schema_name();
1130 //                      string li_tname = tname +"-"+int_to_string(q)+"-1";
1131 //                      if(lfta_idx.count(li_tname)>0)
1132                         if(input_tbl_srcid.count(tname)>0){
1133 //                              ret += ext_reg_txt[lfta_idx[li_tname]];
1134 //                              op_params += ",inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
1135                                 op_params += ",inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
1136                         }else if(op_idx.count(tname)>0){
1137                                 op_params += ",op"+int_to_string( op_idx[tname] )+"_schema_handle";
1138                         }else{
1139                                 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (4) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1140                                 exit(1);
1141                         }
1142                 }
1143                 ret += query_plan[q]->generate_operator(q,op_params);
1144                 ret +=
1145 "       operator_node* node"+int_to_string(q)+" = new operator_node(op"+int_to_string(q)+");\n";
1146
1147                 n_basic_ops = q;
1148           }
1149         }
1150         n_basic_ops++;
1151 //                      Next for the output operators if any
1152         for(i=0;i<output_operators.size();++i){
1153                 ret += output_operators[i]->generate_operator(n_basic_ops+i,"root_schema_handle");
1154                 ret +=
1155 "       operator_node* node"+int_to_string(n_basic_ops+i)+" = new operator_node(op"+int_to_string(n_basic_ops+i)+");\n";
1156         }
1157
1158
1159 //              Link up operators.
1160         for(q=0;q<query_plan.size();++q){
1161           if(query_plan[q]){
1162 //                      NOTE: this code assume that the operator has at most
1163 //                      two inputs.  But the template code also makes
1164 //                      this assumption.  Both will need to be changed.
1165                 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
1166                 string tname = itbls[0]->get_schema_name();
1167 //              string li_tname = tname +"-"+int_to_string(q)+"-0";
1168 //              if(lfta_idx.count(li_tname)>0)
1169                 if(input_tbl_srcid.count(tname)>0){
1170 //                      ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
1171                         ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
1172                 }else if(op_idx.count(tname)>0){
1173                         ret += "\tnode"+int_to_string(q)+"->set_left_child_node(node"+int_to_string( op_idx[tname] )+");\n";
1174                 }else{
1175                         fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (1) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1176                         exit(1);
1177                 }
1178                 if(itbls.size()>1){
1179                         string tname = itbls[1]->get_schema_name();
1180 //                      string li_tname = tname +"-"+int_to_string(q)+"-1";
1181 //                      if(lfta_idx.count(li_tname)>0)
1182                         if(input_tbl_srcid.count(tname)>0){
1183 //                              ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
1184                                 ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
1185                         }else if(op_idx.count(tname)>0){
1186                                 ret += "\tnode"+int_to_string(q)+"->set_right_child_node(node"+int_to_string( op_idx[tname] )+");\n";
1187                         }else{
1188                                 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s (%s) when linking operator (2) %d (%s)\n",tname.c_str(), tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1189                                 exit(1);
1190                         }
1191                 }
1192           }
1193         }
1194         for(i=0;i<output_operators.size();++i){
1195                 if(i==0)
1196                         ret += "\tnode"+int_to_string(n_basic_ops)+"->set_left_child_node(node"+int_to_string( qhead )+");\n";
1197                 else
1198                         ret += "\tnode"+int_to_string(n_basic_ops+i)+"->set_left_child_node(node"+int_to_string( n_basic_ops+i-1 )+");\n";
1199         }
1200
1201         // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
1202
1203         bool fta_reusable = false;
1204         if (query_plan[qhead]->get_val_of_def("reusable") == "yes" ||
1205                 query_plan[qhead]->get_param_tbl()->size() == 0) {
1206                 fta_reusable = 1;
1207         }
1208
1209         int root_node = qhead;
1210         if(output_operators.size()>0)
1211                 root_node = n_basic_ops+i-1;
1212
1213         ret+=
1214 "\n"
1215 "\n"
1216 "       MULTOP_HFTA* ftap = new MULTOP_HFTA(ftaid, OUTPUT_HFTA, command, sz, value, root_schema_handle, node"+int_to_string(root_node)+", lfta_list, " + (fta_reusable ?"true":"false") + ", reusable);\n"
1217 "       if(ftap->init_failed()){ delete ftap; return 0;}\n"
1218 "       return (FTA*)ftap;\n"
1219 "}\n"
1220 "\n"
1221 "\n";
1222
1223
1224         string comm_bufsize = "16*1024*1024";
1225         if(defines.count("hfta_comm_buf")>0){
1226                 comm_bufsize = defines["hfta_comm_buf"];
1227         }
1228
1229         ret+=
1230 "\n"
1231 "int main(int argc, char * argv[]) {\n"
1232 "\n"
1233 "\n"
1234 "   /* parse the arguments */\n"
1235 "\n"
1236 "    gs_int32_t tip1,tip2,tip3,tip4;\n"
1237 "    endpoint gshub;\n"
1238 "    gs_sp_t instance_name;\n"
1239 "    if (argc<3) {\n"
1240 "                gslog(LOG_EMERG,\"Wrong arguments at startup\");\n"
1241 "        exit(1);\n"
1242 "    }\n"
1243 "\n"
1244 "    if ((sscanf(argv[1],\"%u.%u.%u.%u:%hu\",&tip1,&tip2,&tip3,&tip4,&(gshub.port)) != 5)) {\n"
1245 "        gslog(LOG_EMERG,\"HUB IP NOT DEFINED\");\n"
1246 "        exit(1);\n"
1247 "    }\n"
1248 "    gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\n"
1249 "    gshub.port=htons(gshub.port);\n"
1250 "    instance_name=strdup(argv[2]);\n"
1251 "    if (set_hub(gshub)!=0) {\n"
1252 "        gslog(LOG_EMERG,\"Could not set hub\");\n"
1253 "        exit(1);\n"
1254 "    }\n"
1255 "    if (set_instance_name(instance_name)!=0) {\n"
1256 "        gslog(LOG_EMERG,\"Could not set instance name\");\n"
1257 "        exit(1);\n"
1258 "    }\n"
1259 "\n"
1260 "\n"
1261 "    /* initialize host library  */\n"
1262 "\n"
1263 //"    fprintf(stderr,\"Initializing gscp\\n\");\n"
1264 "    gsopenlog(argv[0]);\n"
1265 "\n"
1266 "    if (hostlib_init(HFTA, "+comm_bufsize+", DEFAULTDEV, 0, 0)!=0) {\n"
1267 "        fprintf(stderr,\"%s::error:could not initialize gscp\\n\",\n"
1268 "               argv[0]);\n"
1269 "        exit(1);\n"
1270 "    }\n"
1271 "\n"
1272 "\n"
1273 "\n"
1274 "    FTAID ret = fta_register(OUTPUT_HFTA, " + (fta_reusable?"1":"0") + ", DEFAULTDEV, alloc_hfta, "+generate_schema_string_name(query_name)+", -1, 0ull);\n"
1275 "    fta_start_service(-1);\n"
1276 "\n"
1277 "   return 0;\n"
1278 "\n"
1279 "}\n"
1280 "\n";
1281 ////////////////////
1282         return(ret);
1283 }
1284
1285
1286 //              Checks if the node i is compatible with interface partitioning
1287 //              (can be pushed below the merge that combines partitioned stream)
1288 //              i - index of the query node in a query plan
1289 bool stream_query::is_partn_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1290         int i;
1291         qp_node* node = query_plan[index];
1292         qp_node* child_node = NULL;
1293
1294         if (node->predecessors.empty())
1295                 return false;
1296
1297         // all the node predecessors must be partition merges with the same partition definition
1298         partn_def_t* partn_def = NULL;
1299         for (i = 0; i < node->predecessors.size(); ++i) {
1300                 child_node = query_plan[node->predecessors[i]];
1301                 if (child_node->node_type() != "mrg_qpn")
1302                         return false;
1303
1304                 // merge must have only one parent for this optimization to work
1305                 // check that all its successors are the same
1306                 for (int j = 1; j < child_node->successors.size(); ++j) {
1307                         if (child_node->successors[j] != child_node->successors[0])
1308                                 return false;
1309                 }
1310
1311                 partn_def_t* new_partn_def = ((mrg_qpn*)child_node)->get_partn_definition(lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result);
1312                 if (!new_partn_def)
1313                         return false;
1314                 if (!i)
1315                         partn_def = new_partn_def;
1316                 else if (new_partn_def != partn_def)
1317                         return false;
1318
1319         }
1320
1321         if (node->node_type() == "spx_qpn")     // spx nodes are always partition compatible
1322                 return true;
1323         else if (node->node_type() == "sgah_qpn") {
1324                 gb_table gb_tbl = ((sgah_qpn*)node)->gb_tbl;
1325                 return true; //partn_def->is_compatible(&gb_tbl);
1326         }
1327         else if (node->node_type() == "rsgah_qpn") {
1328                 gb_table gb_tbl = ((rsgah_qpn*)node)->gb_tbl;
1329                 return partn_def->is_compatible(&gb_tbl);
1330         }
1331         else if (node->node_type() == "sgahcwcb_qpn") {
1332                 gb_table gb_tbl = ((sgahcwcb_qpn*)node)->gb_tbl;
1333                 return partn_def->is_compatible(&gb_tbl);
1334         }
1335         else if (node->node_type() == "join_eq_hash_qpn") {
1336                 return true;
1337         }
1338         else
1339                 return false;
1340 }
1341
//		Push the operator below the merge that combines the partitioned input streams
1343 void stream_query::pushdown_partn_operator(int index) {
1344         qp_node* node = query_plan[index];
1345         int i;
1346         char node_index[128];
1347
1348 //      fprintf(stderr, "Partn pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
1349
1350
1351         // HACK ALERT: When reordering merges we screw up slack computation
1352         // since slack should no longer be used, it is not an issue
1353
1354
1355         // we can safely reorder nodes that have one and only one temporal atribute in select list
1356         table_def* table_layout = node->get_fields();
1357         vector<field_entry*> fields = table_layout->get_fields();
1358         int merge_fieldpos = -1;
1359
1360         data_type* dt;
1361         for (i = 0; i < fields.size(); ++i) {
1362                 data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
1363                 if(dt.is_temporal()) {
1364                         if (merge_fieldpos != -1)       // more that one temporal field found
1365                                 return;
1366                         merge_fieldpos = i;
1367                 }
1368         }
1369
1370         if (merge_fieldpos == -1)       // if no temporal fieldf found
1371                 return;
1372
1373         std::vector<colref_t *> mvars;                  // the merge-by columns.
1374
1375         // reodring procedure is different for unary operators and joins
1376         if (node->node_type() == "join_eq_hash_qpn") {
1377                 vector<qp_node*> new_nodes;
1378
1379                 tablevar_t *left_table_name;
1380                 tablevar_t *right_table_name;
1381                 mrg_qpn* left_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
1382                 mrg_qpn* right_mrg = (mrg_qpn*)query_plan[node->predecessors[1]];
1383
1384                 // for now we will only consider plans where both child merges
1385                 // merge the same set of streams
1386
1387                 if (left_mrg->fm.size() != right_mrg->fm.size())
1388                         return;
1389
1390                 // maping of interface names to table definitions
1391                 map<string, tablevar_t*> iface_map;
1392                 for (i = 0; i < left_mrg->fm.size(); i++) {
1393                         left_table_name = left_mrg->fm[i];
1394                         iface_map[left_table_name->get_machine() + left_table_name->get_interface()] = left_table_name;
1395                 }
1396
1397                 for (i = 0; i < right_mrg->fm.size(); i++) {
1398                         right_table_name = right_mrg->fm[i];
1399
1400                         // find corresponding left tabke
1401                         if (!iface_map.count(right_table_name->get_machine() + right_table_name->get_interface()))
1402                                 return;
1403
1404                         left_table_name = iface_map[right_table_name->get_machine() + right_table_name->get_interface()];
1405
1406                         // create new join nodes
1407                         sprintf(node_index, "_%d", i);
1408                         join_eq_hash_qpn* new_node = (join_eq_hash_qpn*)node->make_copy(node_index);
1409
1410                         // make a copy of right_table_name
1411                         right_table_name = right_table_name->duplicate();
1412                         left_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[0]->get_var_name());
1413                         right_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[1]->get_var_name());
1414                         new_node->from[0] = left_table_name;
1415                         new_node->from[1] = right_table_name;
1416                         new_nodes.push_back(new_node);
1417                 }
1418
1419                 // make right_mrg a new root
1420                 right_mrg->set_node_name(node->get_node_name());
1421                 right_mrg->table_layout = table_layout;
1422                 right_mrg->merge_fieldpos = merge_fieldpos;
1423
1424                 for (i = 0; i < right_mrg->fm.size(); i++) {
1425                         // make newly create joins children of merge
1426                         sprintf(node_index, "_%d", i);
1427                         right_mrg->fm[i] = new tablevar_t(right_mrg->fm[i]->get_machine().c_str(), right_mrg->fm[i]->get_interface().c_str(), (node->get_node_name() + string(node_index)).c_str());
1428                         sprintf(node_index, "_m%d", i);
1429                         right_mrg->fm[i]->set_range_var(node_index);
1430                         right_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
1431
1432                 }
1433
1434                 if (left_mrg != right_mrg)
1435                         query_plan[node->predecessors[0]] = NULL;       // remove left merge from the plan
1436
1437                 query_plan.insert(query_plan.end(), new_nodes.begin(), new_nodes.end());
1438
1439         } else {        // unary operator
1440                 // get the child merge node
1441                 mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
1442
1443                 child_mrg->set_node_name(node->get_node_name());
1444                 child_mrg->table_layout = table_layout;
1445                 child_mrg->merge_fieldpos = merge_fieldpos;
1446
1447                 // create new nodes for every source stream
1448                 for (i = 0; i < child_mrg->fm.size(); i++) {
1449                         tablevar_t *table_name = child_mrg->fm[i];
1450                         sprintf(node_index, "_%d", i);
1451                         qp_node* new_node = node->make_copy(node_index);
1452
1453                         if (node->node_type() == "spx_qpn")
1454                                 ((spx_qpn*)new_node)->table_name = table_name;
1455                         else if (node->node_type() == "sgah_qpn")
1456                                 ((sgah_qpn*)new_node)->table_name = table_name;
1457                         else if (node->node_type() == "rsgah_qpn")
1458                                 ((rsgah_qpn*)new_node)->table_name = table_name;
1459                         else if (node->node_type() == "sgahcwcb_qpn")
1460                                 ((sgahcwcb_qpn*)new_node)->table_name = table_name;
1461                         table_name->set_range_var("_t0");
1462
1463                         child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
1464                         sprintf(node_index, "_m%d", i);
1465                         child_mrg->fm[i]->set_range_var(node_index);
1466                         child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
1467
1468                         // add new node to query plan
1469                         query_plan.push_back(new_node);
1470                 }
1471         }
1472         query_plan[index] = NULL;
1473         generate_linkage();
1474 }
1475
1476
1477 //              Checks if the node i can be pushed below the merge
1478   bool stream_query::is_pushdown_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names) {
1479
1480         int i;
1481         qp_node* node = query_plan[index];
1482         qp_node* child_node = NULL;
1483
1484         if (node->predecessors.size() != 1)
1485                 return false;
1486
1487         // node predecessor must be merge that combine streams from multiple hosts
1488         child_node = query_plan[node->predecessors[0]];
1489         if (child_node->node_type() != "mrg_qpn")
1490                 return false;
1491
1492         if (!((mrg_qpn*)child_node)->is_multihost_merge())
1493                 return false;
1494
1495         // merge must have only one parent for this optimization to work
1496         // check that all its successors are the same
1497         for (int j = 1; j < child_node->successors.size(); ++j) {
1498                 if (child_node->successors[j] != child_node->successors[0])
1499                         return false;
1500         }
1501
1502         // selections can always be pushed down, aggregations can always be split into selection/aggr or aggr/aggr pair
1503         // and pushed down
1504         if (node->node_type() == "spx_qpn")
1505                 return true;
1506         else if (node->node_type() == "sgah_qpn")
1507                 return true;
1508         else
1509                 return false;
1510
1511   }
1512
//		Push the operator below the merge.
//		Precondition: is_pushdown_compatible() accepted node query_plan[index].
//		A selection (spx_qpn) is replicated once per merge input; an
//		aggregation (sgah_qpn) is split into per-host low-level aggregates
//		feeding new per-host merges, plus one high-level aggregate above the
//		original multihost merge.
  void stream_query::pushdown_operator(int index, ext_fcn_list *Ext_fcns, table_list *Schema) {
	qp_node* node = query_plan[index];
	int i;
	char node_suffix[128];	// scratch buffer for generated name suffixes

	// we can only safely push down queries that have one and only one temporal atribute in select list
	table_def* table_layout = node->get_fields();
	vector<field_entry*> fields = table_layout->get_fields();
	int merge_fieldpos = -1;	// position of the unique temporal field; becomes the merge-by column

	data_type* dt;	// NOTE(review): unused — shadowed by the loop-local dt below
	for (i = 0; i < fields.size(); ++i) {
		data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
		if(dt.is_temporal()) {
			if (merge_fieldpos != -1)	// more that one temporal field found
				return;
			merge_fieldpos = i;
		}
	}

	if (merge_fieldpos == -1)	// if no temporal field found
		return;

	std::vector<colref_t *> mvars;			// the merge-by columns.

	fprintf(stderr, "Regular pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());

	// get the child merge node
	mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];

	tablevar_t *table_name = NULL;


	if (node->node_type() == "spx_qpn") {
		// get the child merge node

		// create new nodes for every source stream
		for (i = 0; i < child_mrg->fm.size(); i++) {
			table_name = child_mrg->fm[i];
			sprintf(node_suffix, "_%d", i);
			qp_node* new_node = node->make_copy(node_suffix);

			// rebind the copy's input to this particular source stream
			((spx_qpn*)new_node)->table_name = table_name;
			table_name->set_range_var("_t0");

			// repoint the merge input at the new per-source selection
			child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
			sprintf(node_suffix, "_m%d", i);
			child_mrg->fm[i]->set_range_var(node_suffix);
			child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));

			// add new node to query plan
			query_plan.push_back(new_node);
		}
		// the merge now produces the selection's output schema
		child_mrg->table_layout = table_layout;
		child_mrg->merge_fieldpos = merge_fieldpos;

	} else {		// aggregation node

		vector<qp_node*> new_nodes;	// per-host low-level nodes created below

		// split aggregations into high and low-level part
		vector<qp_node *> split_nodes = ((sgah_qpn*)node)->split_node_for_hfta(Ext_fcns, Schema);
		if (split_nodes.size() != 2)	// split failed / not applicable: leave plan unchanged
			return;

		// high-level aggregate keeps reading from the (renamed) multihost merge
		sgah_qpn* super_aggr = (sgah_qpn*)split_nodes[1];
		super_aggr->table_name = ((sgah_qpn*)node)->table_name;

		// group all the sources by host
		map<string, vector<int> > host_map;	// machine name -> indexes into child_mrg->fm
		for (i = 0; i < child_mrg->fm.size(); i++) {
			tablevar_t *table_name = child_mrg->fm[i];
			if (host_map.count(table_name->get_machine()))
				host_map[table_name->get_machine()].push_back(i);
			else {
				vector<int> tables;
				tables.push_back(i);
				host_map[table_name->get_machine()] = tables;
			}
		}

		// create a new merge and low-level aggregation for each host
		map<string, vector<int> >::iterator iter;
		for (iter = host_map.begin(); iter != host_map.end(); iter++) {
			string host_name = (*iter).first;
			vector<int> tables = (*iter).second;

			sprintf(node_suffix, "_%s", host_name.c_str());
			string suffix(node_suffix);
			untaboo(suffix);	// sanitize host name for use inside a node name
			mrg_qpn *new_mrg = (mrg_qpn *)child_mrg->make_copy(suffix);
			for (i = 0; i < tables.size(); ++i) {
				sprintf(node_suffix, "_m%d", i);
				new_mrg->fm.push_back(child_mrg->fm[tables[i]]);
				// NOTE(review): fm is indexed with tables[i] but mvars with i —
				// looks like this should be child_mrg->mvars[tables[i]]; confirm
				new_mrg->mvars.push_back(child_mrg->mvars[i]);
				// NOTE(review): fm[i] assumes make_copy() produced an empty fm;
				// verify the copy starts with no inputs
				new_mrg->fm[i]->set_range_var(node_suffix);
			}
			qp_node* new_node = split_nodes[0]->make_copy(suffix);

			// the low-level node reads from the new per-host merge
			if (new_node->node_type() == "spx_qpn")  {
				((spx_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
				((spx_qpn*)new_node)->table_name->set_range_var("_t0");
			} else {
				((sgah_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
				((sgah_qpn*)new_node)->table_name->set_range_var("_t0");
			}
			query_plan.push_back(new_mrg);
			new_nodes.push_back(new_node);
		}

		// the multihost merge now carries the low-level node's output schema
		child_mrg->merge_fieldpos = merge_fieldpos;
		if (split_nodes[0]->node_type() == "spx_qpn")
			child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((spx_qpn*)split_nodes[0])->select_list);
		else
			child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((sgah_qpn*)split_nodes[0])->select_list);


		// connect newly created nodes with parent multihost merge
		for (i = 0; i < new_nodes.size(); ++i) {
			if (new_nodes[i]->node_type() == "spx_qpn")
				child_mrg->fm[i] = new tablevar_t(((spx_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
			else {
				child_mrg->fm[i] = new tablevar_t(((sgah_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());

			}
			child_mrg->mvars[i]->set_field(child_mrg->table_layout->get_field_name(merge_fieldpos));

			sprintf(node_suffix, "_m%d", i);
			child_mrg->fm[i]->set_range_var(node_suffix);
			query_plan.push_back(new_nodes[i]);
		}
		// shrink the multihost merge to one input per host
		child_mrg->fm.resize(new_nodes.size());
		child_mrg->mvars.resize(new_nodes.size());

		// push the new high-level aggregation
		query_plan.push_back(super_aggr);

	}
	query_plan[index] = NULL;	// original operator replaced by the rewritten subplan
	generate_linkage();	// rebuild predecessor/successor links

  }
1656
1657 //              Extract subtree rooted at node i into separate hfta
1658 stream_query* stream_query::extract_subtree(int index) {
1659
1660         vector<int> nodes;
1661         stream_query* new_query = new stream_query(query_plan[index], this);
1662
1663         nodes.push_back(index);
1664         for (int i = 0; i < nodes.size(); ++i) {
1665                 qp_node* node = query_plan[nodes[i]];
1666                 if (!node)
1667                         continue;
1668
1669                 // add all children to nodes list
1670                 for (int j = 0; j < node->predecessors.size(); ++j)
1671                         nodes.push_back(node->predecessors[j]);
1672                 if (i)
1673                         new_query->query_plan.push_back(node);
1674
1675                 query_plan[nodes[i]] = NULL;
1676         }
1677
1678         return new_query;
1679 }
1680
//		Splits query that combines data from multiple hosts into separate hftas.
//		For each multihost merge: group its inputs by host, interpose one
//		per-host merge per multi-stream group, then carve every input
//		subtree out into its own stream_query (returned to the caller).
vector<stream_query*> stream_query::split_multihost_query()  {

	vector<stream_query*> ret;	// extracted per-source hfta queries
	char node_suffix[128];	// scratch buffer for generated name suffixes
	int i;

	// find merges combining multiple hosts into per-host groups
	int plan_size = query_plan.size();
	vector<mrg_qpn*> new_nodes;	// newly created per-host merges

	for (i = 0; i < plan_size; ++i) {
		qp_node* node = query_plan[i];
		if (node && node->node_type() == "mrg_qpn") {
			mrg_qpn* mrg = (mrg_qpn*)node;
			if (mrg->is_multihost_merge()) {

				// group all the sources by host
				map<string, vector<int> > host_map;	// machine name -> indexes into mrg->fm
				for (int j = 0; j < mrg->fm.size(); j++) {
					tablevar_t *table_name = mrg->fm[j];
					if (host_map.count(table_name->get_machine()))
						host_map[table_name->get_machine()].push_back(j);
					else {
						vector<int> tables;
						tables.push_back(j);
						host_map[table_name->get_machine()] = tables;
					}
				}

				// create a new merge for each host
				map<string, vector<int> >::iterator iter;
				for (iter = host_map.begin(); iter != host_map.end(); iter++) {
					string host_name = (*iter).first;
					vector<int> tables = (*iter).second;

					// a host contributing a single stream needs no sub-merge
					if (tables.size() == 1)
						continue;

					sprintf(node_suffix, "_%s", host_name.c_str());
					string suffix(node_suffix);
					untaboo(suffix);	// sanitize host name for use inside a node name
					mrg_qpn *new_mrg = (mrg_qpn *)mrg->make_copy(suffix);
					for (int j = 0; j < tables.size(); ++j) {
						new_mrg->fm.push_back(mrg->fm[tables[j]]);
						// NOTE(review): fm uses tables[j] but mvars uses j —
						// looks like this should be mrg->mvars[tables[j]]; confirm
						new_mrg->mvars.push_back(mrg->mvars[j]);
						sprintf(node_suffix, "m_%d", j);
						// NOTE(review): fm[j] assumes make_copy() produced an
						// empty fm; verify the copy starts with no inputs
						new_mrg->fm[j]->set_range_var(node_suffix);
					}
					new_nodes.push_back(new_mrg);
				}

				if (!new_nodes.empty()) {
					// connect newly created merge nodes with parent multihost merge
					// NOTE(review): resizing to new_nodes.size() drops the inputs
					// of single-stream hosts skipped above — verify intended
					for (int j = 0; j < new_nodes.size(); ++j) {
						mrg->fm[j] = new tablevar_t(new_nodes[j]->fm[0]->get_machine().c_str(), "IFACE", new_nodes[j]->get_node_name().c_str());
						query_plan.push_back(new_nodes[j]);
					}
					mrg->fm.resize(new_nodes.size());
					mrg->mvars.resize(new_nodes.size());
					generate_linkage();
				}


				// now externalize the sources
				for (int j = 0; j < node->predecessors.size(); ++j) {
					//		Extract subtree rooted at node i into separate hfta
					stream_query* q = extract_subtree(node->predecessors[j]);
					if (q) {
						q->generate_linkage();
						ret.push_back(q);
					}

				}
				generate_linkage();	// relink after subtrees were removed
			}
		}
	}

	return ret;
}
1762
1763
1764 //              Perform local FTA optimizations
1765 void stream_query::optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result){
1766
1767         // Topologically sort the nodes in query plan (leaf-first)
1768         int i = 0, j = 0;
1769         vector<int> sorted_nodes;
1770
1771         int num_nodes = query_plan.size();
1772         bool* leaf_flags = new bool[num_nodes];
1773         memset(leaf_flags, 0, num_nodes * sizeof(bool));
1774
1775         // run topological sort
1776         bool done = false;
1777
1778         // add all leafs to sorted_nodes
1779         while (!done) {
1780                 done = true;
1781                 for (i = 0; i < num_nodes; ++i) {
1782                         if (!query_plan[i])
1783                                 continue;
1784
1785                         if (!leaf_flags[i] && query_plan[i]->predecessors.empty()) {
1786                                 leaf_flags[i] = true;
1787                                 sorted_nodes.push_back(i);
1788                                 done = false;
1789
1790                                 // remove the node from its parents predecessor lists
1791                                 // since we only care about number of predecessors, it is sufficient just to remove
1792                                 // one element from the parent's predecessors list
1793                                 for (int j = query_plan[i]->successors.size() - 1; j >= 0; --j)
1794                                         query_plan[query_plan[i]->successors[j]]->predecessors.pop_back();
1795                         }
1796                 }
1797         }
1798         delete[] leaf_flags;
1799         num_nodes = sorted_nodes.size();
1800         generate_linkage();             // rebuild the recently destroyed predecessor lists.
1801
1802         // collect the information about interfaces nodes read from
1803         for (i = 0; i < num_nodes; ++i) {
1804                 qp_node* node = query_plan[sorted_nodes[i]];
1805                 vector<tablevar_t *> input_tables = node->get_input_tbls();
1806                 for (j = 0; j <  input_tables.size(); ++j) {
1807                         tablevar_t * table = input_tables[j];
1808                         if (lfta_names.count(table->schema_name)) {
1809                                 int index = lfta_names[table->schema_name];
1810                                 table->set_machine(machine_names[index]);
1811                                 table->set_interface(interface_names[index]);
1812                         }
1813                 }
1814         }
1815
1816         /*
1817
1818         // push eligible operators down in the query plan
1819         for (i = 0; i < num_nodes; ++i) {
1820                 if (partn_parse_result && is_partn_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result)) {
1821                         pushdown_partn_operator(sorted_nodes[i]);
1822                 } else if (is_pushdown_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names)) {
1823                         pushdown_operator(sorted_nodes[i], Ext_fcns, Schema);
1824                 }
1825         }
1826
1827         // split the query into multiple hftas if it combines the data from multiple hosts
1828         vector<stream_query*> hftas = split_multihost_query();
1829         hfta_list.insert(hfta_list.end(), hftas.begin(), hftas.end());
1830         */
1831
1832         num_nodes = query_plan.size();
1833         // also split multi-way merges into two-way merges
1834         for (i = 0; i < num_nodes; ++i) {
1835                 qp_node* node = query_plan[i];
1836                 if (node && node->node_type() == "mrg_qpn") {
1837                         vector<mrg_qpn *> split_merge = ((mrg_qpn *)node)->split_sources();
1838
1839                         query_plan.insert(query_plan.end(), split_merge.begin(), split_merge.end());
1840                         // delete query_plan[sorted_nodes[i]];
1841                         query_plan[i] = NULL;
1842                 }
1843         }
1844
1845         generate_linkage();
1846 }
1847
1848
1849
1850 table_def *stream_query::get_output_tabledef(){
1851         return( query_plan[qhead]->get_fields() );
1852 }
1853
1854 vector<string> stream_query::get_tbl_keys(vector<string> &partial_keys){
1855         return query_plan[qhead]->get_tbl_keys(partial_keys);
1856 }
1857
1858
1859
1860 //////////////////////////////////////////////////////////
1861 ////            De-siloing.  TO BE REMOVED
1862
//		Replace every lfta input of this query's tail nodes with a merge
//		over the per-silo instances of that lfta.  silo_names holds the
//		per-silo name suffixes (all assumed the same length); lfta_names
//		identifies which input tables are lftas.
void stream_query::desilo_lftas(map<string, int> &lfta_names,vector<string> &silo_names,table_list *Schema){
	int i,t,s;

	// every siloed lfta name ends in a suffix of this length
	int suffix_len = silo_names.back().size();

	for(i=0;i<qtail.size();++i){
		vector<tablevar_t *> itbls = query_plan[qtail[i]]->get_input_tbls();
		for(t=0;t<itbls.size();++t){
			string itbl_name = itbls[t]->get_schema_name();
			if(lfta_names.count(itbl_name)>0){
//printf("Query %s input %d references lfta input %s\n",query_plan[qtail[i]]->get_node_name().c_str(),t,itbl_name.c_str());
				// build the list of per-silo lfta names: base name + silo suffix
				vector<string> src_names;
				string lfta_base = itbl_name.substr(0,itbl_name.size()-suffix_len);
				for(s=0;s<silo_names.size();++s){
					string lfta_subsilo = lfta_base + silo_names[s];
//printf("\t%s\n",lfta_subsilo.c_str());
					src_names.push_back(lfta_subsilo);
				}
				// merge the per-silo streams and rebind this input to the merge
				string merge_node_name = "desilo_"+query_plan[qtail[i]]->get_node_name()+
					"_input_"+int_to_string(t);
				mrg_qpn *merge_node = new mrg_qpn(merge_node_name,src_names,Schema);
				int m_op_pos = Schema->add_table(merge_node->table_layout);
				itbls[t]->set_schema(merge_node_name);
				itbls[t]->set_schema_ref(m_op_pos);
				query_plan.push_back(merge_node);
			}
		}
	}

	generate_linkage();	// new merge nodes changed the plan topology
}
1894 ////////////////////////////////////////
1895 ///             End de-siloing
1896
1897
1898
1899 //                      Given a collection of LFTA stream queries,
1900 //                      extract their WHERE predicates
1901 //                      and pass them to an analysis routine which will extract
1902 //                      common elements
1903 //
1904 void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, vector<cnf_set *> &prefilter_preds, set<unsigned int> &pred_ids){
1905         int s;
1906         std::vector< std::vector<cnf_elem *> > where_list;
1907
1908 //              still safe to assume that LFTA queries have a single
1909 //              query node, which is at position 0.
1910         for(s=0;s<lfta_list.size();++s){
1911                 vector<cnf_elem *> cnf_list = lfta_list[s]->query_plan[0]->get_filter_clause();
1912                 if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
1913                         gb_table *gtbl = ((sgah_qpn *)(lfta_list[s]->query_plan[0]))->get_gb_tbl();
1914                         int c;
1915                         for(c=0;c<cnf_list.size();++c){
1916                                 insert_gb_def_pr(cnf_list[c]->pr,gtbl);
1917                         }
1918                 }
1919                 where_list.push_back(lfta_list[s]->query_plan[0]->get_filter_clause());
1920         }
1921
1922         find_common_filter(where_list,Schema,Ext_fcns,prefilter_preds, pred_ids);
1923 }
1924
1925
1926
1927
1928
1929 //                      Given a collection of LFTA stream queries,
1930 //                      extract the union of all temporal attributes referenced in select clauses
1931 //                      those attributes will need to be unpacked in prefilter
1932 //
1933 void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids){
1934         int s, sl;
1935         vector<scalarexp_t *> sl_list;
1936         gb_table *gb_tbl = NULL;
1937
1938
1939 //              still safe to assume that LFTA queries have a single
1940 //              query node, which is at position 0.
1941         for(s=0;s<lfta_list.size();++s){
1942
1943                 if(lfta_list[s]->query_plan[0]->node_type() == "spx_qpn"){
1944                         spx_qpn *spx_node = (spx_qpn *)lfta_list[s]->query_plan[0];
1945                         sl_list = spx_node->get_select_se_list();
1946                 }
1947                 if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
1948                         sgah_qpn *sgah_node = (sgah_qpn *)lfta_list[s]->query_plan[0];
1949                         sl_list = sgah_node->get_select_se_list();
1950                         gb_tbl = sgah_node->get_gb_tbl();
1951                 }
1952
1953                 if(lfta_list[s]->query_plan[0]->node_type() == "filter_join"){
1954                         filter_join_qpn *fj_node = (filter_join_qpn *)lfta_list[s]->query_plan[0];
1955                         sl_list = fj_node->get_select_se_list();
1956                         col_id ci;      // also get the temporal var in case not in select list
1957                         ci.load_from_colref(fj_node->temporal_var);
1958                         temp_cids.insert(ci);
1959                 }
1960                 if(lfta_list[s]->query_plan[0]->node_type() == "watch_join"){
1961                         watch_join_qpn *wj_node = (watch_join_qpn *)lfta_list[s]->query_plan[0];
1962                         sl_list = wj_node->get_select_se_list();
1963                 }
1964
1965                 for(sl=0;sl<sl_list.size();sl++){
1966                         data_type *sdt = sl_list[sl]->get_data_type();
1967                         if (sdt->is_temporal()) {
1968                                 gather_se_col_ids(sl_list[sl],temp_cids, gb_tbl);
1969                         }
1970                 }
1971         }
1972 }
1973
1974