// Fixed newline characters throughout the code
// [com/gs-lite.git] / src / ftacmp / stream_query.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15 #include"stream_query.h"
16 #include"generate_utils.h"
17 #include"analyze_fta.h"
18 #include<algorithm>
19 #include<utility>
20 #include <list>
21
22 static char tmpstr[500];
23
24 using namespace std;
25
26
27 //              Create a query plan from a query node and an existing
28 //              query plan.  Use for lfta queries, the parent query plan provides
29 //              the annotations.
30 stream_query::stream_query(qp_node *qnode, stream_query *parent){
31         query_plan.push_back(qnode);
32         qhead = 0;
33         qtail.push_back(0);
34         attributes = qnode->get_fields();
35         parameters = qnode->get_param_tbl();
36         defines = parent->defines;
37         query_name = qnode->get_node_name();
38 }
39
40 //              Copy the query plan.
41 stream_query::stream_query(stream_query &src){
42         query_plan = src.query_plan;
43         qhead = src.qhead;
44         qtail = src.qtail;
45         attributes = src.attributes;
46         parameters = src.parameters;
47         defines = src.defines;
48         query_name = src.query_name;
49         gid = src.gid;
50 }
51
52
53 //              Create a query plan from an analyzed parse tree.
54 //              Perform analyses to find the output node, input nodes, etc.
55
56 stream_query::stream_query(query_summary_class *qs,table_list *Schema){
57 //              Generate the query plan nodes from the analyzed parse tree.
58 //              There is only one for now, so just assign the return value
59 //              of create_query_nodes to query_plan
60         error_code = 0;
61         query_plan = create_query_nodes(qs,Schema);
62     int i;
63 if(query_plan.size() == 0){
64   fprintf(stderr,"INTERNAL ERROR, zero-size query plan in stream_query::stream_query\n");
65   exit(1);
66 }
67     for(i=0;i<query_plan.size();++i){
68                 if(query_plan[i]){
69                         if(query_plan[i]->get_error_code() != 0){
70                                 error_code = query_plan[i]->get_error_code();
71                                 err_str +=  query_plan[i]->get_error_str();
72                         }
73                 }
74         }
75         qhead = query_plan.size()-1;
76         gid = -1;
77
78 }
79
80
81 stream_query * stream_query::add_query(query_summary_class *qs,table_list *Schema){
82 //              Add another query block to the query plan
83         error_code = 0;
84         vector<qp_node *> new_nodes = create_query_nodes(qs, Schema);
85         query_plan.insert(query_plan.end(),new_nodes.begin(), new_nodes.end());
86         return this;
87 }
88
89 stream_query * stream_query::add_query(stream_query &src){
90 //              Add another query block to the query plan
91         error_code = 0;
92         query_plan.insert(query_plan.end(),src.query_plan.begin(), src.query_plan.end());
93         return this;
94 }
95
96
//		For every node in the plan, compute the scalar expressions that map
//		its output fields back to protocol (input) fields.
//		sq_map : other stream queries this one may read from, keyed by name.
//		Processing order matters: sources must be processed before consumers,
//		so nodes are visited leaves-first via a root-down BFS reversed into
//		a work list.
void stream_query::generate_protocol_se(map<string,stream_query *> &sq_map, table_list *Schema){
	int i,n;

//		Mapping fields to protocol fields requires schema binding.
//	First ensure all nodes are in the schema.
    for(n=0;n<query_plan.size();++n){
        if(query_plan[n] != NULL){
            Schema->add_table(query_plan[n]->get_fields());
        }
    }
//		Now do schema binding
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n] != NULL){
			query_plan[n]->bind_to_schema(Schema);
		}
	}

//			create name-to-index map
	map<string, int> name_to_node;
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n]){
			name_to_node[query_plan[n]->get_node_name()] = n;
		}
	}

//		Create a list of the nodes to process, in order.
//		Search from the root down.
//			ASSUME tree plan.

	list<int> search_q;
	list<int> work_list;

//		BFS from the head; push_front onto work_list reverses the visit
//		order so predecessors end up ahead of their consumers.
	search_q.push_back(qhead);
	while(! search_q.empty()){
		int the_q = search_q.front();
		search_q.pop_front(); work_list.push_front(the_q);
		vector<int> the_pred = query_plan[the_q]->get_predecessors();
		for(i=0;i<the_pred.size();i++)
			search_q.push_back(the_pred[i]);
	}

//		Scan through the work list, from the front,
//		gather the qp_node's ref'd, and call the
//		protocol se generator.  Pass NULL for
//		sources not found - presumably user-defined operator

	while(! work_list.empty()){
		int the_q = work_list.front();
		work_list.pop_front();
		vector<qp_node *> q_sources;
		vector<tablevar_t *> q_input_tbls = query_plan[the_q]->get_input_tbls();
//		Resolve each input table: first to a node in this plan, then to an
//		external stream query's head node, else NULL (unknown source).
		for(i=0;i<q_input_tbls.size();++i){
			 string itbl_nm = q_input_tbls[i]->get_schema_name();
			 if(name_to_node.count(itbl_nm)>0){
				 q_sources.push_back(query_plan[name_to_node[itbl_nm]]);
			 }else if(sq_map.count(itbl_nm)>0){
				 q_sources.push_back(sq_map[itbl_nm]->get_query_head());
			 }else{
				 q_sources.push_back(NULL);
			 }
		}
		query_plan[the_q]->create_protocol_se(q_sources, Schema);
	}

//////////////////////////////////////////////////////////
//			trust but verify  (debug dump, disabled)


/*
	for(i=0;i<query_plan.size();++i){
		if(query_plan[i]){
			printf("query node %s, type=%s:\n",query_plan[i]->get_node_name().c_str(),
					query_plan[i]->node_type().c_str());
			map<std::string, scalarexp_t *> *pse_map = query_plan[i]->get_protocol_se();
			map<std::string, scalarexp_t *>::iterator mssi;
			for(mssi=pse_map->begin();mssi!=pse_map->end();++mssi){
				if((*mssi).second)
					printf("\t%s : %s\n",(*mssi).first.c_str(), (*mssi).second->to_string().c_str());
				else
					printf("\t%s : NULL\n",(*mssi).first.c_str());
			}
			if(query_plan[i]->node_type() == "filter_join" || query_plan[i]->node_type() == "join_eq_hash_qpn"){
				vector<scalarexp_t *> pse_l;
				vector<scalarexp_t *> pse_r;
				if(query_plan[i]->node_type() == "filter_join"){
					pse_l = ((filter_join_qpn *)query_plan[i])->get_hash_l();
					pse_r = ((filter_join_qpn *)query_plan[i])->get_hash_r();
				}
				if(query_plan[i]->node_type() == "join_eq_hash_qpn"){
					pse_l = ((join_eq_hash_qpn *)query_plan[i])->get_hash_l();
					pse_r = ((join_eq_hash_qpn *)query_plan[i])->get_hash_r();
				}
				int p;
				for(p=0;p<pse_l.size();++p){
					if(pse_l[p] != NULL)
						printf("\t\t%s = ",pse_l[p]->to_string().c_str());
					else
						printf("\t\tNULL = ");
					if(pse_r[p] != NULL)
						printf("%s\n",pse_r[p]->to_string().c_str());
					else
						printf("NULL\n");
				}
			}
			if(query_plan[i]->node_type() == "sgah_qpn" || query_plan[i]->node_type() == "rsgah_qpn" || query_plan[i]->node_type() == "sgahcwcb_qpn"){
				vector<scalarexp_t *> pseg;
                if(query_plan[i]->node_type() == "sgah_qpn")
					pseg = ((sgah_qpn *)query_plan[i])->get_gb_sources();
                if(query_plan[i]->node_type() == "rsgah_qpn")
					pseg = ((rsgah_qpn *)query_plan[i])->get_gb_sources();
                if(query_plan[i]->node_type() == "sgahcwcb_qpn")
					pseg = ((sgahcwcb_qpn *)query_plan[i])->get_gb_sources();
				int g;
				for(g=0;g<pseg.size();g++){
					if(pseg[g] != NULL)
						printf("\t\tgb %d = %s\n",g,pseg[g]->to_string().c_str());
					else
						printf("\t\tgb %d = NULL\n",g);
				}
			}
		}
	}
*/

}
222
//		Wire up the two-way predecessor/successor links between plan nodes
//		by matching each node's input table names against node names.
//		Also locates the plan head (qhead: the unique node with no
//		successors) and the leaves (qtail: nodes reading external sources).
//		Returns true on FAILURE (e.g. more than one head), false on success.
bool stream_query::generate_linkage(){
	bool create_failed = false;
	int n, f,s;

//			Clear any leftover linkages
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n]){
			query_plan[n]->clear_predecessors();
			query_plan[n]->clear_successors();
		}
	}
	qtail.clear();

//			create name-to-index map
	map<string, int> name_to_node;
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n]){
			name_to_node[query_plan[n]->get_node_name()] = n;
		}
	}

//		Do the 2-way linkage.
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n]){
		  vector<tablevar_t *> fm  = query_plan[n]->get_input_tbls();
		  for(f=0;f<fm.size();++f){
			string src_tbl = fm[f]->get_schema_name();
//			An input table that is the name of another node in this plan
//			establishes a producer/consumer edge; anything else is external.
			if(name_to_node.count(src_tbl)>0){
				int s = name_to_node[src_tbl];	// NOTE: shadows outer s
				query_plan[n]->add_predecessor(s);
				query_plan[s]->add_successor(n);
			}
		  }
		}
	}

//		Find the head (no successors) and the tails (at least one
//		predecessor is external).
//		Verify that there is only one head,
//		and that all other nodes have one successor (because
//		 right now I can only handle trees).

	qhead = -1;		// no head yet found.
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n]){
		  vector<int> succ = query_plan[n]->get_successors();
/*
		  if(succ.size() > 1){
			fprintf(stderr,"ERROR, query node %s supplies data to multiple nodes, but currently only tree query plans are supported:\n",query_plan[n]->get_node_name().c_str());
			for(s=0;s<succ.size();++s){
				fprintf(stderr,"%s ",query_plan[succ[s]]->get_node_name().c_str());
			}
			fprintf(stderr,"\n");
			create_failed = true;
		  }
*/
		  if(succ.size() == 0){
			if(qhead >= 0){
				fprintf(stderr,"ERROR, query nodes %s and %s both have zero successors.\n",query_plan[n]->get_node_name().c_str(), query_plan[qhead]->get_node_name().c_str());
				create_failed = true;
			}else{
				qhead = n;
			}
		  }
//		  Fewer linked predecessors than input tables means at least one
//		  input is an external source, so the node is a plan leaf.
		  if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size()){
			qtail.push_back(n);
		  }
		}
	}

	return create_failed;
}
295
296 //              After the collection of query plan nodes is generated,
297 //              analyze their structure to link them up into a tree (or dag?).
298 //              Verify that the structure is acceptable.
299 //              Do some other analysis and verification tasks (?)
300 //              then gather summar information.
//		Validate the created plan nodes, link them into a tree, and gather
//		the head node's summary info (name, fields, params, defines).
//		Returns 0 on success, nonzero on failure.
//		NOTE(review): returns 1 for node-creation errors but -1 for linkage
//		errors -- confirm callers treat both as failure.
int stream_query::generate_plan(table_list *Schema){

//		The first thing to do is verify that the query plan
//		nodes were successfully created.
	bool create_failed = false;
	int n,f,s;
	for(n=0;n<query_plan.size();++n){
		if(query_plan[n]!=NULL && query_plan[n]->get_error_code()){
			fprintf(stderr,"%s",query_plan[n]->get_error_str().c_str());
			create_failed = true;
		}
	}
/*
for(n=0;n<query_plan.size();++n){
if(query_plan[n] != NULL){
string nstr = query_plan[n]->get_node_name();
printf("In generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
int nn;
for(nn=0;nn<inv.size();nn++){
printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
}
printf("\n");
}
}
*/

	if(create_failed) return(1);

//		Here, link up the query nodes, then verify that the
//		structure is acceptable (single root, no cycles, no stranded
//		nodes, etc.)
	create_failed = generate_linkage();
	if(create_failed) return -1;


//		Here, do optimizations such as predicate pushing,
//		join rearranging, etc.
//			Nothing to do yet.

/*
for(n=0;n<query_plan.size();++n){
if(query_plan[n] != NULL){
string nstr = query_plan[n]->get_node_name();
printf("B generate_plan, node %d is %s, reads from:\n\t",n,nstr.c_str());
vector<tablevar_t *> inv = query_plan[n]->get_input_tbls();
int nn;
for(nn=0;nn<inv.size();nn++){
printf("%s (%d) ",inv[nn]->to_string().c_str(),inv[nn]->get_schema_ref());
}
printf("\n");
}
}
printf("qhead=%d, qtail = ",qhead);
int nn;
for(nn=0;nn<qtail.size();++nn)
printf("%d ",qtail[nn]);
printf("\n");
*/

//		Collect query summaries.  The query is reperesented by its head node.
	query_name = query_plan[qhead]->get_node_name();
	attributes = query_plan[qhead]->get_fields();
//		TODO: The params and defines require a lot more thought.
	parameters = query_plan[qhead]->get_param_tbl();
	defines = query_plan[qhead]->get_definitions();

	return(0);
  };
370
371 void stream_query::add_output_operator(ospec_str *o){
372         output_specs.push_back(o);
373 }
374
375
376 void stream_query::get_external_libs(set<string> &libset){
377
378         int qn,i;
379         for(qn=0;qn<query_plan.size();++qn){
380                 if(query_plan[qn] != NULL){
381                         vector<string> op_libs = query_plan[qn]->external_libs();
382                         for(i=0;i<op_libs.size();++i){
383                                 libset.insert(op_libs[i]);
384                         }
385                 }
386         }
387
388         for(qn=0;qn<output_operators.size();++qn){
389                 if(output_operators[qn] != NULL){
390                         vector<string> op_libs = output_operators[qn]->external_libs();
391                         for(i=0;i<op_libs.size();++i){
392                                 libset.insert(op_libs[i]);
393                         }
394                 }
395         }
396 }
397
398
399
400 //      Split into LFTA, HFTA components.
401 //              Split this query into LFTA and HFTA queries.
402 //              Four possible outcomes:
403 //              1) the query reads from a protocol, but does not need to
404 //                      split (can be evaluated as an LFTA).
405 //                      The lfta query is the only element in the return vector,
406 //                      and hfta_returned is false.
407 //              2) the query reads from no protocol, and therefore cannot be split.
408 //                      THe hfta query is the only element in the return vector,
409 //                      and hfta_returned is true.
410 //              3) reads from at least one protocol, but cannot be split : failure.
411 //                      return vector is empty, the error conditions are written
412 //                      in err_str and error_code
413 //              4) The query splits into an hfta query and one or more LFTA queries.
414 //                      the return vector has two or more elements, and hfta_returned
415 //                      is true.  The last element is the HFTA.
416
vector<stream_query *> stream_query::split_query(ext_fcn_list *Ext_fcns, table_list *Schema, bool &hfta_returned, ifq_t *ifdb, int n_virtual_ifaces, int hfta_parallelism, int hfta_idx){
//		Split this query into LFTA and HFTA components (see the outcome
//		list above).  Each leaf node is asked to split itself; the LFTA
//		pieces become new single-node stream_queries, the HFTA pieces
//		replace/extend this plan.  On any failure an empty vector is
//		returned with error_code/err_str set.
	vector<stream_query *> queries;
	int l,q,s;
	int qp_hfta;	// number of hfta nodes returned by split_node_for_fta

	hfta_returned = false;	// assume until proven otherwise

	for(l=0;l<qtail.size();++l){
		int leaf = qtail[l];


		vector<qp_node *> qnodes = query_plan[leaf]->split_node_for_fta(Ext_fcns, Schema, qp_hfta, ifdb, n_virtual_ifaces, hfta_parallelism, hfta_idx);


		if(qnodes.size() == 0 || query_plan[leaf]->get_error_code()){	// error
//printf("error\n");
			error_code = query_plan[leaf]->get_error_code();
			err_str = query_plan[leaf]->get_error_str();
			vector<stream_query *> null_result;
			return(null_result);
		}
		if(qnodes.size() == 1 && qp_hfta){  // nothing happened
//printf("no change\n");
			query_plan[leaf] = qnodes[0];
		}
		if(qnodes.size() == 1 && !qp_hfta){	// push to lfta
//printf("lfta only\n");
//			The whole leaf becomes an LFTA query; detach it from the plan.
			queries.push_back(new stream_query(qnodes[0], this));
			vector<int> succ = query_plan[leaf]->get_successors();
			for(s=0;s<succ.size();++s){
				query_plan[succ[s]]->remove_predecessor(leaf);
			}
			query_plan[leaf] = NULL;	// delete it?
		}
		if(qnodes.size() > 1){	// actual splitting occurred.
			if(!qp_hfta){		// internal consistency check.
				error_code = 1;
				err_str = "INTERNAL ERROR: mulitple nodes returned by split_node_for_fta, but none are hfta nodes.\n";
				vector<stream_query *> null_result;
				return(null_result);
			}

//			Convention: the first size()-qp_hfta nodes are LFTAs, the rest
//			are HFTA nodes with the replacement leaf last.
			for(q=0;q<qnodes.size()-qp_hfta;++q){  // process lfta nodes
//printf("creating lfta %d (%s)\n",q,qnodes[q]->get_node_name().c_str());
				queries.push_back(new stream_query(qnodes[q], this));
			}
//					Use new hfta node
			query_plan[leaf] = qnodes[qnodes.size()-1];
//					Add in any extra hfta nodes
			for(q=qnodes.size()-qp_hfta;q<qnodes.size()-1;++q)
				query_plan.push_back(qnodes[q]);
		}
	}

//		Any surviving plan node means an HFTA remains; while scanning,
//		also collect unresolved interface-parameter references.
    int n_ifprefs = 0;
	set<string> ifps;
	for(q=0;q<query_plan.size();++q){
		if(query_plan[q] != NULL){
			 n_ifprefs += query_plan[q]->count_ifp_refs(ifps);
			 hfta_returned = true;
		}
	}

	if(n_ifprefs){
//		Interface params must all be resolved at LFTA level.
		set<string>::iterator ssi;
		err_str += "ERROR, unresolved interface parameters in HFTA:\n";
		for(ssi=ifps.begin();ssi!=ifps.end();++ssi){
			err_str += (*ssi)+" ";
		}
		err_str += "\n";
		error_code = 3;
		vector<stream_query *> null_result;
		return(null_result);
	}


	if(hfta_returned){
//		Re-link the surviving HFTA plan and return it as the last element.
		if(generate_linkage()){
			fprintf(stderr,"INTERNAL ERROR, generate_linkage failed in split_query.\n");
			exit(1);
		}
		queries.push_back(this);
	}

	return(queries);
}
503
504
505
506 vector<table_exp_t *> stream_query::extract_opview(table_list *Schema, vector<query_node *> &qnodes, opview_set &opviews, string silo_nm){
507         vector<table_exp_t *> subqueries;
508         int l,q;
509
510         string root_name = this->get_output_tabledef()->get_tbl_name();
511
512
513         for(l=0;l<qtail.size();++l){
514                 int leaf = qtail[l];
515                 vector<table_exp_t *> new_qnodes = query_plan[leaf]->extract_opview(Schema, qnodes, opviews, root_name, silo_nm);
516
517                 for(q=0;q<new_qnodes.size();++q){  // process lfta nodes
518                         subqueries.push_back( new_qnodes[q]);
519                 }
520         }
521
522         return(subqueries);
523 }
524
525
526
527
528 string stream_query::make_schema(){
529         return make_schema(qhead);
530 }
531
532 string stream_query::make_schema(int q){
533         string ret="FTA{\n\n";
534
535         ret += query_plan[q]->get_fields()->to_string();
536
537         ret += "DEFINE{\n";
538         ret += "\tquery_name '"+query_plan[q]->get_node_name()+"';\n";
539
540         map<string, string> defs = query_plan[q]->get_definitions();
541         map<string, string>::iterator dfi;
542         for(dfi=defs.begin(); dfi!=defs.end(); ++dfi){
543                 ret += "\t"+ (*dfi).first + " '" + (*dfi).second + "';\n";
544         }
545         ret += "}\n\n";
546
547         ret += "PARAM{\n";
548         param_table *params = query_plan[q]->get_param_tbl();
549         vector<string> param_names = params->get_param_names();
550         int p;
551         for(p=0;p<param_names.size();p++){
552                 data_type *dt = params->get_data_type( param_names[p] );
553                 ret += "\t" + param_names[p] + " '" + dt->get_type_str() + "';\n";
554         }
555         ret += "}\n";
556
557         ret += query_plan[q]->to_query_string();
558         ret += "\n}\n\n";
559
560         return(ret);
561
562 }
563
564 string stream_query::collect_refd_ifaces(){
565         string ret="";
566         int q;
567         for(q=0;q<query_plan.size();++q){
568                 if(query_plan[q]){
569                         map<string, string> defs = query_plan[q]->get_definitions();
570                         if(defs.count("_referenced_ifaces")){
571                                 if(ret != "") ret += ",";
572                                 ret += defs["_referenced_ifaces"];
573                         }
574                 }
575         }
576
577         return ret;
578 }
579
580
581 bool stream_query::stream_input_only(table_list *Schema){
582         vector<tablevar_t *> input_tbls = this->get_input_tables();
583         int i;
584         for(i=0;i<input_tbls.size();++i){
585                 int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
586                 if(Schema->get_schema_type(t) == PROTOCOL_SCHEMA) return(false);
587         }
588         return(true);
589 }
590
591 //              Return input tables.  No duplicate removal performed.
592 vector<tablevar_t *> stream_query::get_input_tables(){
593         vector<tablevar_t *> retval;
594
595 //                      create name-to-index map
596         int n;
597         map<string, int> name_to_node;
598         for(n=0;n<query_plan.size();++n){
599           if(query_plan[n]){
600                 name_to_node[query_plan[n]->get_node_name()] = n;
601           }
602         }
603
604         int l;
605         for(l=0;l<qtail.size();++l){
606                 int leaf = qtail[l];
607                 vector<tablevar_t *> tmp_v = query_plan[leaf]->get_input_tbls();
608                 int i;
609                 for(i=0;i<tmp_v.size();++i){
610                         if(name_to_node.count(tmp_v[i]->get_schema_name()) == 0)
611                                 retval.push_back(tmp_v[i]);
612                 }
613         }
614         return(retval);
615 }
616
617
618 void stream_query::compute_node_format(int q, vector<int> &nfmt, map<string, int> &op_idx){
619         int netcnt = 0, hostcnt = 0;
620         int i;
621
622         vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
623         for(i=0;i<itbls.size();++i){
624                 string tname = itbls[i]->get_schema_name();
625                 if(op_idx.count(tname)){
626                         int o = op_idx[tname];
627                         if(nfmt[o] == UNKNOWNFORMAT)
628                                 compute_node_format(o,nfmt,op_idx);
629                         if(nfmt[o] == NETFORMAT) netcnt++;
630                         else    hostcnt++;
631                 }else{
632                         netcnt++;
633                 }
634         }
635         if(query_plan[q]->makes_transform()){
636                 nfmt[q] = HOSTFORMAT;
637         }else{
638                 if(hostcnt>0){
639                         nfmt[q] = HOSTFORMAT;
640                 }else{
641                         nfmt[q] = NETFORMAT;
642                 }
643         }
644 //printf("query plan %d (%s) is ",q,query_plan[q]->get_node_name().c_str());
645 //if(nfmt[q] == HOSTFORMAT) printf(" host format.\n");
646 //else printf("net format\n");
647 }
648
649
650 string stream_query::generate_hfta(table_list *Schema, ext_fcn_list *Ext_fcns, opview_set &opviews, bool distributed_mode){
651         int schref, ov_ix, i, q, param_sz;
652         bool dag_graph = false;
653
654
655 //              Bind the SEs in all query plan nodes to this schema, and
656 //                      Add all tables used by this query to the schema.
657 //                      Question: Will I be polluting the global schema by adding all
658 //                      query node schemas?
659
660 //              First ensure all nodes are in the schema.
661         int qn;
662         for(qn=0;qn<query_plan.size();++qn){
663                 if(query_plan[qn] != NULL){
664                         Schema->add_table(query_plan[qn]->get_fields());
665                 }
666         }
667 //              Now do binding.
668         for(qn=0;qn<query_plan.size();++qn){
669                 if(query_plan[qn] != NULL){
670                         query_plan[qn]->bind_to_schema(Schema);
671                 }
672         }
673
674 //              Is it a DAG plan?
675         set<string> qsources;
676         int n;
677         for(n=0;n<query_plan.size();++n){
678           if(query_plan[n]){
679                 vector<tablevar_t *> tmp_v = query_plan[n]->get_input_tbls();
680                 int i;
681                 for(i=0;i<tmp_v.size();++i){
682                         if(qsources.count(tmp_v[i]->get_schema_name()) > 0)
683                                 dag_graph = true;
684                         qsources.insert(tmp_v[i]->get_schema_name());
685                 }
686           }
687         }
688
689
690
691 //              Collect set of tables ref'd in this HFTA
692         set<int> tbl_set;
693         for(qn=0;qn<query_plan.size();++qn){
694           if(query_plan[qn]){
695 //                      get names of the tables
696                 vector<tablevar_t *> input_tbls = query_plan[qn]->get_input_tbls();
697                 vector<tablevar_t *> output_tbls = query_plan[qn]->get_output_tbls();
698 //                      Convert to tblrefs, add to set of ref'd tables
699                 int i;
700                 for(i=0;i<input_tbls.size();i++){
701 //                      int t = Schema->get_table_ref(input_tbls[i]->get_schema_name());
702                         int t = input_tbls[i]->get_schema_ref();
703                         if(t < 0){
704                                 fprintf(stderr,"INTERNAL ERROR in generate_hfta. "
705                                                                 "query plan node %s references input table %s, which is not in schema.\n",
706                                                                 query_name.c_str(), input_tbls[i]->get_schema_name().c_str());
707                                 exit(1);
708                         }
709                         tbl_set.insert(t);
710                 }
711
712                 for(i=0;i<output_tbls.size();i++){
713                         int t = Schema->get_table_ref(output_tbls[i]->get_schema_name());
714                         if(t < 0){
715                                 fprintf(stderr,"INTERNAL ERROR in generate_hfta."
716                                                                 "query plan node %s references output table %s, which is not in schema.\n",
717                                                                 query_name.c_str(), output_tbls[i]->get_schema_name().c_str());
718                                 exit(1);
719                         }
720                         tbl_set.insert(t);
721                 }
722           }
723         }
724
725 //                      Collect map of lftas, query nodes
726         map<string, int> op_idx;
727         for(q=0;q<query_plan.size();q++){
728           if(query_plan[q]){
729                 op_idx[query_plan[q]->get_node_name()] = q;
730           }
731         }
732
733 //              map of input tables must include query id and input
734 //              source (0,1) because several queries might reference the same source
735         vector<tablevar_t *> input_tbls = this->get_input_tables();
736         vector<bool> input_tbl_free;
737         for(i=0;i<input_tbls.size();++i){
738                 input_tbl_free.push_back(true);
739         }
740         map<string, int> lfta_idx;
741 //fprintf(stderr,"%d input tables, %d query nodes\n",input_tbls.size(), query_plan.size());
742         for(q=0;q<query_plan.size();q++){
743           if(query_plan[q]){
744                 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
745                 int it;
746                 for(it=0;it<itbls.size();it++){
747                         string tname = itbls[it]->get_schema_name()+"-"+int_to_string(q)+"-"+int_to_string(it);
748                         string src_tblname = itbls[it]->get_schema_name();
749                         bool src_is_external = false;
750                         for(i=0;i<input_tbls.size();++i){
751                                 if(src_tblname == input_tbls[i]->get_schema_name()){
752                                   src_is_external = true;
753                                   if(input_tbl_free[i]){
754                                         lfta_idx[tname] = i;
755                                         input_tbl_free[i] = false;
756 //fprintf(stderr,"Adding %s (src_tblname=%s, q=%d, it=%d) to %d.\n",tname.c_str(), src_tblname.c_str(), q, it, i);
757                                         break;
758                                   }
759                                 }
760                         }
761                         if(i==input_tbls.size() && src_is_external){
762                                 fprintf(stderr,"INTERNAL ERROR in stream_query::generate_hfta, can't find free entry in input_tbls for query %d, intput %d (%s)\n",q,it,src_tblname.c_str());
763                                 exit(1);
764                         }
765                 }
766           }
767         }
768 /*
769         for(i=0;i<input_tbls.size();++i){
770                 string src_tblname = input_tbls[i]->get_schema_name();
771                 lfta_idx[src_tblname] = i;
772         }
773 */
774
775 //              Compute the output formats of the operators.
776         vector<int> node_fmt(query_plan.size(),UNKNOWNFORMAT);
777         compute_node_format(qhead, node_fmt, op_idx);
778
779
780 //              Generate the schema strings for the outputs.
781         string schema_str;
782         for(i=0;i<query_plan.size();++i){
783                 if(i != qhead && query_plan[i]){
784                         string schema_tmpstr = this->make_schema(i);
785                         schema_str += "gs_csp_t node"+int_to_string(i)+"_schema = "+make_C_embedded_string(schema_tmpstr)+";\n";
786                 }
787         }
788
789         attributes = query_plan[qhead]->get_fields();
790
791
792         string schema_tmpstr = this->make_schema();
793         schema_str += "gs_csp_t "+generate_schema_string_name(query_name)+" = "+make_C_embedded_string(schema_tmpstr)+";\n";
794
795 //              Generate the collection of tuple defs.
796
797         string tuple_defs = "\n/*\tDefine tuple structures \t*/\n\n";
798         set<int>::iterator si;
799         for(si=tbl_set.begin(); si!=tbl_set.end(); ++si){
800                 tuple_defs += generate_host_tuple_struct( Schema->get_table( (*si) ));
801                 tuple_defs += "\n\n";
802         }
803
804 //              generate the finalize tuple function
805         string finalize_str = generate_hfta_finalize_tuple(attributes);
806
807 //              Analyze and make the output operators
808         bool eat_input = false;
809         string src_op = query_name;
810         string pred_op = src_op;
811         int last_op = -1;
812         if(output_specs.size()>0)
813                 eat_input = true;
814         if(n_successors>0)
815                 eat_input = false;      // must create stream output for successor HFTAs.
816         int n_filestreams = 0;
817         for(i=0;i<output_specs.size();++i){
818                 if(output_specs[i]->operator_type == "stream" ){
819                         eat_input = false;
820                 }
821                 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile" ){
822                         last_op = i;
823                         n_filestreams++;
824                 }
825         }
826         int filestream_id = 0;
827         for(i=0;i<output_specs.size();++i){
828                 if(output_specs[i]->operator_type == "file" || output_specs[i]->operator_type == "zfile"){
829
830                         int n_fstreams = output_specs[i]->n_partitions / n_parallel;
831                         if(n_fstreams * n_parallel < output_specs[i]->n_partitions){
832                                 n_fstreams++;
833                                 if(n_parallel == 1 || query_name.find("__copy1") != string::npos){
834                                         fprintf(stderr,"WARNING, in query %s, %d streams requested for %s output, but it must be a multiple of the hfta parallelism (%d), increasing number of output streams to %d.\n",query_name.c_str(), output_specs[i]->n_partitions,  output_specs[i]->operator_type.c_str(), n_parallel, n_fstreams*n_parallel);
835                                 }
836                         }
837 //                      output_file_qpn *new_ofq = new output_file_qpn();
838                         string filestream_tag = "";
839                         if(n_filestreams>1){
840                                 filestream_tag = "_fileoutput"+int_to_string(filestream_id);
841                                 filestream_id++;
842                         }
843                         output_file_qpn *new_ofq = new output_file_qpn(pred_op, src_op, filestream_tag, query_plan[qhead]->get_fields(), output_specs[i], (i==last_op ? eat_input : false) );
844 //                      if(n_fstreams > 1){
845                         if(n_fstreams > 0){
846                                 string err_str;
847                                 bool err_ret = new_ofq->set_splitting_params(n_parallel,parallel_idx,n_fstreams,output_specs[i]->partitioning_flds,err_str);
848                                 if(err_ret){
849                                         fprintf(stderr,"%s",err_str.c_str());
850                                         exit(1);
851                                 }
852                         }
853                         output_operators.push_back(new_ofq );
854                         pred_op = output_operators.back()->get_node_name();
855                 }else if(! (output_specs[i]->operator_type == "stream" || output_specs[i]->operator_type == "Stream" || output_specs[i]->operator_type == "STREAM") ){
856                         fprintf(stderr,"WARNING, output operator type %s (on query %s) is not supported, ignoring\n",output_specs[i]->operator_type.c_str(),query_name.c_str() );
857                 }
858         }
859
860
861
862 //              Generate functors for the query nodes.
863
864         string functor_defs = "\n/*\tFunctor definitions\t*/\n\n";
865         for(qn=0;qn<query_plan.size();++qn){
866                 if(query_plan[qn]!=NULL){
867 //                              Compute whether the input needs a ntoh xform.
868                         vector<bool> needs_xform;
869                         vector<tablevar_t *> itbls = query_plan[qn]->get_input_tbls();
870                         for(i=0;i<itbls.size();++i){
871                                 string tname = itbls[i]->get_schema_name();
872 //                              if(query_plan[qn]->makes_transform()){
873                                         if(op_idx.count(tname)>0){
874                                                 if(node_fmt[ op_idx[tname] ] == NETFORMAT){
875                                                         needs_xform.push_back(true);
876                                                 }else{
877                                                         needs_xform.push_back(false);
878                                                 }
879                                         }else{
880                                                 needs_xform.push_back(true);
881                                         }
882 //                              }else{
883 //                                      if(op_idx.count(tname)>0){
884 //                                              if(node_fmt[qn] != node_fmt[ op_idx[tname] ]){
885 //                                                      needs_xform.push_back(true);
886 //                                              }else{
887 //                                                      needs_xform.push_back(false);
888 //                                              }
889 //                                      }else{
890 //                                              if(node_fmt[qn] == HOSTFORMAT){
891 //                                                      needs_xform.push_back(true);
892 //                                              }else{
893 //                                                      needs_xform.push_back(false);
894 //                                              }
895 //                                      }
896 //                              }
897                         }
898
899                         functor_defs += query_plan[qn]->generate_functor(Schema, Ext_fcns, needs_xform);
900                 }
901         }
902
903 //                      Generate output operator functors
904
905         vector<bool> needs_xform;
906         for(i=0;i<output_operators.size();++i)
907                 functor_defs += output_operators[i]->generate_functor(Schema, Ext_fcns, needs_xform);
908
909         string ret =
910 "extern \"C\" {\n"
911 "#include <lapp.h>\n"
912 "#include <fta.h>\n"
913 "#include <gshub.h>\n"
914 "#include <stdlib.h>\n"
915 "#include <stdio.h>\n"
916 "#include <limits.h>\n"
917 "}\n"
918 ;
919         if(dag_graph)
920                 ret +=
921 "#define PLAN_DAG\n"
922 ;
923         ret +=
924 "#include <schemaparser.h>\n"
925 "#include<hfta_runtime_library.h>\n"
926 "\n"
927 "#include <host_tuple.h>\n"
928 "#include <hfta.h>\n"
929 "#include <hfta_udaf.h>\n"
930 "#include <hfta_sfun.h>\n"
931 "\n"
932 //"#define MAXSCHEMASZ 16384\n"
933 "#include <stdio.h>\n\n"
934 ;
935
936 //              Get include file for each of the operators.
937 //              avoid duplicate inserts.
938         set<string> include_fls;
939         for(qn=0;qn<query_plan.size();++qn){
940                 if(query_plan[qn] != NULL)
941                         include_fls.insert(query_plan[qn]->get_include_file());
942         }
943         for(i=0;i<output_operators.size();++i)
944                         include_fls.insert(output_operators[i]->get_include_file());
945         set<string>::iterator ssi;
946         for(ssi=include_fls.begin();ssi!=include_fls.end();++ssi)
947                 ret += (*ssi);
948
949 //              Add defines for hash functions
950         ret +=
951 "\n"
952 "#define hfta_BOOL_to_hash(x) (x)\n"
953 "#define hfta_USHORT_to_hash(x) (x)\n"
954 "#define hfta_UINT_to_hash(x) (x)\n"
955 "#define hfta_IP_to_hash(x) (x)\n"
956 "#define hfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
957 "#define hfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
958 "#define hfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
959 "#define hfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
960 "#define hfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
961 "\n"
962 ;
963
964 //      ret += "#define SERIOUS_LFTA \""+input_tbls[0]->get_schema_name()+"\"\n";
965         ret += "#define OUTPUT_HFTA \""+query_name+"\"\n\n";
966
967 //              HACK ALERT: I know for now that all query plans are
968 //              single operator plans, but while SPX and SGAH can use the
969 //              UNOP template, the merge operator must use the MULTOP template.
970 //              WORSE HACK ALERT : merge does not translate its input,
971 //              so don't apply finalize to the output.
972 //              TODO: clean this up.
973
974 //      string node_type = query_plan[0]->node_type();
975
976         ret += schema_str;
977         ret += tuple_defs;
978
979 //                      Need to work on the input, output xform logic.
980 //                      For now, just add it in.
981 //      ret += finalize_str;
982
983         if(node_fmt[qhead] == NETFORMAT){
984                 ret +=
985 "void finalize_tuple(host_tuple &tup){\n"
986 "return;\n"
987 "}\n"
988 "\n";
989         }else{
990                 ret += finalize_str;
991         }
992
993         ret += functor_defs;
994
995 //                      Parameter block management
996 //                      The proper parameter block must be transmitted to each
997 //                      external stream source.
998 //                      There is a 1-1 mapping between the param blocks returned
999 //                      by this list and the registered data sources ...
1000 //                      TODO: make this more manageable, but for now
1001 //                      there is no parameter block manipulation so I just
1002 //                      need to have the same number.
1003
1004         ret +=
1005 "int get_lfta_params(gs_int32_t sz, void * value,list<param_block>& lst){\n"
1006 "               // for now every  lfta receive the full copy of hfta parameters\n"
1007 "        struct param_block pb;\n";
1008
1009         set<string> lfta_seen;
1010         for(i=0;i<input_tbls.size();++i){
1011                 string src_tblname = input_tbls[i]->get_schema_name();
1012                 if(lfta_seen.count(src_tblname) == 0){
1013                   lfta_seen.insert(src_tblname);
1014                   schref = input_tbls[i]->get_schema_ref();
1015                   if(Schema->get_schema_type(schref) == OPERATOR_VIEW_SCHEMA){
1016                         ov_ix = input_tbls[i]->get_opview_idx();
1017                         opview_entry *opv = opviews.get_entry(ov_ix);
1018                         string op_param = "SUBQ:";
1019                         int q;
1020                         for(q=0;q<opv->subq_names.size();++q){
1021                                 if(q>0) op_param+=",";
1022                                 op_param+=opv->subq_names[q];
1023                         }
1024                         op_param+="\\n";
1025                         param_sz = op_param.size()-1;
1026
1027                         sprintf(tmpstr,"\t\tpb.block_length = %d;\n",param_sz); ret+=tmpstr;
1028                         ret+=
1029 "        pb.data = malloc(pb.block_length);\n";
1030                         ret+="\t\tmemcpy(pb.data,\""+op_param+"\",pb.block_length);\n"
1031 "        lst.push_back(pb);\n\n";
1032                   }else{
1033                         ret+=
1034 "        pb.block_length = sz;\n"
1035 "        pb.data = malloc(pb.block_length);\n"
1036 "        memcpy(pb.data, value, pb.block_length);\n"
1037 "        lst.push_back(pb);\n\n";
1038                   }
1039                 }
1040         }
1041
1042         ret +=
1043 "        return 0;\n"
1044 "}\n"
1045 "\n";
1046
1047                 ret+=
1048 "struct FTA* alloc_hfta (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void * value ) {\n"
1049 "\n"
1050 "       // find the lftas\n"
1051 "       list<lfta_info*> *lfta_list = new list<lfta_info*>;\n"
1052 "\n"
1053 "       FTAID f;\n"
1054 "       char schemabuf[MAXSCHEMASZ];\n"
1055 "       gs_schemahandle_t schema_handle;\n"
1056 "\n";
1057
1058 //              Register the input data sources.
1059 //              Register a source only once.
1060
1061 //      vector<string> ext_reg_txt;
1062         map<string, int> input_tbl_srcid;
1063         for(i=0;i<input_tbls.size();++i){
1064                 string src_tblname = input_tbls[i]->get_schema_name();
1065 //                      Use UDOP alias when in distributed mode.
1066 //                      the cluster manager will make the translation
1067 //                      using infr from qtree.xml
1068                 if(distributed_mode && input_tbls[i]->get_udop_alias() != "")
1069                         src_tblname = input_tbls[i]->get_udop_alias();
1070         if(input_tbl_srcid.count(src_tblname) == 0){
1071                 int srcid = input_tbl_srcid.size();
1072                 input_tbl_srcid[src_tblname] = srcid;
1073                 string tmp_s=
1074 "\n     // find "+src_tblname+"\n"
1075 "       if (fta_find(\""+src_tblname+"\",1,&f,schemabuf,MAXSCHEMASZ)!=0) {\n"
1076 "                           fprintf(stderr,\"HFTA::error:could not find LFTA \\n\");\n"
1077 "                           return 0;\n"
1078 "       }\n"
1079 "       //fprintf(stderr,\"HFTA::FTA found at %u[%u]\\n\",ftamsgid,ftaindex);\n"
1080 "\n"
1081 "       // parse the schema and get the schema handle\n"
1082 "       schema_handle = ftaschema_parse_string(schemabuf);\n"
1083 "       lfta_info* inf"+int_to_string(srcid)+" = new lfta_info();\n"
1084 "       inf"+int_to_string(srcid)+"->f = f;\n"
1085 "       inf"+int_to_string(srcid)+"->fta_name = strdup(\""+src_tblname+"\");\n"
1086 "       inf"+int_to_string(srcid)+"->schema = strdup(schemabuf);\n"
1087 "       inf"+int_to_string(srcid)+"->schema_handle = schema_handle;\n"
1088 "       lfta_list->push_back(inf"+int_to_string(srcid)+");\n\n";
1089 //              ext_reg_txt.push_back(tmp_s);
1090                 ret += tmp_s;
1091           }
1092         }
1093
1094         ret+="\n";
1095         ret += "\tgs_schemahandle_t root_schema_handle = ftaschema_parse_string("+generate_schema_string_name(query_name)+");\n";
1096         for(i=0;i<query_plan.size();++i){
1097                 if(i != qhead && query_plan[i]){
1098                         ret += "\tgs_schemahandle_t op"+int_to_string(i)+"_schema_handle = ftaschema_parse_string(node"+int_to_string(i)+"_schema);\n";
1099                 }
1100         }
1101         ret+="\n";
1102
1103 //              Create the operators.
1104         int n_basic_ops;
1105         for(q=0;q<query_plan.size();q++){
1106           if(query_plan[q]){
1107                 ret+=
1108 "\n"
1109 "       // create an instance of operator "+int_to_string(q)+" ("+query_plan[q]->get_node_name()+")     \n";
1110
1111 //                      Create parameters for operator construction.
1112                 string op_params;
1113                 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
1114                 string tname = itbls[0]->get_schema_name();
1115 //              string li_tname = tname +"-"+int_to_string(q)+"-0";
1116 //              if(lfta_idx.count(li_tname)>0)
1117                 if(input_tbl_srcid.count(tname)>0){
1118 //                      ret += ext_reg_txt[lfta_idx[li_tname]];
1119 //                      op_params += "inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
1120                         op_params += "inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
1121                 }else if(op_idx.count(tname)>0){
1122                         op_params += "op"+int_to_string( op_idx[tname] )+"_schema_handle";
1123                 }else{
1124                         fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (3) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1125                         exit(1);
1126                 }
1127                 if(itbls.size()>1){
1128                         string tname = itbls[1]->get_schema_name();
1129 //                      string li_tname = tname +"-"+int_to_string(q)+"-1";
1130 //                      if(lfta_idx.count(li_tname)>0)
1131                         if(input_tbl_srcid.count(tname)>0){
1132 //                              ret += ext_reg_txt[lfta_idx[li_tname]];
1133 //                              op_params += ",inf"+int_to_string( lfta_idx[li_tname] )+"->schema_handle";
1134                                 op_params += ",inf"+int_to_string( input_tbl_srcid[tname] )+"->schema_handle";
1135                         }else if(op_idx.count(tname)>0){
1136                                 op_params += ",op"+int_to_string( op_idx[tname] )+"_schema_handle";
1137                         }else{
1138                                 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when creating operator (4) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1139                                 exit(1);
1140                         }
1141                 }
1142                 ret += query_plan[q]->generate_operator(q,op_params);
1143                 ret +=
1144 "       operator_node* node"+int_to_string(q)+" = new operator_node(op"+int_to_string(q)+");\n";
1145
1146                 n_basic_ops = q;
1147           }
1148         }
1149         n_basic_ops++;
1150 //                      Next for the output operators if any
1151         for(i=0;i<output_operators.size();++i){
1152                 ret += output_operators[i]->generate_operator(n_basic_ops+i,"root_schema_handle");
1153                 ret +=
1154 "       operator_node* node"+int_to_string(n_basic_ops+i)+" = new operator_node(op"+int_to_string(n_basic_ops+i)+");\n";
1155         }
1156
1157
1158 //              Link up operators.
1159         for(q=0;q<query_plan.size();++q){
1160           if(query_plan[q]){
1161 //                      NOTE: this code assume that the operator has at most
1162 //                      two inputs.  But the template code also makes
1163 //                      this assumption.  Both will need to be changed.
1164                 vector<tablevar_t *> itbls = query_plan[q]->get_input_tbls();
1165                 string tname = itbls[0]->get_schema_name();
1166 //              string li_tname = tname +"-"+int_to_string(q)+"-0";
1167 //              if(lfta_idx.count(li_tname)>0)
1168                 if(input_tbl_srcid.count(tname)>0){
1169 //                      ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
1170                         ret += "\tnode"+int_to_string(q)+"->set_left_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
1171                 }else if(op_idx.count(tname)>0){
1172                         ret += "\tnode"+int_to_string(q)+"->set_left_child_node(node"+int_to_string( op_idx[tname] )+");\n";
1173                 }else{
1174                         fprintf(stderr,"INTERNAL ERROR, can't identify input table %s when linking operator (1) %d (%s)\n",tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1175                         exit(1);
1176                 }
1177                 if(itbls.size()>1){
1178                         string tname = itbls[1]->get_schema_name();
1179 //                      string li_tname = tname +"-"+int_to_string(q)+"-1";
1180 //                      if(lfta_idx.count(li_tname)>0)
1181                         if(input_tbl_srcid.count(tname)>0){
1182 //                              ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( lfta_idx[li_tname] )+");\n";
1183                                 ret += "\tnode"+int_to_string(q)+"->set_right_lfta(inf"+int_to_string( input_tbl_srcid[tname] )+");\n";
1184                         }else if(op_idx.count(tname)>0){
1185                                 ret += "\tnode"+int_to_string(q)+"->set_right_child_node(node"+int_to_string( op_idx[tname] )+");\n";
1186                         }else{
1187                                 fprintf(stderr,"INTERNAL ERROR, can't identify input table %s (%s) when linking operator (2) %d (%s)\n",tname.c_str(), tname.c_str(),q,query_plan[q]->get_node_name().c_str());
1188                                 exit(1);
1189                         }
1190                 }
1191           }
1192         }
1193         for(i=0;i<output_operators.size();++i){
1194                 if(i==0)
1195                         ret += "\tnode"+int_to_string(n_basic_ops)+"->set_left_child_node(node"+int_to_string( qhead )+");\n";
1196                 else
1197                         ret += "\tnode"+int_to_string(n_basic_ops+i)+"->set_left_child_node(node"+int_to_string( n_basic_ops+i-1 )+");\n";
1198         }
1199
1200         // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
1201
1202         bool fta_reusable = false;
1203         if (query_plan[qhead]->get_val_of_def("reusable") == "yes" ||
1204                 query_plan[qhead]->get_param_tbl()->size() == 0) {
1205                 fta_reusable = 1;
1206         }
1207
1208         int root_node = qhead;
1209         if(output_operators.size()>0)
1210                 root_node = n_basic_ops+i-1;
1211
1212         ret+=
1213 "\n"
1214 "\n"
1215 "       MULTOP_HFTA* ftap = new MULTOP_HFTA(ftaid, OUTPUT_HFTA, command, sz, value, root_schema_handle, node"+int_to_string(root_node)+", lfta_list, " + (fta_reusable ?"true":"false") + ", reusable);\n"
1216 "       if(ftap->init_failed()){ delete ftap; return 0;}\n"
1217 "       return (FTA*)ftap;\n"
1218 "}\n"
1219 "\n"
1220 "\n";
1221
1222
1223         string comm_bufsize = "16*1024*1024";
1224         if(defines.count("hfta_comm_buf")>0){
1225                 comm_bufsize = defines["hfta_comm_buf"];
1226         }
1227
1228         ret+=
1229 "\n"
1230 "int main(int argc, char * argv[]) {\n"
1231 "\n"
1232 "\n"
1233 "   /* parse the arguments */\n"
1234 "\n"
1235 "    gs_int32_t tip1,tip2,tip3,tip4;\n"
1236 "    endpoint gshub;\n"
1237 "    gs_sp_t instance_name;\n"
1238 "    if (argc<3) {\n"
1239 "                gslog(LOG_EMERG,\"Wrong arguments at startup\");\n"
1240 "        exit(1);\n"
1241 "    }\n"
1242 "\n"
1243 "    if ((sscanf(argv[1],\"%u.%u.%u.%u:%hu\",&tip1,&tip2,&tip3,&tip4,&(gshub.port)) != 5)) {\n"
1244 "        gslog(LOG_EMERG,\"HUB IP NOT DEFINED\");\n"
1245 "        exit(1);\n"
1246 "    }\n"
1247 "    gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);\n"
1248 "    gshub.port=htons(gshub.port);\n"
1249 "    instance_name=strdup(argv[2]);\n"
1250 "    if (set_hub(gshub)!=0) {\n"
1251 "        gslog(LOG_EMERG,\"Could not set hub\");\n"
1252 "        exit(1);\n"
1253 "    }\n"
1254 "    if (set_instance_name(instance_name)!=0) {\n"
1255 "        gslog(LOG_EMERG,\"Could not set instance name\");\n"
1256 "        exit(1);\n"
1257 "    }\n"
1258 "\n"
1259 "\n"
1260 "    /* initialize host library  */\n"
1261 "\n"
1262 //"    fprintf(stderr,\"Initializing gscp\\n\");\n"
1263 "    gsopenlog(argv[0]);\n"
1264 "\n"
1265 "    if (hostlib_init(HFTA, "+comm_bufsize+", DEFAULTDEV, 0, 0)!=0) {\n"
1266 "        fprintf(stderr,\"%s::error:could not initialize gscp\\n\",\n"
1267 "               argv[0]);\n"
1268 "        exit(1);\n"
1269 "    }\n"
1270 "\n"
1271 "\n"
1272 "\n"
1273 "    FTAID ret = fta_register(OUTPUT_HFTA, " + (fta_reusable?"1":"0") + ", DEFAULTDEV, alloc_hfta, "+generate_schema_string_name(query_name)+", -1, 0ull);\n"
1274 "    fta_start_service(-1);\n"
1275 "\n"
1276 "   return 0;\n"
1277 "\n"
1278 "}\n"
1279 "\n";
1280 ////////////////////
1281         return(ret);
1282 }
1283
1284
1285 //              Checks if the node i is compatible with interface partitioning
1286 //              (can be pushed below the merge that combines partitioned stream)
1287 //              i - index of the query node in a query plan
1288 bool stream_query::is_partn_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result) {
1289         int i;
1290         qp_node* node = query_plan[index];
1291         qp_node* child_node = NULL;
1292
1293         if (node->predecessors.empty())
1294                 return false;
1295
1296         // all the node predecessors must be partition merges with the same partition definition
1297         partn_def_t* partn_def = NULL;
1298         for (i = 0; i < node->predecessors.size(); ++i) {
1299                 child_node = query_plan[node->predecessors[i]];
1300                 if (child_node->node_type() != "mrg_qpn")
1301                         return false;
1302
1303                 // merge must have only one parent for this optimization to work
1304                 // check that all its successors are the same
1305                 for (int j = 1; j < child_node->successors.size(); ++j) {
1306                         if (child_node->successors[j] != child_node->successors[0])
1307                                 return false;
1308                 }
1309
1310                 partn_def_t* new_partn_def = ((mrg_qpn*)child_node)->get_partn_definition(lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result);
1311                 if (!new_partn_def)
1312                         return false;
1313                 if (!i)
1314                         partn_def = new_partn_def;
1315                 else if (new_partn_def != partn_def)
1316                         return false;
1317
1318         }
1319
1320         if (node->node_type() == "spx_qpn")     // spx nodes are always partition compatible
1321                 return true;
1322         else if (node->node_type() == "sgah_qpn") {
1323                 gb_table gb_tbl = ((sgah_qpn*)node)->gb_tbl;
1324                 return true; //partn_def->is_compatible(&gb_tbl);
1325         }
1326         else if (node->node_type() == "rsgah_qpn") {
1327                 gb_table gb_tbl = ((rsgah_qpn*)node)->gb_tbl;
1328                 return partn_def->is_compatible(&gb_tbl);
1329         }
1330         else if (node->node_type() == "sgahcwcb_qpn") {
1331                 gb_table gb_tbl = ((sgahcwcb_qpn*)node)->gb_tbl;
1332                 return partn_def->is_compatible(&gb_tbl);
1333         }
1334         else if (node->node_type() == "join_eq_hash_qpn") {
1335                 return true;
1336         }
1337         else
1338                 return false;
1339 }
1340
1341 //              Push the operator below the merge that combines
1342 void stream_query::pushdown_partn_operator(int index) {
1343         qp_node* node = query_plan[index];
1344         int i;
1345         char node_index[128];
1346
1347 //      fprintf(stderr, "Partn pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());
1348
1349
1350         // HACK ALERT: When reordering merges we screw up slack computation
1351         // since slack should no longer be used, it is not an issue
1352
1353
1354         // we can safely reorder nodes that have one and only one temporal attribute in select list
1355         table_def* table_layout = node->get_fields();
1356         vector<field_entry*> fields = table_layout->get_fields();
1357         int merge_fieldpos = -1;
1358
1359         data_type* dt;
1360         for (i = 0; i < fields.size(); ++i) {
1361                 data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
1362                 if(dt.is_temporal()) {
1363                         if (merge_fieldpos != -1)       // more that one temporal field found
1364                                 return;
1365                         merge_fieldpos = i;
1366                 }
1367         }
1368
1369         if (merge_fieldpos == -1)       // if no temporal fieldf found
1370                 return;
1371
1372         std::vector<colref_t *> mvars;                  // the merge-by columns.
1373
1374         // reordering procedure is different for unary operators and joins
1375         if (node->node_type() == "join_eq_hash_qpn") {
1376                 vector<qp_node*> new_nodes;
1377
1378                 tablevar_t *left_table_name;
1379                 tablevar_t *right_table_name;
1380                 mrg_qpn* left_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
1381                 mrg_qpn* right_mrg = (mrg_qpn*)query_plan[node->predecessors[1]];
1382
1383                 // for now we will only consider plans where both child merges
1384                 // merge the same set of streams
1385
1386                 if (left_mrg->fm.size() != right_mrg->fm.size())
1387                         return;
1388
1389                 // maping of interface names to table definitions
1390                 map<string, tablevar_t*> iface_map;
1391                 for (i = 0; i < left_mrg->fm.size(); i++) {
1392                         left_table_name = left_mrg->fm[i];
1393                         iface_map[left_table_name->get_machine() + left_table_name->get_interface()] = left_table_name;
1394                 }
1395
1396                 for (i = 0; i < right_mrg->fm.size(); i++) {
1397                         right_table_name = right_mrg->fm[i];
1398
1399                         // find corresponding left tabke
1400                         if (!iface_map.count(right_table_name->get_machine() + right_table_name->get_interface()))
1401                                 return;
1402
1403                         left_table_name = iface_map[right_table_name->get_machine() + right_table_name->get_interface()];
1404
1405                         // create new join nodes
1406                         sprintf(node_index, "_%d", i);
1407                         join_eq_hash_qpn* new_node = (join_eq_hash_qpn*)node->make_copy(node_index);
1408
1409                         // make a copy of right_table_name
1410                         right_table_name = right_table_name->duplicate();
1411                         left_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[0]->get_var_name());
1412                         right_table_name->set_range_var(((join_eq_hash_qpn*)node)->from[1]->get_var_name());
1413                         new_node->from[0] = left_table_name;
1414                         new_node->from[1] = right_table_name;
1415                         new_nodes.push_back(new_node);
1416                 }
1417
1418                 // make right_mrg a new root
1419                 right_mrg->set_node_name(node->get_node_name());
1420                 right_mrg->table_layout = table_layout;
1421                 right_mrg->merge_fieldpos = merge_fieldpos;
1422
1423                 for (i = 0; i < right_mrg->fm.size(); i++) {
1424                         // make newly create joins children of merge
1425                         sprintf(node_index, "_%d", i);
1426                         right_mrg->fm[i] = new tablevar_t(right_mrg->fm[i]->get_machine().c_str(), right_mrg->fm[i]->get_interface().c_str(), (node->get_node_name() + string(node_index)).c_str());
1427                         sprintf(node_index, "_m%d", i);
1428                         right_mrg->fm[i]->set_range_var(node_index);
1429                         right_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
1430
1431                 }
1432
1433                 if (left_mrg != right_mrg)
1434                         query_plan[node->predecessors[0]] = NULL;       // remove left merge from the plan
1435
1436                 query_plan.insert(query_plan.end(), new_nodes.begin(), new_nodes.end());
1437
1438         } else {        // unary operator
1439                 // get the child merge node
1440                 mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];
1441
1442                 child_mrg->set_node_name(node->get_node_name());
1443                 child_mrg->table_layout = table_layout;
1444                 child_mrg->merge_fieldpos = merge_fieldpos;
1445
1446                 // create new nodes for every source stream
1447                 for (i = 0; i < child_mrg->fm.size(); i++) {
1448                         tablevar_t *table_name = child_mrg->fm[i];
1449                         sprintf(node_index, "_%d", i);
1450                         qp_node* new_node = node->make_copy(node_index);
1451
1452                         if (node->node_type() == "spx_qpn")
1453                                 ((spx_qpn*)new_node)->table_name = table_name;
1454                         else if (node->node_type() == "sgah_qpn")
1455                                 ((sgah_qpn*)new_node)->table_name = table_name;
1456                         else if (node->node_type() == "rsgah_qpn")
1457                                 ((rsgah_qpn*)new_node)->table_name = table_name;
1458                         else if (node->node_type() == "sgahcwcb_qpn")
1459                                 ((sgahcwcb_qpn*)new_node)->table_name = table_name;
1460                         table_name->set_range_var("_t0");
1461
1462                         child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
1463                         sprintf(node_index, "_m%d", i);
1464                         child_mrg->fm[i]->set_range_var(node_index);
1465                         child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));
1466
1467                         // add new node to query plan
1468                         query_plan.push_back(new_node);
1469                 }
1470         }
1471         query_plan[index] = NULL;
1472         generate_linkage();
1473 }
1474
1475
1476 //              Checks if the node i can be pushed below the merge
1477   bool stream_query::is_pushdown_compatible(int index, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names) {
1478
1479         int i;
1480         qp_node* node = query_plan[index];
1481         qp_node* child_node = NULL;
1482
1483         if (node->predecessors.size() != 1)
1484                 return false;
1485
1486         // node predecessor must be merge that combine streams from multiple hosts
1487         child_node = query_plan[node->predecessors[0]];
1488         if (child_node->node_type() != "mrg_qpn")
1489                 return false;
1490
1491         if (!((mrg_qpn*)child_node)->is_multihost_merge())
1492                 return false;
1493
1494         // merge must have only one parent for this optimization to work
1495         // check that all its successors are the same
1496         for (int j = 1; j < child_node->successors.size(); ++j) {
1497                 if (child_node->successors[j] != child_node->successors[0])
1498                         return false;
1499         }
1500
1501         // selections can always be pushed down, aggregations can always be split into selection/aggr or aggr/aggr pair
1502         // and pushed down
1503         if (node->node_type() == "spx_qpn")
1504                 return true;
1505         else if (node->node_type() == "sgah_qpn")
1506                 return true;
1507         else
1508                 return false;
1509
1510   }
1511
//		Push the operator at query_plan[index] below its child merge.
//		Selections are replicated once per merged source; aggregations are
//		split into a low-level (per-host) part and a high-level combining
//		part via split_node_for_hfta().  No-op unless the select list has
//		exactly one temporal attribute to merge on.
  void stream_query::pushdown_operator(int index, ext_fcn_list *Ext_fcns, table_list *Schema) {
	qp_node* node = query_plan[index];
	int i;
	char node_suffix[128];	// scratch buffer for name/range-var suffixes

	// we can only safely push down queries that have one and only one temporal attribute in select list
	table_def* table_layout = node->get_fields();
	vector<field_entry*> fields = table_layout->get_fields();
	int merge_fieldpos = -1;	// position of the single temporal field (the merge-by column)

	data_type* dt;	// NOTE(review): unused; shadowed by the loop-local dt below
	for (i = 0; i < fields.size(); ++i) {
		data_type dt(fields[i]->get_type(),fields[i]->get_modifier_list());
		if(dt.is_temporal()) {
			if (merge_fieldpos != -1)	// more than one temporal field found, cannot push down
				return;
			merge_fieldpos = i;
		}
	}

	if (merge_fieldpos == -1)	// if no temporal field found
		return;

	std::vector<colref_t *> mvars;			// the merge-by columns.

	fprintf(stderr, "Regular pushdown, query %s of type %s\n", node->get_node_name().c_str(), node->node_type().c_str());

	// get the child merge node
	mrg_qpn* child_mrg = (mrg_qpn*)query_plan[node->predecessors[0]];

	tablevar_t *table_name = NULL;


	if (node->node_type() == "spx_qpn") {
		// selection: replicate the operator once per merged source stream

		// create new nodes for every source stream
		for (i = 0; i < child_mrg->fm.size(); i++) {
			table_name = child_mrg->fm[i];
			sprintf(node_suffix, "_%d", i);
			qp_node* new_node = node->make_copy(node_suffix);

			((spx_qpn*)new_node)->table_name = table_name;
			table_name->set_range_var("_t0");

			// the merge now reads from the pushed-down copy instead of the raw source
			child_mrg->fm[i] = new tablevar_t(table_name->get_machine().c_str(), table_name->get_interface().c_str(), new_node->get_node_name().c_str());
			sprintf(node_suffix, "_m%d", i);
			child_mrg->fm[i]->set_range_var(node_suffix);
			child_mrg->mvars[i]->set_field(table_layout->get_field_name(merge_fieldpos));

			// add new node to query plan
			query_plan.push_back(new_node);
		}
		child_mrg->table_layout = table_layout;
		child_mrg->merge_fieldpos = merge_fieldpos;

	} else {		// aggregation node

		vector<qp_node*> new_nodes;

		// split aggregation into high and low-level part
		vector<qp_node *> split_nodes = ((sgah_qpn*)node)->split_node_for_hfta(Ext_fcns, Schema);
		if (split_nodes.size() != 2)	// split failed, leave the plan unchanged
			return;

		// the high-level aggregation keeps the original input binding
		sgah_qpn* super_aggr = (sgah_qpn*)split_nodes[1];
		super_aggr->table_name = ((sgah_qpn*)node)->table_name;

		// group all the sources by host
		map<string, vector<int> > host_map;
		for (i = 0; i < child_mrg->fm.size(); i++) {
			tablevar_t *table_name = child_mrg->fm[i];
			if (host_map.count(table_name->get_machine()))
				host_map[table_name->get_machine()].push_back(i);
			else {
				vector<int> tables;
				tables.push_back(i);
				host_map[table_name->get_machine()] = tables;
			}
		}

		// create a new merge and low-level aggregation for each host
		map<string, vector<int> >::iterator iter;
		for (iter = host_map.begin(); iter != host_map.end(); iter++) {
			string host_name = (*iter).first;
			vector<int> tables = (*iter).second;

			sprintf(node_suffix, "_%s", host_name.c_str());
			string suffix(node_suffix);
			untaboo(suffix);	// sanitize host name so it is legal in a node name
			mrg_qpn *new_mrg = (mrg_qpn *)child_mrg->make_copy(suffix);
			for (i = 0; i < tables.size(); ++i) {
				sprintf(node_suffix, "_m%d", i);
				new_mrg->fm.push_back(child_mrg->fm[tables[i]]);
				// NOTE(review): fm is indexed with tables[i] but mvars with i --
				// possibly intended mvars[tables[i]]; confirm before relying on
				// per-source merge-by columns here
				new_mrg->mvars.push_back(child_mrg->mvars[i]);
				new_mrg->fm[i]->set_range_var(node_suffix);
			}
			qp_node* new_node = split_nodes[0]->make_copy(suffix);

			// bind the low-level node to the per-host merge
			if (new_node->node_type() == "spx_qpn")  {
				((spx_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
				((spx_qpn*)new_node)->table_name->set_range_var("_t0");
			} else {
				((sgah_qpn*)new_node)->table_name = new tablevar_t(host_name.c_str(), "IFACE", new_mrg->get_node_name().c_str());
				((sgah_qpn*)new_node)->table_name->set_range_var("_t0");
			}
			query_plan.push_back(new_mrg);
			new_nodes.push_back(new_node);
		}

		// the top merge now carries the low-level node's output schema
		child_mrg->merge_fieldpos = merge_fieldpos;
		if (split_nodes[0]->node_type() == "spx_qpn")
			child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((spx_qpn*)split_nodes[0])->select_list);
		else
			child_mrg->table_layout = create_attributes(child_mrg->get_node_name(), ((sgah_qpn*)split_nodes[0])->select_list);


		// connect newly created nodes with parent multihost merge
		for (i = 0; i < new_nodes.size(); ++i) {
			if (new_nodes[i]->node_type() == "spx_qpn")
				child_mrg->fm[i] = new tablevar_t(((spx_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());
			else {
				child_mrg->fm[i] = new tablevar_t(((sgah_qpn*)new_nodes[i])->table_name->get_machine().c_str(), "IFACE", new_nodes[i]->get_node_name().c_str());

			}
			child_mrg->mvars[i]->set_field(child_mrg->table_layout->get_field_name(merge_fieldpos));

			sprintf(node_suffix, "_m%d", i);
			child_mrg->fm[i]->set_range_var(node_suffix);
			query_plan.push_back(new_nodes[i]);
		}
		// shrink the top merge to one input per host
		child_mrg->fm.resize(new_nodes.size());
		child_mrg->mvars.resize(new_nodes.size());

		// push the new high-level aggregation
		query_plan.push_back(super_aggr);

	}
	query_plan[index] = NULL;	// the original operator is fully replaced
	generate_linkage();

  }
1655
1656 //              Extract subtree rooted at node i into separate hfta
1657 stream_query* stream_query::extract_subtree(int index) {
1658
1659         vector<int> nodes;
1660         stream_query* new_query = new stream_query(query_plan[index], this);
1661
1662         nodes.push_back(index);
1663         for (int i = 0; i < nodes.size(); ++i) {
1664                 qp_node* node = query_plan[nodes[i]];
1665                 if (!node)
1666                         continue;
1667
1668                 // add all children to nodes list
1669                 for (int j = 0; j < node->predecessors.size(); ++j)
1670                         nodes.push_back(node->predecessors[j]);
1671                 if (i)
1672                         new_query->query_plan.push_back(node);
1673
1674                 query_plan[nodes[i]] = NULL;
1675         }
1676
1677         return new_query;
1678 }
1679
//		Splits a query that combines data from multiple hosts into separate hftas.
//		For every multi-host merge: group its inputs by host, insert one new
//		per-host merge per group, rewire the original merge above them, then
//		extract each of its input subtrees into its own stream_query.
//		Returns the extracted per-host queries.
vector<stream_query*> stream_query::split_multihost_query()  {

	vector<stream_query*> ret;
	char node_suffix[128];	// scratch buffer for node-name suffixes
	int i;

	// find merges combining multiple hosts into per-host groups
	int plan_size = query_plan.size();	// snapshot: the loop appends to query_plan
	vector<mrg_qpn*> new_nodes;

	for (i = 0; i < plan_size; ++i) {
		qp_node* node = query_plan[i];
		if (node && node->node_type() == "mrg_qpn") {
			mrg_qpn* mrg = (mrg_qpn*)node;
			if (mrg->is_multihost_merge()) {

				// group all the sources by host
				map<string, vector<int> > host_map;
				for (int j = 0; j < mrg->fm.size(); j++) {
					tablevar_t *table_name = mrg->fm[j];
					if (host_map.count(table_name->get_machine()))
						host_map[table_name->get_machine()].push_back(j);
					else {
						vector<int> tables;
						tables.push_back(j);
						host_map[table_name->get_machine()] = tables;
					}
				}

				// create a new merge for each host with more than one source
				map<string, vector<int> >::iterator iter;
				for (iter = host_map.begin(); iter != host_map.end(); iter++) {
					string host_name = (*iter).first;
					vector<int> tables = (*iter).second;

					// single-source hosts need no extra merge level
					if (tables.size() == 1)
						continue;

					sprintf(node_suffix, "_%s", host_name.c_str());
					string suffix(node_suffix);
					untaboo(suffix);	// sanitize host name so it is legal in a node name
					mrg_qpn *new_mrg = (mrg_qpn *)mrg->make_copy(suffix);
					for (int j = 0; j < tables.size(); ++j) {
						new_mrg->fm.push_back(mrg->fm[tables[j]]);
						// NOTE(review): fm is indexed with tables[j] but mvars with j --
						// possibly intended mvars[tables[j]]; confirm
						new_mrg->mvars.push_back(mrg->mvars[j]);
						sprintf(node_suffix, "m_%d", j);
						new_mrg->fm[j]->set_range_var(node_suffix);
					}
					new_nodes.push_back(new_mrg);
				}

				if (!new_nodes.empty()) {
					// connect newly created merge nodes with parent multihost merge
					// NOTE(review): fm/mvars are overwritten by position and resized
					// to new_nodes.size(); inputs of single-source hosts (skipped
					// above) appear to be dropped here -- confirm intended
					for (int j = 0; j < new_nodes.size(); ++j) {
						mrg->fm[j] = new tablevar_t(new_nodes[j]->fm[0]->get_machine().c_str(), "IFACE", new_nodes[j]->get_node_name().c_str());
						query_plan.push_back(new_nodes[j]);
					}
					mrg->fm.resize(new_nodes.size());
					mrg->mvars.resize(new_nodes.size());
					generate_linkage();
				}


				// now externalize the sources
				for (int j = 0; j < node->predecessors.size(); ++j) {
					//		Extract subtree rooted at each input into a separate hfta
					stream_query* q = extract_subtree(node->predecessors[j]);
					if (q) {
						q->generate_linkage();
						ret.push_back(q);
					}

				}
				generate_linkage();
			}
		}
	}

	return ret;
}
1761
1762
//		Perform local FTA optimizations on this query plan:
//		topologically sort the nodes, annotate input tables with the
//		machine/interface of the lftas they read, and split multi-way
//		merges into chains of two-way merges.  (Operator pushdown and
//		multihost splitting are currently disabled -- see the commented
//		block below.)
void stream_query::optimize(vector<stream_query *>& hfta_list, map<string, int> lfta_names, vector<string> interface_names, vector<string> machine_names, ext_fcn_list *Ext_fcns, table_list *Schema, ifq_t *ifaces_db, partn_def_list_t *partn_parse_result){

	// Topologically sort the nodes in query plan (leaf-first)
	int i = 0, j = 0;
	vector<int> sorted_nodes;	// node indices in leaf-first order

	int num_nodes = query_plan.size();
	bool* leaf_flags = new bool[num_nodes];		// marks nodes already emitted as leaves
	memset(leaf_flags, 0, num_nodes * sizeof(bool));

	// run topological sort
	bool done = false;

	// repeatedly sweep, adding every current leaf to sorted_nodes, until
	// a full sweep finds no new leaf
	while (!done) {
		done = true;
		for (i = 0; i < num_nodes; ++i) {
			if (!query_plan[i])
				continue;

			if (!leaf_flags[i] && query_plan[i]->predecessors.empty()) {
				leaf_flags[i] = true;
				sorted_nodes.push_back(i);
				done = false;

				// remove the node from its parents predecessor lists
				// since we only care about number of predecessors, it is sufficient just to remove
				// one element from the parent's predecessors list
				for (int j = query_plan[i]->successors.size() - 1; j >= 0; --j)
					query_plan[query_plan[i]->successors[j]]->predecessors.pop_back();
			}
		}
	}
	delete[] leaf_flags;
	num_nodes = sorted_nodes.size();
	generate_linkage();		// rebuild the recently destroyed predecessor lists.

	// collect the information about interfaces nodes read from:
	// stamp each lfta input table with its source machine and interface
	for (i = 0; i < num_nodes; ++i) {
		qp_node* node = query_plan[sorted_nodes[i]];
		vector<tablevar_t *> input_tables = node->get_input_tbls();
		for (j = 0; j <  input_tables.size(); ++j) {
			tablevar_t * table = input_tables[j];
			if (lfta_names.count(table->schema_name)) {
				int index = lfta_names[table->schema_name];
				table->set_machine(machine_names[index]);
				table->set_interface(interface_names[index]);
			}
		}
	}

	/*

	// push eligible operators down in the query plan
	for (i = 0; i < num_nodes; ++i) {
		if (partn_parse_result && is_partn_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names, ifaces_db, partn_parse_result)) {
			pushdown_partn_operator(sorted_nodes[i]);
		} else if (is_pushdown_compatible(sorted_nodes[i], lfta_names, interface_names, machine_names)) {
			pushdown_operator(sorted_nodes[i], Ext_fcns, Schema);
		}
	}

	// split the query into multiple hftas if it combines the data from multiple hosts
	vector<stream_query*> hftas = split_multihost_query();
	hfta_list.insert(hfta_list.end(), hftas.begin(), hftas.end());
	*/

	num_nodes = query_plan.size();
	// also split multi-way merges into two-way merges
	for (i = 0; i < num_nodes; ++i) {
		qp_node* node = query_plan[i];
		if (node && node->node_type() == "mrg_qpn") {
			vector<mrg_qpn *> split_merge = ((mrg_qpn *)node)->split_sources();

			query_plan.insert(query_plan.end(), split_merge.begin(), split_merge.end());
			// delete query_plan[sorted_nodes[i]];
			query_plan[i] = NULL;	// the original merge is replaced by the split chain
		}
	}

	generate_linkage();
}
1846
1847
1848
1849 table_def *stream_query::get_output_tabledef(){
1850         return( query_plan[qhead]->get_fields() );
1851 }
1852
1853
1854
1855 //////////////////////////////////////////////////////////
1856 ////            De-siloing.  TO BE REMOVED
1857
//		For every lfta input of this query, insert a merge node that
//		combines the per-silo variants of that lfta (lfta base name +
//		each silo suffix) and rewire the input to read from the merge.
//		Assumes every silo suffix has the same length as silo_names.back().
void stream_query::desilo_lftas(map<string, int> &lfta_names,vector<string> &silo_names,table_list *Schema){
	int i,t,s;

	// all input lfta names are assumed to end in a silo suffix of this length
	int suffix_len = silo_names.back().size();

	for(i=0;i<qtail.size();++i){
		vector<tablevar_t *> itbls = query_plan[qtail[i]]->get_input_tbls();
		for(t=0;t<itbls.size();++t){
			string itbl_name = itbls[t]->get_schema_name();
			if(lfta_names.count(itbl_name)>0){
//printf("Query %s input %d references lfta input %s\n",query_plan[qtail[i]]->get_node_name().c_str(),t,itbl_name.c_str());
				// build the list of per-silo source names: base name + each silo suffix
				vector<string> src_names;
				string lfta_base = itbl_name.substr(0,itbl_name.size()-suffix_len);
				for(s=0;s<silo_names.size();++s){
					string lfta_subsilo = lfta_base + silo_names[s];
//printf("\t%s\n",lfta_subsilo.c_str());
					src_names.push_back(lfta_subsilo);
				}
				// create the de-siloing merge, register its schema, and
				// point this input at the merge instead of the raw lfta
				string merge_node_name = "desilo_"+query_plan[qtail[i]]->get_node_name()+
					"_input_"+int_to_string(t);
				mrg_qpn *merge_node = new mrg_qpn(merge_node_name,src_names,Schema);
				int m_op_pos = Schema->add_table(merge_node->table_layout);
				itbls[t]->set_schema(merge_node_name);
				itbls[t]->set_schema_ref(m_op_pos);
				query_plan.push_back(merge_node);
			}
		}
	}

	generate_linkage();
}
1889 ////////////////////////////////////////
1890 ///             End de-siloing
1891
1892
1893
1894 //                      Given a collection of LFTA stream queries,
1895 //                      extract their WHERE predicates
1896 //                      and pass them to an analysis routine which will extract
1897 //                      common elements
1898 //
// Collect the WHERE clause of every LFTA query (inlining group-by
// definitions into aggregation predicates first) and hand the list to
// find_common_filter, which extracts the shared prefilter predicates.
void get_common_lfta_filter(std::vector<stream_query *> lfta_list,table_list *Schema,ext_fcn_list *Ext_fcns, vector<cnf_set *> &prefilter_preds, set<unsigned int> &pred_ids){
	int s;
	std::vector< std::vector<cnf_elem *> > where_list;	// one filter clause per lfta

//		still safe to assume that LFTA queries have a single
//		query node, which is at position 0.
	for(s=0;s<lfta_list.size();++s){
		vector<cnf_elem *> cnf_list = lfta_list[s]->query_plan[0]->get_filter_clause();
		if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
			// aggregation: substitute group-by definitions into each predicate
			// so the filter can be compared across queries
			gb_table *gtbl = ((sgah_qpn *)(lfta_list[s]->query_plan[0]))->get_gb_tbl();
			int c;
			for(c=0;c<cnf_list.size();++c){
				insert_gb_def_pr(cnf_list[c]->pr,gtbl);
			}
		}
		// re-fetch the (possibly rewritten) filter clause for the common-filter analysis
		where_list.push_back(lfta_list[s]->query_plan[0]->get_filter_clause());
	}

	find_common_filter(where_list,Schema,Ext_fcns,prefilter_preds, pred_ids);
}
1919
1920
1921
1922
1923
1924 //                      Given a collection of LFTA stream queries,
1925 //                      extract the union of all temporal attributes referenced in select clauses
1926 //                      those attributes will need to be unpacked in prefilter
1927 //
1928 void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_set &temp_cids){
1929         int s, sl;
1930         vector<scalarexp_t *> sl_list;
1931         gb_table *gb_tbl = NULL;
1932
1933
1934 //              still safe to assume that LFTA queries have a single
1935 //              query node, which is at position 0.
1936         for(s=0;s<lfta_list.size();++s){
1937
1938                 if(lfta_list[s]->query_plan[0]->node_type() == "spx_qpn"){
1939                         spx_qpn *spx_node = (spx_qpn *)lfta_list[s]->query_plan[0];
1940                         sl_list = spx_node->get_select_se_list();
1941                 }
1942                 if(lfta_list[s]->query_plan[0]->node_type() == "sgah_qpn"){
1943                         sgah_qpn *sgah_node = (sgah_qpn *)lfta_list[s]->query_plan[0];
1944                         sl_list = sgah_node->get_select_se_list();
1945                         gb_tbl = sgah_node->get_gb_tbl();
1946                 }
1947
1948                 for(sl=0;sl<sl_list.size();sl++){
1949                         data_type *sdt = sl_list[sl]->get_data_type();
1950                         if (sdt->is_temporal()) {
1951                                 gather_se_col_ids(sl_list[sl],temp_cids, gb_tbl);
1952                         }
1953                 }
1954         }
1955 }
1956
1957