com/gs-lite.git: src/ftacmp/translate_fta.cc @ 57847953621f8944628d9994d974f7672641585d
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33 #include<glob.h>
34 #include<string.h>
35
36 #include<list>
37
38 //              for the scandir
39      #include <sys/types.h>
40      #include <dirent.h>
41
42
43 #include<errno.h>
44
45 //              to verify that some files exist.
46      #include <sys/types.h>
47      #include <sys/stat.h>
48
49 #include "parse_partn.h"
50
51 #include "print_plan.h"
52
53 //              Interface to the xml parser
54
55 #include"xml_t.h"
56 #include"field_list.h"
57
58 #include "gsconfig.h"
59
60 extern int xmlParserparse(void);
61 extern FILE *xmlParserin;
62 extern int xmlParserdebug;
63
64 std::vector<std::string> xml_attr_vec;
65 std::vector<std::string> xml_val_vec;
66 std::string xml_a, xml_v;
67 xml_t *xml_leaves = NULL;
68
69 //      Interface to the field list verifier
70 field_list *field_verifier = NULL;
71
72 #define TMPSTRLEN 1000
73
74 #ifndef PATH_DELIM
75   #define PATH_DELIM '/'
76 #endif
77
78 char tmp_schema_str[10000];
79
80 // maximum delay between two heartbeats produced
81 // by a UDOP. Used when it is not explicitly
82 // provided in the UDOP definition
83 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
84
85 //              Default lfta hash table size, must be power of 2.
86 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
87
88 //              Interface to FTA definition lexer and parser ...
89
90 extern int FtaParserparse(void);
91 extern FILE *FtaParserin;
92 extern int FtaParserdebug;
93
94 fta_parse_t *fta_parse_result;
95 var_defs_t *fta_parse_defines;
96
97
98
99 //              Interface to external function lexer and parser ...
100
101 extern int Ext_fcnsParserparse(void);
102 extern FILE *Ext_fcnsParserin;
103 extern int Ext_fcnsParserdebug;
104
105 ext_fcn_list *Ext_fcns;
106
107
108 //              Interface to partition definition parser
109 extern int PartnParserparse();
110 partn_def_list_t *partn_parse_result = NULL;
111
112
113
114 using namespace std;
115 //extern int errno;
116
117
118 //              forward declaration of local utility function
119 void generate_makefile(vector<string> &input_file_names, int nfiles,
120                                            vector<string> &hfta_names, opview_set &opviews,
121                                                 vector<string> &machine_names,
122                                                 string schema_file_name,
123                                                 vector<string> &interface_names,
124                                                 ifq_t *ifdb, string &config_dir_path,
125                                                 bool use_pads,
126                                                 string extra_libs,
127                                                 map<string, vector<int> > &rts_hload
128                                         );
129
130 //static int split_string(char *instr,char sep, char **words,int max_words);
131 #define MAXFLDS 100
132
133   FILE *schema_summary_output = NULL;           // schema summary output (-f)
134
135 //                      Dump schema summary
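//              Writes one block per query plan node to schema_summary.txt:
//                      -----
//                      <node name>
//                      <field names, '|'-separated>
//                      <field types, '|'-separated>
//                      <"comment" definition, if any (blank line otherwise)>
//                      <input tables as [machine.][interface.]schema, '|'-separated>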
136 void dump_summary(stream_query *str){
137         for(int q=0;q<str->query_plan.size();++q){
138                 qp_node *qp = str->query_plan[q];
139                 if(qp==NULL)
140                         continue;       // there can be blanks
141
142                 fprintf(schema_summary_output,"-----\n");
143                 fprintf(schema_summary_output,"%s\n",qp->node_name.c_str());
144
145                 table_def *sch = qp->get_fields();
146
147                 vector<field_entry *> flds = sch->get_fields();
148                 int f;
149                 for(f=0;f<flds.size();++f){
150                         if(f>0) fprintf(schema_summary_output,"|");
151                         fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
152                 }
153                 fprintf(schema_summary_output,"\n");
154                 for(f=0;f<flds.size();++f){
155                         if(f>0) fprintf(schema_summary_output,"|");
156                         fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
157                 }
158                 fprintf(schema_summary_output,"\n");
159
160                 map<std::string, std::string> defines = qp->get_definitions();
161                 string comment = "";
162                 if(defines.count("comment")>0){
163                         comment = defines["comment"];
164                 }
165                 fprintf(schema_summary_output,"%s\n",comment.c_str());
166
167                 vector<tablevar_t *> input_tables = qp->get_input_tbls();
168                 for(int t=0; t<input_tables.size(); ++t){
169                         if(t>0) fprintf(schema_summary_output,"|");
170                         string machine = input_tables[t]->get_machine(); 
171                         string iface = input_tables[t]->get_interface(); 
172                         string schema = input_tables[t]->get_schema_name(); 
173                         if(machine!=""){
174                                 fprintf(schema_summary_output,"%s.",machine.c_str());
175                         }
176                         if(iface!=""){
177                                 fprintf(schema_summary_output,"%s.",iface.c_str());
178                         }else{
179                                 if(machine!="") fprintf(schema_summary_output,".");
180                         }
181                         fprintf(schema_summary_output,"%s",schema.c_str());
182                 }
183
184                 fprintf(schema_summary_output,"\n");
185         }
186
187
188
189 /*
190         fprintf(schema_summary_output,"-----\n");
191         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
192
193         table_def *sch = str->get_output_tabledef();
194
195         vector<field_entry *> flds = sch->get_fields();
196         int f;
197         for(f=0;f<flds.size();++f){
198                 if(f>0) fprintf(schema_summary_output,"|");
199                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
200         }
201         fprintf(schema_summary_output,"\n");
202         for(f=0;f<flds.size();++f){
203                 if(f>0) fprintf(schema_summary_output,"|");
204                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
205         }
206         fprintf(schema_summary_output,"\n");
207
208         string comment = "";
209         if(str->defines.count("comment")>0){
210                 comment = str->defines["comment"];
211         }
212         fprintf(schema_summary_output,"%s\n",comment.c_str());
213         
214         vector<tablevar_t *> input_tables = str->get_input_tables();
215         for(int t=0; t<input_tables.size(); ++t){
216                 if(t>0) fprintf(schema_summary_output,"|");
217                 string machine = input_tables[t]->get_machine(); 
218                 string iface = input_tables[t]->get_interface(); 
219                 string schema = input_tables[t]->get_schema_name(); 
220                 if(machine!=""){
221                         fprintf(schema_summary_output,"%s.",machine.c_str());
222                 }
223                 if(iface!=""){
224                         fprintf(schema_summary_output,"%s.",iface.c_str());
225                 }else{
226                         if(machine!="") fprintf(schema_summary_output,".");
227                 }
228                 fprintf(schema_summary_output,"%s",schema.c_str());
229         }
230         fprintf(schema_summary_output,"\n");
231 */
232
233 }
234
235 //              Globals
236 string hostname;                // name of current host.
237 int hostname_len;
238 bool generate_stats = false;
239 string root_path = "../..";
240
241
242 int main(int argc, char **argv){
243   char tmpstr[TMPSTRLEN];
244   string err_str;
245   int q,s,h,f;
246
247   set<int>::iterator si;
248
249   vector<string> registration_query_names;                      // for lfta.c registration
250   map<string, vector<int> > mach_query_names;   // list queries of machine
251   vector<int> snap_lengths;                             // for lfta.c registration
252   vector<int> snap_position;                            // for lfta.c registration
253   vector<string> interface_names;                       // for lfta.c registration
254   vector<string> machine_names;                 // machine of interface
255   vector<bool> lfta_reuse_options;                      // for lfta.c registration
256   vector<int> lfta_liveness_timeouts;           // for qtree.xml generation
257   vector<string> hfta_names;                    // hfta source code names, for
258                                                                                 // creating make file.
259   vector<string> qnames;                                // ensure unique names
260   map<string, int> lfta_names;                  // keep track of unique lftas.
261
262
263 //                              set these to 1 to debug the parser
264   FtaParserdebug = 0;
265   Ext_fcnsParserdebug = 0;
266
267   FILE *lfta_out;                               // lfta.c output.
268   FILE *fta_in;                                 // input file
269   FILE *table_schemas_in;               // source tables definition file
270   FILE *query_name_output;              // query names
271   FILE *qtree_output;                   // interconnections of query nodes
272
273   // -------------------------------
274   // Handling of Input Arguments
275   // -------------------------------
276     char optstr[] = "BDpLC:l:HNQMfPSh:n:cR:";           // -f takes no argument (see the usage string and the 'f' case below)
277         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
278                 "\t[-B] : debug only (don't create output files)\n"
279                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
280                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
281                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
282                 "\t[-C] : use <config directory> for definition files\n"
283                 "\t[-l] : use <library directory> for library queries\n"
284                 "\t[-N] : output query names in query_names.txt\n"
285                 "\t[-H] : create HFTA only (no schema_file)\n"
286                 "\t[-Q] : use query name for hfta suffix\n"
287                 "\t[-M] : generate make file and runit, stopit scripts\n"
288                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
289                 "\t[-f] : Output schema summary to schema_summary.txt\n"
290                 "\t[-P] : link with PADS\n"
291                 "\t[-h] : override host name.\n"
292                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
293                 "\t[-R] : path to root of GS-lite\n"
294 ;
295
296 //              parameters gathered from command line processing
297         string external_fcns_path;
298 //      string internal_fcn_path;
299         string config_dir_path;
300         string library_path = "./";
301         vector<string> input_file_names;
302         string schema_file_name;
303         bool debug_only = false;
304         bool hfta_only = false;
305         bool output_query_names = false;
306         bool output_schema_summary=false;
307         bool numeric_hfta_flname = true;
308         bool create_makefile = false;
309         bool distributed_mode = false;
310         bool partitioned_mode = false;
311         bool use_live_hosts_file = false;
312         bool use_pads = false;
313         bool clean_make = false;
314         int n_virtual_interfaces = 1;
315
316    int chopt;                           // int, not char: getopt() returns int and the -1 test can fail where char is unsigned
317    while((chopt = getopt(argc,argv,optstr)) != -1){
318                 switch(chopt){
319                 case 'B':
320                         debug_only = true;
321                         break;
322                 case 'D':
323                         distributed_mode = true;
324                         break;
325                 case 'p':
326                         partitioned_mode = true;
327                         break;
328                 case 'L':
329                         use_live_hosts_file = true;
330                         break;
331                 case 'C':
332                                 if(optarg != NULL)
333                                  config_dir_path = string(optarg) + string("/");
334                         break;
335                 case 'l':
336                                 if(optarg != NULL)
337                                  library_path = string(optarg) + string("/");
338                         break;
339                 case 'N':
340                         output_query_names = true;
341                         break;
342                 case 'Q':
343                         numeric_hfta_flname = false;
344                         break;
345                 case 'H':
346                         if(schema_file_name == ""){
347                                 hfta_only = true;
348                         }
349                         break;
350                 case 'f':
351                         output_schema_summary=true;
352                         break;
353                 case 'M':
354                         create_makefile=true;
355                         break;
356                 case 'S':
357                         generate_stats=true;
358                         break;
359                 case 'P':
360                         use_pads = true;
361                         break;
362                 case 'c':
363                         clean_make = true;
364                         break;
365                 case 'h':
366                         if(optarg != NULL)
367                                 hostname = optarg;
368                         break;
369                 case 'R':
370                         if(optarg != NULL)
371                                 root_path = optarg;
372                         break;
373                 case 'n':
374                         if(optarg != NULL){
375                                 n_virtual_interfaces = atoi(optarg);
376                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
377                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
378                                         n_virtual_interfaces = 1;
379                                 }
380                         }
381                         break;
382                 case '?':
383                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
384                         fprintf(stderr,"%s\n", usage_str);
385                         exit(1);
386                 default:
387                         fprintf(stderr, "Argument was %c\n", optopt);
388                         fprintf(stderr,"Invalid arguments\n");
389                         fprintf(stderr,"%s\n", usage_str);
390                         exit(1);
391                 }
392         }
393         argc -= optind;
394         argv += optind;
395         for (int i = 0; i < argc; ++i) {
396                 if((schema_file_name == "") && !hfta_only){
397                         schema_file_name = argv[i];
398                 }else{
399                         input_file_names.push_back(argv[i]);
400                 }
401         }
402
403         if(input_file_names.size() == 0){
404                 fprintf(stderr,"%s\n", usage_str);
405                 exit(1);
406         }
407
408         if(clean_make){
409                 string clean_cmd = "rm Makefile hfta_*.cc";
410                 int clean_ret = system(clean_cmd.c_str());
411                 if(clean_ret){
412                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
413                 }
414         }
415
416
417         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
418
419 //                      Open globally used file names.
420
421         // prepend config directory to schema file
422         schema_file_name = config_dir_path + schema_file_name;
423         external_fcns_path = config_dir_path + string("external_fcns.def");
424     string ifx_fname = config_dir_path + string("ifres.xml");
425
426 //              Find interface query file(s).
427         if(hostname == ""){
428                 gethostname(tmpstr,TMPSTRLEN);
429                 hostname = tmpstr;
430         }
431         hostname_len = hostname.length();       // measure hostname itself; tmpstr is unset when -h overrides the host name
432     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
433         vector<string> ifq_fls;
434
435                 ifq_fls.push_back(ifq_fname);
436
437
438 //                      Get the field list, if it exists
439         string flist_fl = config_dir_path + "field_list.xml";
440         FILE *flf_in = NULL;
441         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
442                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
443                 xml_leaves = new xml_t();
444                 xmlParser_setfileinput(flf_in);
445                 if(xmlParserparse()){
446                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
447                 }else{
448                         field_verifier = new field_list(xml_leaves);
449                 }
450         }
451
452         if(!hfta_only){
453           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
454                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
455                 exit(1);
456           }
457         }
458
459 /*
460         if(!(debug_only || hfta_only)){
461           if((lfta_out = fopen("lfta.c","w")) == NULL){
462                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
463                 exit(1);
464           }
465         }
466 */
467
468 //              Get the output specification file.
469 //              format is
470 //                      query, operator, operator_param, directory, bucketwidth, partitioning_fields, n_partitions
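//              A hypothetical example line (the operator type is lower-cased below;
//              blank lines and lines starting with '#' are skipped):
//                      my_query,stream,,./output_dir,300,srcIP,1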
471         string ospec_fl = "output_spec.cfg";
472         FILE *osp_in = NULL;
473         vector<ospec_str *> output_specs;
474         multimap<string, int> qname_to_ospec;
475         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
476                 char *flds[MAXFLDS];
477                 int o_lineno = 0;
478                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
479                         o_lineno++;
480                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
481                         if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
482                                 if(nflds == 7){
483 //              make operator type lowercase
484                                         char *tmpc;
485                                         for(tmpc=flds[1];*tmpc!='\0';++tmpc)
486                                                 *tmpc = tolower(*tmpc);
487         
488                                         ospec_str *tmp_ospec = new ospec_str();
489                                         tmp_ospec->query = flds[0];
490                                         tmp_ospec->operator_type = flds[1];
491                                         tmp_ospec->operator_param = flds[2];
492                                         tmp_ospec->output_directory = flds[3];
493                                         tmp_ospec->bucketwidth = atoi(flds[4]);
494                                         tmp_ospec->partitioning_flds = flds[5];
495                                         tmp_ospec->n_partitions = atoi(flds[6]);
496                                         qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
497                                         output_specs.push_back(tmp_ospec);
498                                 }else{
499                                         fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
500                                 }
501                         }
502                 }
503                 fclose(osp_in);
504         }else{
505                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
506                 exit(1);
507         }
508
509 //              hfta parallelism
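//              hfta_parallelism.cfg format: one "<hfta name>,<parallelism>" entry per
//              line; the parallelism must be between 1 and n_virtual_interfaces and
//              must divide it evenly.  Hypothetical example:
//                      my_hfta,2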
510         string pspec_fl = "hfta_parallelism.cfg";
511         FILE *psp_in = NULL;
512         map<string, int> hfta_parallelism;
513         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
514                 char *flds[MAXFLDS];
515                 int o_lineno = 0;
516                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
517                         bool good_entry = true;
518                         o_lineno++;
519                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
520                         if(nflds == 2){
521                                 string hname = flds[0];
522                                 int par = atoi(flds[1]);
523                                 if(par <= 0 || par > n_virtual_interfaces){
524                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
525                                         good_entry = false;
526                                 }
527                                 if(good_entry && n_virtual_interfaces % par != 0){
528                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
529                                         good_entry = false;
530                                 }
531                                 if(good_entry)
532                                         hfta_parallelism[hname] = par;
533                         }
534                 }
535         }else{
536                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
537         }
538
539
540 //              LFTA hash table sizes
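//              lfta_htsize.cfg format: one "<lfta name>,<hash table size>" entry per
//              line (size must be positive), overriding the default LFTA hash table
//              size for that lfta.  Hypothetical example:
//                      my_lfta,8192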
541         string htspec_fl = "lfta_htsize.cfg";
542         FILE *htsp_in = NULL;
543         map<string, int> lfta_htsize;
544         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
545                 char *flds[MAXFLDS];
546                 int o_lineno = 0;
547                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
548                         bool good_entry = true;
549                         o_lineno++;
550                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
551                         if(nflds == 2){
552                                 string lfta_name = flds[0];
553                                 int htsz = atoi(flds[1]);
554                                 if(htsz>0){
555                                         lfta_htsize[lfta_name] = htsz;
556                                 }else{
557                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
558                                 }
559                         }
560                 }
561         }else{
562                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
563         }
564
565 //              LFTA virtual interface hash split
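//              rts_load.cfg format: "<interface name>,<weight>,<weight>,..." with
//              positive integer weights; the weights are accumulated into a cumulative
//              split vector (rts_hload) that is later passed to generate_makefile.
//              Hypothetical example:
//                      eth0,1,1,2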
566         string rtlspec_fl = "rts_load.cfg";
567         FILE *rtl_in = NULL;
568         map<string, vector<int> > rts_hload;
569         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
570                 char *flds[MAXFLDS];
571                 int r_lineno = 0;
572                 string iface_name;
573                 vector<int> hload;
574                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
575                         bool good_entry = true;
576                         r_lineno++;
577                         iface_name = "";
578                         hload.clear();
579                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
580                         if(nflds >1){
581                                 iface_name = flds[0];
582                                 int cumm_h = 0;
583                                 int j;
584                                 for(j=1;j<nflds;++j){
585                                         int h = atoi(flds[j]);
586                                         if(h<=0)
587                                                 good_entry = false;
588                                         cumm_h += h;
589                                         hload.push_back(cumm_h);
590                                 }
591                         }else{
592                                 good_entry = false;
593                         }
594                         if(good_entry){
595                                 rts_hload[iface_name] = hload;
596                         }else{
597                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
598                         }
599                 }
600         }
601
602
603
604         if(output_query_names){
605           if((query_name_output = fopen("query_names.txt","w")) == NULL){
606                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
607                 exit(1);
608           }
609         }
610
611         if(output_schema_summary){
612           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
613                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
614                 exit(1);
615           }
616         }
617
618         if((qtree_output = fopen("qtree.xml","w")) == NULL){
619                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
620                 exit(1);
621         }
622         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
623         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
624         fprintf(qtree_output,"<QueryNodes>\n");
625
626
627 //                      Get an initial Schema
628         table_list *Schema;
629         if(!hfta_only){
630 //                      Parse the table schema definitions.
631           fta_parse_result = new fta_parse_t();
632           FtaParser_setfileinput(table_schemas_in);
633           if(FtaParserparse()){
634                 fprintf(stderr,"Table schema parse failed.\n");
635                 exit(1);
636           }
637           if(fta_parse_result->parse_type != TABLE_PARSE){
638                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
639                 exit(1);
640           }
641           Schema = fta_parse_result->tables;
642
643 //                      Ensure that all schema_ids, if set, are distinct.
644 //  Obsolete? There is code elsewhere to ensure that schema IDs are
645 //  distinct on a per-interface basis.
646 /*
647           set<int> found_ids;
648           set<int> dup_ids;
649           for(int t=0;t<Schema->size();++t){
650                 int sch_id = Schema->get_table(t)->get_schema_id();
651                 if(sch_id> 0){
652                         if(found_ids.find(sch_id) != found_ids.end()){
653                                 dup_ids.insert(sch_id);
654                         }else{
655                                 found_ids.insert(sch_id);
656                         }
657                 }
658                 if(dup_ids.size()>0){
659                         fprintf(stderr, "Error, the schema has duplicate schema_ids:");
660                         for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
661                                 fprintf(stderr," %d",(*dit));
662                         fprintf(stderr,"\n");
663                         exit(1);
664                 }
665           }
666 */
667
668
669 //                      Process schema field inheritance
670           int retval;
671           retval = Schema->unroll_tables(err_str);
672           if(retval){
673                 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
674                 exit(1);
675           }
676         }else{
677 //                      hfta only => we will try to fetch schemas from the registry.
678 //                      therefore, start off with an empty schema.
679           Schema = new table_list();
680         }
681
682
683 //                      Open and parse the external functions file.
684         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
685         if(Ext_fcnsParserin == NULL){
686                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
687                 Ext_fcns = new ext_fcn_list();
688         }else{
689                 if(Ext_fcnsParserparse()){
690                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
691                         Ext_fcns = new ext_fcn_list();
692                 }
693         }
694         if(Ext_fcns->validate_fcns(err_str)){
695                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
696                 exit(1);
697         }
698
699 //              Open and parse the interface resources file.
700 //      ifq_t *ifaces_db = new ifq_t();
701 //   string ierr;
702 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
703 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
704 //                              ifx_fname.c_str(), ierr.c_str());
705 //              exit(1);
706 //      }
707 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
708 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
709 //                              ifq_fname.c_str(), ierr.c_str());
710 //              exit(1);
711 //      }
712
713
714 //                      The LFTA code string.
715 //                      Put the standard preamble here.
716 //                      NOTE: the hash macros, fcns should go into the run time
717   map<string, string> lfta_val;
718   map<string, string> lfta_prefilter_val;
719
720   string lfta_header =
721 "#include <limits.h>\n"
722 "#include \"rts.h\"\n"
723 "#include \"fta.h\"\n"
724 "#include \"lapp.h\"\n"
725 "#include \"rts_udaf.h\"\n"
726 "#include<stdio.h>\n"
727 "#include <float.h>\n"
728 "#include \"rdtsc.h\"\n"
729 "#include \"watchlist.h\"\n\n"
730
731 ;
732 // Get any locally defined parsing headers
733     glob_t glob_result;
734     memset(&glob_result, 0, sizeof(glob_result));
735
736     // do the glob operation TODO should be from GSROOT
737     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
738         if(return_value == 0){
739                 lfta_header += "\n";
740         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
741                         char *flds[1000];
742                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
743                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
744         }
745                 lfta_header += "\n";
746         }else{
747                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
748         }
749
750 /*
751 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
752 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
753 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
754 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
755 */
756
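//              The SLOT_* masks below describe a per-bucket status word in the generated
//              lfta code: a filled flag (0x04), two generation bits (0x03), and the
//              remaining bits caching hash bits.  The *_BF_* macros set, test, and clear
//              individual bits of a packed Bloom-filter table (one bit per
//              bucket/filter-index pair, eight bits per byte).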
757         lfta_header += 
758 "\n"
759 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
760 "\n"
761 "#define SLOT_FILLED 0x04\n"
762 "#define SLOT_GEN_BITS 0x03\n"
763 "#define SLOT_HASH_BITS 0xfffffff8\n"
764 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
765 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
766 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
767 "\n\n"
768
769 "#define lfta_BOOL_to_hash(x) (x)\n"
770 "#define lfta_USHORT_to_hash(x) (x)\n"
771 "#define lfta_UINT_to_hash(x) (x)\n"
772 "#define lfta_IP_to_hash(x) (x)\n"
773 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
774 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
775 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
776 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
777 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
778 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
779 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
780 "       for(i=0;i<x.length;++i){\n"
781 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
782 "               if((i%4) == 3){\n"
783 "                       ret ^= tmp_sum;\n"
784 "                       tmp_sum = 0;\n"
785 "               }\n"
786 "       }\n"
787 "       if((i%4)!=0) ret ^=tmp_sum;\n"
788 "       return(ret);\n"
789 "}\n\n\n";
790
791
792
793 //////////////////////////////////////////////////////////////////
794 /////                   Get all of the query parse trees
795
796
797   int i,p;
798   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
799
800 //---------------------------
801 //              Global info needed for post processing.
802
803 //                      Set of operator views ref'd in the query set.
804         opview_set opviews;
805 //                      lftas on a per-machine basis.
806         map<string, vector<stream_query *> > lfta_mach_lists;
807         int nfiles = input_file_names.size();
808         vector<stream_query *> hfta_list;               // list of hftas.
809         map<string, stream_query *> sq_map;             // map from query name to stream query.
810
811
812 //////////////////////////////////////////
813
814 //              Open and parse the interface resources file.
815         ifq_t *ifaces_db = new ifq_t();
816     string ierr;
817         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
818                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
819                                 ifx_fname.c_str(), ierr.c_str());
820                 exit(1);
821         }
822         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
823                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
824                                 ifq_fls[0].c_str(), ierr.c_str());
825                 exit(1);
826         }
827
828   map<string, string> qname_to_flname;  // for detecting duplicate query names
829
830
831
832 //                      Parse the files to create a vector of parse trees.
833 //                      Load qnodes with information to perform a topo sort
834 //                      based on query dependencies.
835   vector<query_node *> qnodes;                          // for topo sort.
836   map<string,int> name_node_map;                        // map query name to qnodes entry
837   for(i=0;i<input_file_names.size();i++){
838
839           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
840                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
841                   continue;
842           }
843 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
844
845 //                      Parse the FTA query
846           fta_parse_result = new fta_parse_t();
847           FtaParser_setfileinput(fta_in);
848           if(FtaParserparse()){
849                 fprintf(stderr,"FTA parse failed.\n");
850                 exit(1);
851           }
852           if(fta_parse_result->parse_type != QUERY_PARSE){
853                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
854                 exit(1);
855           }
856
857 //                      returns a list of parse trees
858           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
859           for(p=0;p<qlist.size();++p){
860             table_exp_t *fta_parse_tree = qlist[p];
861 //              query_parse_trees.push_back(fta_parse_tree);
862
863 //                      compute the default query name -- derived from the input file name (basename, extension stripped)
864                 strcpy(tmpstr,input_file_names[i].c_str());
865                 char *qname = strrchr(tmpstr,PATH_DELIM);
866                 if(qname == NULL)
867                         qname = tmpstr;
868                 else
869                         qname++;
870                 char *qname_end = strchr(qname,'.');
871                 if(qname_end != NULL) *qname_end = '\0';
872                 string qname_str = qname;
873                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
874
875 //                      Determine visibility.  Should I be attaching all of the output methods?
876                 if(qname_to_ospec.count(imputed_qname)>0)
877                         fta_parse_tree->set_visible(true);
878                 else
879                         fta_parse_tree->set_visible(false);
880
881
882 //                              Create a manipulable representation of the parse tree.
883 //                              the qnode inherits the visibility assigned to the parse tree.
884             int pos = qnodes.size();
885                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
886                 name_node_map[ qnodes[pos]->name ] = pos;
887 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
888 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
889 //              qfiles.push_back(i);
890
891 //                      Check for duplicate query names
892 //                                      NOTE : in hfta-only generation, I should
893 //                                      also check with the names of the registered queries.
894                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
895                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
896                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
897                         exit(1);
898                 }
899                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
900                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
901                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
902                         exit(1);
903                 }
904                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
905
906
907         }
908   }
909
910 //              Add the library queries
911
912   int pos;
913   for(pos=0;pos<qnodes.size();++pos){
914         int fi;
915         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
916                 string src_tbl = qnodes[pos]->refd_tbls[fi];
917                 if(qname_to_flname.count(src_tbl) == 0){
918                         int last_sep = src_tbl.find_last_of('/');
919                         if(last_sep != string::npos){
920 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
921                                 string target_qname = src_tbl.substr(last_sep+1);
922                                 string qpathname = library_path + src_tbl + ".gsql";
923                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
924                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
925                                         exit(1);
926                                         fprintf(stderr,"After exit\n");
927                                 }
928 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
929 //                      Parse the FTA query
930                                 fta_parse_result = new fta_parse_t();
931                                 FtaParser_setfileinput(fta_in);
932                                 if(FtaParserparse()){
933                                         fprintf(stderr,"FTA parse failed.\n");
934                                         exit(1);
935                                 }
936                                 if(fta_parse_result->parse_type != QUERY_PARSE){
937                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
938                                         exit(1);
939                                 }
940
941                                 map<string, int> local_query_map;
942                                 vector<string> local_query_names;
943                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
944                                 for(p=0;p<qlist.size();++p){
945                                 table_exp_t *fta_parse_tree = qlist[p];
946                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
947                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
948                                         if(imputed_qname == target_qname)
949                                                 imputed_qname = src_tbl;
950                                         if(local_query_map.count(imputed_qname)>0){
951                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
952                                                 exit(1);
953                                         }
954                                         local_query_map[ imputed_qname ] = p;
955                                         local_query_names.push_back(imputed_qname);
956                                 }
957
958                                 if(local_query_map.count(src_tbl)==0){
959                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
960                                         exit(1);
961                                 }
962
963                                 vector<int> worklist;
964                                 set<int> added_queries;
965                                 vector<query_node *> new_qnodes;
966                                 worklist.push_back(local_query_map[src_tbl]);   // the target query is keyed by src_tbl (see the renaming above)
967                                 added_queries.insert(local_query_map[src_tbl]);
968                                 int qq;
969                                 int qpos = qnodes.size();
970                                 for(qq=0;qq<worklist.size();++qq){
971                                         int q_id = worklist[qq];
972                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
973                                         new_qnodes.push_back( new_qnode);
974                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
975                                         int ff;
976                                         for(ff = 0;ff<refd_tbls.size();++ff){
977                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
978
979                                                         if(name_node_map.count(refd_tbls[ff])>0){
980                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
981                                                                 exit(1);
982                                                         }else{
983                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
984                                                         }
985                                                 }
986                                         }
987                                 }
988
989                                 for(qq=0;qq<new_qnodes.size();++qq){
990                                         int qpos = qnodes.size();
991                                         qnodes.push_back(new_qnodes[qq]);
992                                         name_node_map[qnodes[qpos]->name ] = qpos;
993                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
994                                 }
995                         }
996                 }
997         }
998   }
999
1000
1001
1002
1003
1004
1005
1006
1007 //---------------------------------------
1008
1009
1010 //              Add the UDOPS.
1011
1012   string udop_missing_sources;
1013   for(i=0;i<qnodes.size();++i){
1014         int fi;
1015         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
1016                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
1017                 if(sid >= 0){
1018                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
1019                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
1020                                 int pos = qnodes.size();
1021                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
1022                                         name_node_map[ qnodes[pos]->name ] = pos;
1023                                         qnodes[pos]->is_externally_visible = false;   // not visible unless referenced by a visible query (marked in the UDOP pass below)
1024         //                                      Need to mark the source queries as visible.
1025                                         int si;
1026                                         string missing_sources = "";
1027                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
1028                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
1029                                                 if(name_node_map.count(src_tbl)==0){
1030                                                         missing_sources += src_tbl + " ";
1031                                                 }
1032                                         }
1033                                         if(missing_sources != ""){
1034                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
1035                                         }
1036                                 }
1037                         }
1038                 }
1039         }
1040   }
1041   if(udop_missing_sources != ""){
1042         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
1043         exit(1);
1044   }
1045
1046
1047
1048 ////////////////////////////////////////////////////////////////////
1049 ///                             Check parse trees to verify that some
1050 ///                             global properties are met :
1051 ///                             if q1 reads from q2, then
1052 ///                               q2 is processed before q1
1053 ///                               q1 can supply q2's parameters
1054 ///                             Verify there is no cycle in the reads-from graph.
1055
1056 //                      Compute an order in which to process the
1057 //                      queries.
1058
1059 //                      Start by building the reads-from lists.
1060 //
1061
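//              The dependency analysis below proceeds in stages: build the reads-from
//              edge set, verify parameter compatibility along each edge, count
//              consumers to find the root queries, prune subtrees that produce no
//              output, build the reverse (sources_to) edges, and finally infer
//              additional visible nodes for subtrees shared by more than one visible
//              root.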
1062   for(i=0;i<qnodes.size();++i){
1063         int qi, fi;
1064         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
1065         for(fi = 0;fi<refd_tbls.size();++fi){
1066                 if(name_node_map.count(refd_tbls[fi])>0){
1067 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
1068                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
1069                 }
1070         }
1071   }
1072
1073
1074 //              If one query reads the result of another,
1075 //              check for parameter compatibility.  Currently it must
1076 //              be an exact match.  I will move to requiring
1077 //              containment after re-ordering, but will require
1078 //              some analysis for code generation which is not
1079 //              yet in place.
1080 //printf("There are %d query nodes.\n",qnodes.size());
1081
1082
1083   for(i=0;i<qnodes.size();++i){
1084         vector<var_pair_t *> target_params  = qnodes[i]->params;
1085         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1086                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
1087                 if(target_params.size() != source_params.size()){
1088                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1089                         exit(1);
1090                 }
1091                 int p;
1092                 for(p=0;p<target_params.size();++p){
1093                         if(! (target_params[p]->name == source_params[p]->name &&
1094                               target_params[p]->val == source_params[p]->val ) ){
1095                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1096                                 exit(1);
1097                         }
1098                 }
1099         }
1100   }
1101
1102
1103 //              Find the roots.
1104 //              Start by counting inedges.
1105   for(i=0;i<qnodes.size();++i){
1106         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1107                 qnodes[(*si)]->n_consumers++;
1108         }
1109   }
1110
1111 //              The roots are the nodes with indegree zero.
1112   set<int> roots;
1113   for(i=0;i<qnodes.size();++i){
1114         if(qnodes[i]->n_consumers == 0){
1115                 if(qnodes[i]->is_externally_visible == false){
1116                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1117                 }
1118                 roots.insert(i);
1119         }
1120   }
1121
1122 //              Remove the parts of the subtree that produce no output.
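//              Worklist pruning: pop each current root; keep it if it is externally
//              visible, otherwise discard it and decrement its children's consumer
//              counts.  Children that drop to zero consumers become the next set of
//              candidate roots.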
1123   set<int> valid_roots;
1124   set<int> discarded_nodes;
1125   set<int> candidates;
1126   while(roots.size() >0){
1127         for(si=roots.begin();si!=roots.end();++si){
1128                 if(qnodes[(*si)]->is_externally_visible){
1129                         valid_roots.insert((*si));
1130                 }else{
1131                         discarded_nodes.insert((*si));
1132                         set<int>::iterator sir;
1133                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1134                                 qnodes[(*sir)]->n_consumers--;
1135                                 if(qnodes[(*sir)]->n_consumers == 0)
1136                                         candidates.insert( (*sir));
1137                         }
1138                 }
1139         }
1140         roots = candidates;
1141         candidates.clear();
1142   }
1143   roots = valid_roots;
1144   if(discarded_nodes.size()>0){
1145         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1146         int di = 0;
1147         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1148                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1149                 di++;
1150                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1151         }
1152         fprintf(stderr,"\n");
1153   }
1154
1155 //              Compute the sources_to set, ignoring discarded nodes.
1156   for(i=0;i<qnodes.size();++i){
1157         if(discarded_nodes.count(i)==0)
1158                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1159                         qnodes[(*si)]->sources_to.insert(i);
1160         }
1161   }
1162
1163
1164 //              Find the nodes that are shared by multiple visible subtrees.
1165 //              The roots become inferred visible nodes.
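//              Iterate to a fixed point: for every visible node, tag the nodes of its
//              subtree (without descending into other visible nodes) with that root's
//              id; any node tagged by more than one root, none of whose consumers is
//              similarly shared, is promoted to an inferred visible node, and the scan
//              repeats until no new node is promoted.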
1166
1167 //              Find the visible nodes.
1168         vector<int> visible_nodes;
1169         for(i=0;i<qnodes.size();i++){
1170                 if(qnodes[i]->is_externally_visible){
1171                         visible_nodes.push_back(i);
1172                 }
1173         }
1174
1175 //              Find UDOPs referenced by visible nodes.
1176   list<int> workq;
1177   for(i=0;i<visible_nodes.size();++i){
1178         workq.push_back(visible_nodes[i]);
1179   }
1180   while(!workq.empty()){
1181         int node = workq.front();
1182         workq.pop_front();
1183         set<int>::iterator children;
1184         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1185                 qnodes[node]->is_externally_visible = true;
1186                 visible_nodes.push_back(node);
1187                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1188                         if(qnodes[(*children)]->is_externally_visible == false){
1189                                 qnodes[(*children)]->is_externally_visible = true;
1190                                 visible_nodes.push_back((*children));
1191                         }
1192                 }
1193         }
1194         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1195                 workq.push_back((*children));
1196         }
1197   }
1198
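//              Iterate to a fixed point: marking a shared node as an inferred visible root changes
//              the subtree structure (walks do not descend into visible nodes), so repeat the
//              marking pass until no new visible nodes are created.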
1199         bool done = false;
1200         while(!done){
1201 //      reset the nodes
1202                 for(i=0;i<qnodes.size();i++){
1203                         qnodes[i]->subtree_roots.clear();
1204                 }
1205
1206 //              Walk the tree defined by a visible node, not descending into
1207 //              subtrees rooted by a visible node.  Mark the node visited with
1208 //              the visible node ID.
1209                 for(i=0;i<visible_nodes.size();++i){
1210                         set<int> vroots;
1211                         vroots.insert(visible_nodes[i]);
1212                         while(vroots.size()>0){
1213                                 for(si=vroots.begin();si!=vroots.end();++si){
1214                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1215
1216                                         set<int>::iterator sir;
1217                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1218                                                 if(! qnodes[(*sir)]->is_externally_visible){
1219                                                         candidates.insert( (*sir));
1220                                                 }
1221                                         }
1222                                 }
1223                                 vroots = candidates;
1224                                 candidates.clear();
1225                         }
1226                 }
1227 //              Find the nodes that are in multiple visible-node subtrees, but with no parent
1228 //              that is in multiple visible-node subtrees.  Mark these as inferred visible nodes.
1229                 done = true;    // until proven otherwise
1230                 for(i=0;i<qnodes.size();i++){
1231                         if(qnodes[i]->subtree_roots.size()>1){
1232                                 bool is_new_root = true;
1233                                 set<int>::iterator sir;
1234                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1235                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1236                                                 is_new_root = false;
1237                                 }
1238                                 if(is_new_root){
1239                                         qnodes[i]->is_externally_visible = true;
1240                                         qnodes[i]->inferred_visible_node = true;
1241                                         visible_nodes.push_back(i);
1242                                         done = false;
1243                                 }
1244                         }
1245                 }
1246         }
1247
1248
1249
1250
1251
1252 //              Get the non-discarded query nodes in topological order.
1253 //  for(i=0;i<qnodes.size();i++){
1254 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1255 //  }
1256   vector<int> process_order;
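//              Peel off nodes whose consumers have all been emitted (n_consumers reaches 0),
//              yielding a topological order that lists consumers before their sources.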
1257   while(roots.size() >0){
1258         for(si=roots.begin();si!=roots.end();++si){
1259                 if(discarded_nodes.count((*si))==0){
1260                         process_order.push_back( (*si) );
1261                 }
1262                 set<int>::iterator sir;
1263                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1264                         qnodes[(*sir)]->n_consumers--;
1265                         if(qnodes[(*sir)]->n_consumers == 0)
1266                                 candidates.insert( (*sir));
1267                 }
1268         }
1269         roots = candidates;
1270         candidates.clear();
1271   }
1272
1273
1274 //printf("process_order.size() =%d\n",process_order.size());
1275
1276 //              Search for cyclic dependencies
1277   string found_dep;
1278   for(i=0;i<qnodes.size();++i){
1279         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1280                 if(found_dep.size() != 0) found_dep += ", ";
1281                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1282         }
1283   }
1284   if(found_dep.size()>0){
1285         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1286         exit(1);
1287   }
1288
1289 //              Get a list of query sets, in the order to be processed.
1290 //              Start at each visible root and do a BFS.
1291 //              The query set includes queries referenced indirectly,
1292 //              as sources for user-defined operators.  These are needed
1293 //              to ensure that they are added to the schema, but are not part
1294 //              of the query tree.
1295
1296 //              stream_node_sets contains queries reachable only through the
1297 //              FROM clause, so I can tell which queries to add to the stream
1298 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1299
1300 //                      NOTE: this code works because, in order for data to be
1301 //                      read by multiple hftas, the node must be externally visible,
1302 //                      and visible nodes define the roots of process sets.
1303 //                      Internally visible nodes can feed data only
1304 //                      to other nodes in the same query file.
1305 //                      Therefore, any access can be restricted to a file;
1306 //                      hfta output sharing is done only on roots,
1307 //                      never on interior nodes.
1308
1309
1310
1311
1312 //              Compute the base collection of hftas.
1313   vector<hfta_node *> hfta_sets;
1314   map<string, int> hfta_name_map;
1315 //  vector< vector<int> > process_sets;
1316 //  vector< set<int> > stream_node_sets;
1317   reverse(process_order.begin(), process_order.end());  // get listing in reverse
1318                                                                                                                 // order: process leaves first.
1319   for(i=0;i<process_order.size();++i){
1320         if(qnodes[process_order[i]]->is_externally_visible == true){
1321 //printf("Visible.\n");
1322                 int root = process_order[i];
1323                 hfta_node *hnode = new hfta_node();
1324                 hnode->name = qnodes[root]-> name;
1325                 hnode->source_name = qnodes[root]-> name;
1326                 hnode->is_udop = qnodes[root]->is_udop;
1327                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1328
1329                 vector<int> proc_list;  proc_list.push_back(root);
1330 //                      Ensure that nodes are added only once.
1331                 set<int> proc_set;      proc_set.insert(root);
1332                 roots.clear();                  roots.insert(root);
1333                 candidates.clear();
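//                      BFS downward from the root, gathering every non-visible node reachable from it;
//                      these internal nodes are bundled into the root's hfta.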
1334                 while(roots.size()>0){
1335                         for(si=roots.begin();si!=roots.end();++si){
1336 //printf("Processing root %d\n",(*si));
1337                                 set<int>::iterator sir;
1338                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1339 //printf("reads fom %d\n",(*sir));
1340                                         if(qnodes[(*sir)]->is_externally_visible==false){
1341                                                 candidates.insert( (*sir) );
1342                                                 if(proc_set.count( (*sir) )==0){
1343                                                         proc_set.insert( (*sir) );
1344                                                         proc_list.push_back( (*sir) );
1345                                                 }
1346                                         }
1347                                 }
1348                         }
1349                         roots = candidates;
1350                         candidates.clear();
1351                 }
1352
1353                 reverse(proc_list.begin(), proc_list.end());
1354                 hnode->query_node_indices = proc_list;
1355                 hfta_name_map[hnode->name] = hfta_sets.size();
1356                 hfta_sets.push_back(hnode);
1357         }
1358   }
1359
1360 //              Compute the reads_from / sources_to graphs for the hftas.
1361
1362   for(i=0;i<hfta_sets.size();++i){
1363         hfta_node *hnode = hfta_sets[i];
1364         for(q=0;q<hnode->query_node_indices.size();q++){
1365                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1366                 for(s=0;s<qnode->refd_tbls.size();++s){
1367                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1368                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1369                                 hnode->reads_from.insert(other_hfta);
1370                                 hfta_sets[other_hfta]->sources_to.insert(i);
1371                         }
1372                 }
1373         }
1374   }
1375
1376 //              Compute a topological sort of the hfta_sets.
1377
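//              Kahn-style sort: seed the work queue with hftas that have no consumers (empty
//              sources_to), and release a source hfta once all of its consumers have been emitted,
//              so hfta_topsort lists consumers before their sources.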
1378   vector<int> hfta_topsort;
1379   workq.clear();
1380   int hnode_srcs[hfta_sets.size()];
1381   for(i=0;i<hfta_sets.size();++i){
1382         hnode_srcs[i] = 0;
1383         if(hfta_sets[i]->sources_to.size() == 0)
1384                 workq.push_back(i);
1385   }
1386
1387   while(! workq.empty()){
1388         int     node = workq.front();
1389         workq.pop_front();
1390         hfta_topsort.push_back(node);
1391         set<int>::iterator stsi;
1392         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1393                 int parent = (*stsi);
1394                 hnode_srcs[parent]++;
1395                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1396                         workq.push_back(parent);
1397                 }
1398         }
1399   }
1400
1401 //              Decorate hfta nodes with the level of parallelism given as input.
1402
1403   map<string, int>::iterator msii;
1404   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1405         string hfta_name = (*msii).first;
1406         int par = (*msii).second;
1407         if(hfta_name_map.count(hfta_name) > 0){
1408                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1409         }else{
1410                 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but it is not an hfta.\n",hfta_name.c_str());
1411         }
1412   }
1413
1414 //              Propagate levels of parallelism: a child should have a level of parallelism
1415 //              at least as large as that of any of its parents.  Adjust children upwards to compensate.
1416 //              Start at the parents and adjust their children; the adjustment propagates automatically.
1417
1418   for(i=hfta_sets.size()-1;i>=0;i--){
1419         set<int>::iterator stsi;
1420         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1421                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1422                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1423                 }
1424         }
1425   }
1426
1427 //              Before all the name mangling, check if there are any output_spec.cfg
1428 //              or hfta_parallelism.cfg entries that do not have a matching query.
1429
1430         string dangling_ospecs = "";
1431         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1432                 string oq = (*msii).first;
1433                 if(hfta_name_map.count(oq) == 0){
1434                         dangling_ospecs += " "+(*msii).first;
1435                 }
1436         }
1437         if(dangling_ospecs!=""){
1438                 fprintf(stderr,"ERROR, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1439                 exit(1);
1440         }
1441
1442         string dangling_par = "";
1443         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1444                 string oq = (*msii).first;
1445                 if(hfta_name_map.count(oq) == 0){
1446                         dangling_par += " "+(*msii).first;
1447                 }
1448         }
1449         if(dangling_par!=""){
1450                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1451         }
1452
1453
1454
1455 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1456 //              FROM clauses: retarget any name which is an internal node, and
1457 //              any which is in hfta_sets (and which is parallelized).  Add merge nodes
1458 //              when the source hfta has more parallelism than the target node.
1459 //              Add new nodes to qnodes and hfta_sets with the appropriate linkages.
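//              Illustrative sketch (hypothetical names): if hfta "agg_q" has n_parallel == 2, it is
//              replaced by "agg_q__copy0" and "agg_q__copy1", and every query node inside it gets the
//              same __copyX suffix.  A consumer with lower parallelism that reads from "agg_q" is
//              instead pointed at a generated __merge node combining the copies assigned to it.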
1460
1461
1462   int n_original_hfta_sets = hfta_sets.size();
1463   for(i=0;i<n_original_hfta_sets;++i){
1464         if(hfta_sets[i]->n_parallel > 1){
1465                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1466                 set<string> local_nodes;                // names of query nodes in the hfta.
1467                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1468                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1469                 }
1470
1471                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1472                         string mangler = "__copy"+int_to_string(p);
1473                         hfta_node *par_hfta  = new hfta_node();
1474                         par_hfta->name = hfta_sets[i]->name + mangler;
1475                         par_hfta->source_name = hfta_sets[i]->name;
1476                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1477                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1478                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1479                         par_hfta->parallel_idx = p;
1480
1481                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1482
1483 //      Is it a UDOP?
1484                         if(hfta_sets[i]->is_udop){
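//                              A UDOP hfta holds a single query node (the UDOP itself): replicate it by
//                              appending the mangler to its name and to the names of the tables it reads from.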
1485                                 int root = hfta_sets[i]->query_node_indices[0];
1486
1487                                 string unequal_par_sources;
1488                                 set<int>::iterator rfsii;
1489                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1490                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1491                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1492                                         }
1493                                 }
1494                                 if(unequal_par_sources != ""){
1495                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1496                                         exit(1);
1497                                 }
1498
1499                                 int rti;
1500                                 vector<string> new_sources;
1501                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1502                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1503                                 }
1504
1505                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1506                                 new_qn->name += mangler;
1507                                 new_qn->mangler = mangler;
1508                                 new_qn->refd_tbls = new_sources;
1509                                 par_hfta->query_node_indices.push_back(qnodes.size());
1510                                 par_qnode_map[new_qn->name] = qnodes.size();
1511                                 name_node_map[ new_qn->name ] = qnodes.size();
1512                                 qnodes.push_back(new_qn);
1513                         }else{
1514 //              regular query node
1515                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1516                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1517                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1518 //                                      rehome the from clause on mangled names.
1519 //                                      create merge nodes as needed for external sources.
1520                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1521                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1522                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1523                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1524 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1525                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1526                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1527                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1528                                                 }else{
1529                                                         vector<string> src_tbls;
1530                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1531                                                         if(stride == 0){
1532                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1533                                                                 exit(1);
1534                                                         }
1535                                                         for(s=0;s<stride;++s){
1536                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1537                                                                 src_tbls.push_back(ext_src_name);
1538                                                         }
1539                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1540                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1541                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1542 //                                      Make a qnode to represent the new merge node
1543                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1544                                                         qn_pt->refd_tbls = src_tbls;
1545                                                         qn_pt->is_udop  = false;
1546                                                         qn_pt->is_externally_visible = false;
1547                                                         qn_pt->inferred_visible_node  = false;
1548                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1549                                                         par_qnode_map[merge_node_name] = qnodes.size();
1550                                                         name_node_map[ merge_node_name ] = qnodes.size();
1551                                                         qnodes.push_back(qn_pt);
1552                                                 }
1553                                         }
1554                                 }
1555                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1556                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1557                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1558                                 }
1559                                 new_qn->params = qnodes[hqn_idx]->params;
1560                                 new_qn->is_udop = false;
1561                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1562                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1563                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1564                                 par_qnode_map[new_qn->name] = qnodes.size();
1565                                 name_node_map[ new_qn->name ] = qnodes.size();
1566                                 qnodes.push_back(new_qn);
1567                           }
1568                         }
1569                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1570                         hfta_sets.push_back(par_hfta);
1571                 }
1572         }else{
1573 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1574 //              hfta sources.
1575                 if(!hfta_sets[i]->is_udop){
1576                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1577                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1578                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1579                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1580 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1581                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1582                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1583                                                 vector<string> src_tbls;
1584                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1585                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1586                                                         src_tbls.push_back(ext_src_name);
1587                                                 }
1588                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1589                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1590                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1591 //                                      Make a qnode to represent the new merge node
1592                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1593                                                 qn_pt->refd_tbls = src_tbls;
1594                                                 qn_pt->is_udop  = false;
1595                                                 qn_pt->is_externally_visible = false;
1596                                                 qn_pt->inferred_visible_node  = false;
1597                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1598                                                 name_node_map[ merge_node_name ] = qnodes.size();
1599                                                 qnodes.push_back(qn_pt);
1600                                         }
1601                                 }
1602                         }
1603                 }
1604           }
1605         }
1606   }
1607
1608 //                      Rebuild the reads_from / sources_to lists in the qnodes
1609   for(q=0;q<qnodes.size();++q){
1610         qnodes[q]->reads_from.clear();
1611         qnodes[q]->sources_to.clear();
1612   }
1613   for(q=0;q<qnodes.size();++q){
1614         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1615                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1616                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1617                         qnodes[q]->reads_from.insert(rf);
1618                         qnodes[rf]->sources_to.insert(q);
1619                 }
1620         }
1621   }
1622
1623 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1624   for(q=0;q<hfta_sets.size();++q){
1625         hfta_sets[q]->reads_from.clear();
1626         hfta_sets[q]->sources_to.clear();
1627   }
1628   for(q=0;q<hfta_sets.size();++q){
1629         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1630                 int node = hfta_sets[q]->query_node_indices[s];
1631                 set<int>::iterator rfsii;
1632                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1633                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1634                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1635                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1636                         }
1637                 }
1638         }
1639   }
1640
1641 /*
1642 for(q=0;q<qnodes.size();++q){
1643  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1644  set<int>::iterator rsii;
1645  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1646   printf(" %d",(*rsii));
1647   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1648  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1649   printf(" %d",(*rsii));
1650  printf("\n");
1651 }
1652
1653 for(q=0;q<hfta_sets.size();++q){
1654  if(hfta_sets[q]->do_generation==false)
1655         continue;
1656  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1657  set<int>::iterator rsii;
1658  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1659   printf(" %d",(*rsii));
1660   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1661  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1662   printf(" %d",(*rsii));
1663  printf("\n");
1664 }
1665 */
1666
1667
1668
1669 //              Re-topo sort the hftas
1670   hfta_topsort.clear();
1671   workq.clear();
1672   int hnode_srcs_2[hfta_sets.size()];
1673   for(i=0;i<hfta_sets.size();++i){
1674         hnode_srcs_2[i] = 0;
1675         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1676                 workq.push_back(i);
1677         }
1678   }
1679
1680   while(workq.empty() == false){
1681         int     node = workq.front();
1682         workq.pop_front();
1683         hfta_topsort.push_back(node);
1684         set<int>::iterator stsii;
1685         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1686                 int child = (*stsii);
1687                 hnode_srcs_2[child]++;
1688                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1689                         workq.push_back(child);
1690                 }
1691         }
1692   }
1693
1694 //              Ensure that all of the query_node_indices in hfta_sets are topologically
1695 //              sorted; don't rely on the assumption that all transforms maintain some kind of order.
1696   for(i=0;i<hfta_sets.size();++i){
1697         if(hfta_sets[i]->do_generation){
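//              Kahn's algorithm restricted to this hfta's nodes: reads_from entries that lie outside
//              the hfta are counted as already satisfied, so a node becomes ready once all of its
//              in-hfta sources have been emitted.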
1698                 map<int,int> n_accounted;
1699                 vector<int> new_order;
1700                 workq.clear();
1701                 vector<int>::iterator vii;
1702                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1703                         n_accounted[(*vii)]= 0;
1704                 }
1705                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1706                         set<int>::iterator rfsii;
1707                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1708                                 if(n_accounted.count((*rfsii)) == 0){
1709                                         n_accounted[(*vii)]++;
1710                                 }
1711                         }
1712                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1713                                 workq.push_back((*vii));
1714                         }
1715                 }
1716
1717                 while(workq.empty() == false){
1718                         int node = workq.front();
1719                         workq.pop_front();
1720                         new_order.push_back(node);
1721                         set<int>::iterator stsii;
1722                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1723                                 if(n_accounted.count((*stsii))){
1724                                         n_accounted[(*stsii)]++;
1725                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1726                                                 workq.push_back((*stsii));
1727                                         }
1728                                 }
1729                         }
1730                 }
1731                 hfta_sets[i]->query_node_indices = new_order;
1732         }
1733   }
1734
1735
1736
1737
1738
1739 ///                     Global checking is done; start the analysis and translation
1740 ///                     of the query parse trees in the order specified by hfta_topsort.
1741
1742
1743 //                      Get a list of the LFTAs for global lfta optimization
1744 //                              TODO: separate building operators from splitting lftas,
1745 //                                      that will make optimizations such as predicate pushing easier.
1746         vector<stream_query *> lfta_list;
1747         stream_query *rootq;
1748     int qi,qj;
1749
1750         map<string, map<int, string> > schema_of_schemaid; // ensure that, within each interface, a schema ID maps to only one schema
1751
1752         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
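//              Walk hfta_topsort from the back, i.e. sources before consumers, so that each query's
//              output table definition is already in the Schema when its consumers are analyzed.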
1753
1754         int hfta_id = hfta_topsort[qi];
1755     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1756
1757
1758
1759 //              Two possibilities: either it's a UDOP, or it's a collection of queries.
1760 //      if(qnodes[curr_list.back()]->is_udop)
1761         if(hfta_sets[hfta_id]->is_udop){
1762                 int node_id = curr_list.back();
1763                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1764                 opview_entry *opv = new opview_entry();
1765
1766 //                      Many of the UDOP properties aren't currently used.
1767                 opv->parent_qname = "no_parent";
1768                 opv->root_name = qnodes[node_id]->name;
1769                 opv->view_name = qnodes[node_id]->file;
1770                 opv->pos = qi;
1771                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1772                 opv->udop_alias = tmpstr;
1773                 opv->mangler = qnodes[node_id]->mangler;
1774
1775                 if(opv->mangler != ""){
1776                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1777                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1778                 }
1779
1780 //                      This piece of code makes each hfta which refers to the same udop
1781 //                      reference a distinct running udop.  Do this at query optimization time?
1782 //              fmtbl->set_udop_alias(opv->udop_alias);
1783
1784                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1785                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1786
1787                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1788                 int s,f,q;
1789                 for(s=0;s<subq.size();++s){
1790 //                              Validate that the fields match.
1791                         subquery_spec *sqs = subq[s];
1792                         string subq_name = sqs->name + opv->mangler;
1793                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1794                         if(flds.size() == 0){
1795                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1796                                 return(1);
1797                         }
1798                         if(flds.size() < sqs->types.size()){
1799                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1800                                 return(1);
1801                         }
1802                         bool failed = false;
1803                         for(f=0;f<sqs->types.size();++f){
1804                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1805                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1806                                 if(! dte.subsumes_type(&dtf) ){
1807                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1808                                         failed = true;
1809                                 }
1810 /*
1811                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1812                                         string pstr = dte.get_temporal_string();
1813                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1814                                         failed = true;
1815                                 }
1816 */
1817                         }
1818                         if(failed)
1819                                 return(1);
1820 ///                             Validation done; verify that the subquery appears in the
1821 ///                             list of query names.
1822                         for(q=0;q<qnodes.size();++q)
1823                                 if(qnodes[q]->name == subq_name)
1824                                         break;
1825                         if(q==qnodes.size()){
1826                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1827                                 return(1);
1828                         }
1829
1830                 }
1831
1832 //                      Cross-link: record this opview's index in the matching FROM entries of all sources_to queries.
1833                 set<int>::iterator sii;
1834                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1835 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1836                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1837                         int ii;
1838                         for(ii=0;ii<tblvars.size();++ii){
1839                                 if(tblvars[ii]->schema_name == opv->root_name){
1840                                         tblvars[ii]->set_opview_idx(opviews.size());
1841                                 }
1842
1843                         }
1844                 }
1845
1846                 opviews.append(opv);
1847         }else{
1848
1849 //                      Analyze the parse trees in this query,
1850 //                      put them in rootq
1851 //      vector<int> curr_list = process_sets[qi];
1852
1853
1854 ////////////////////////////////////////
1855
1856           rootq = NULL;
1857 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1858           for(qj=0;qj<curr_list.size();++qj){
1859                 i = curr_list[qj];
1860         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1861
1862 //                      Select the current query parse tree
1863                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1864
1865 //                      if hfta only, try to fetch any missing schemas
1866 //                      from the registry (using the print_schema program).
1867 //                      Here I use a hack to avoid analyzing the query -- all referenced
1868 //                      tables must be in the FROM clause.
1869 //                      If there is a problem loading any table, just issue a warning.
1870 //
1871                 tablevar_list_t *fm = fta_parse_tree->get_from();
1872                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1873 //                      iterate over all referenced tables
1874                 int t;
1875                 for(t=0;t<refd_tbls.size();++t){
1876                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1877
1878                   if(tbl_ref < 0){      // if this table is not in the Schema
1879
1880                         if(hfta_only){
1881                                 string cmd="print_schema "+refd_tbls[t];
1882                                 FILE *schema_in = popen(cmd.c_str(), "r");
1883                                 if(schema_in == NULL){
1884                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1885                                 }else{
1886                                   string schema_instr;
1887                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1888                                         schema_instr += tmpstr;
1889                                   }
1890                           fta_parse_result = new fta_parse_t();
1891                                   strcpy(tmp_schema_str,schema_instr.c_str());
1892                                   FtaParser_setstringinput(tmp_schema_str);
1893                           if(FtaParserparse()){
1894                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1895                           }else{
1896                                         if( fta_parse_result->tables != NULL){
1897                                                 int tl;
1898                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1899                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1900                                                 }
1901                                         }else{
1902                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1903                                         }
1904                                 }
1905                         }
1906                   }else{
1907                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1908                                 exit(1);
1909                   }
1910
1911                 }
1912           }
1913
1914
1915 //                              Analyze the query.
1916           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1917           if(qs == NULL){
1918                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1919                 exit(1);
1920           }
1921
1922           stream_query new_sq(qs, Schema);
1923           if(new_sq.error_code){
1924                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1925                         exit(1);
1926           }
1927
1928 //                      Add it to the Schema
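//                      (downstream queries can then resolve references to this query's output stream)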
1929           table_def *output_td = new_sq.get_output_tabledef();
1930           Schema->add_table(output_td);
1931
1932 //                      Create a query plan from the analyzed parse tree.
1933 //                      If it's a query referenced via FROM, add it to the stream query.
1934           if(rootq){
1935                 rootq->add_query(new_sq);
1936           }else{
1937                 rootq = new stream_query(new_sq);
1938 //                      have the stream query object inherit properties from the analyzed
1939 //                      hfta_node object.
1940                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1941                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1942           }
1943
1944
1945     }
1946
1947 //              This stream query has all its parts
1948 //              Build and optimize it.
1949 //printf("translate_fta: generating plan.\n");
1950         if(rootq->generate_plan(Schema)){
1951                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1952                 continue;
1953         }
1954
1955 //      We've found the query plan head, so now add the output operators.
1956         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
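//              Output specs are looked up by source_name (the unmangled query name), so every
//              parallel __copyX replica of a query receives the same output specification.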
1957                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1958                 multimap<string, int>::iterator mmsi;
1959                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1960                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1961                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1962                 }
1963         }
1964
1965
1966
1967 //                              Perform query splitting if necessary.
1968         bool hfta_returned;
1969     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1970
1971         int l;
1972 //for(l=0;l<split_queries.size();++l){
1973 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1974 //}
1975
1976
1977
1978
1979     if(split_queries.size() > 0){       // should be at least one component.
1980
1981 //                              Compute the number of LFTAs.
1982           int n_lfta = split_queries.size();
1983           if(hfta_returned) n_lfta--;
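//                              split_query returns the lfta components first; when an hfta component is
//                              produced it appears to be the last entry, so it is excluded from the lfta count.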
1985
1986 //                              Process the LFTA components.
1987           for(l=0;l<n_lfta;++l){
1988            if(lfta_names.count(split_queries[l]->query_name) == 0){
1989 //                              Grab the lfta for global optimization.
1990                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1991                 string liface = "_local_";
1992 //              string lmach = "";
1993                 string lmach = hostname;
1994                 if(tvec.size()>0){
1995                         liface = tvec[0]->get_interface();      // iface queries have been resolved
1996                         if(tvec[0]->get_machine() != ""){
1997                                 lmach = tvec[0]->get_machine();
1998                         }else{
1999                                 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n",  split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
2000                         }
2001                 } // else{
2002                         interface_names.push_back(liface);
2003                         machine_names.push_back(lmach);
2004 //              }
2005
2006                 vector<predicate_t *> schemaid_preds;
2007                 for(int irv=0;irv<tvec.size();++irv){
2008
2009                         string schema_name = tvec[irv]->get_schema_name();
2010                         string rvar_name = tvec[irv]->get_var_name();
2011                         int schema_ref = tvec[irv]->get_schema_ref();
2012                         if (lmach == "")
2013                                 lmach = hostname;
2014 //                      interface_names.push_back(liface);
2015 //                      machine_names.push_back(lmach);
2016
2017 //printf("Machine is %s\n",lmach.c_str());
2018
2019 //                              Check if a schemaId constraint needs to be inserted.
2020                         if(schema_ref<0){ // can result from some kinds of splits
2021                                 schema_ref = Schema->get_table_ref(schema_name);
2022                         }
2023                         int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
2024                         int errnum = 0;
2025                         string if_error;
2026                         iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
2027                         if(iface==NULL){
2028                                 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
2029                                 exit(1);
2030                         }       
2031
2032
2033                         if(tvec[irv]->get_interface() != "_local_"){
2034                          if(iface->has_multiple_schemas()){
2035                                 if(schema_id<0){        // invalid schema_id
2036                                         fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
2037                                         exit(1);
2038                                 }
2039                                 vector<string> iface_schemas = iface->get_property("Schemas");
2040                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
2041                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
2042                                         exit(1);
2043                                 }
2044 // Ensure that in liface, schema_id is used for only one schema
2045                                 if(schema_of_schemaid.count(liface)==0){
2046                                         map<int, string> empty_map;
2047                                         schema_of_schemaid[liface] = empty_map;
2048                                 }
2049                                 if(schema_of_schemaid[liface].count(schema_id)==0){
2050                                         schema_of_schemaid[liface][schema_id] = schema_name;
2051                                 }else{
2052                                         if(schema_of_schemaid[liface][schema_id] != schema_name){
2053                                                 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
2054                                                 exit(1);
2055                                         }
2056                                 }
2057                          }else{ // single-schema interface
2058                                 schema_id = -1; // don't generate schema_id predicate
2059                                 vector<string> iface_schemas = iface->get_property("Schemas");
2060                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
2061                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
2062                                         exit(1);
2063                                 }
2064                                 if(iface_schemas.size()>1){
2065                                         fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
2066                                         exit(1);
2067                                 }
2068                          }                      
2069                         }else{
2070                                 schema_id = -1;
2071                         }
2072
2073 // If we need to check the schema_id, insert a predicate into the lfta.
2074 //       TODO not just schema_id, the full all_schema_ids set.
2075                         if(schema_id>=0){
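//                              Build the comparison "<rvar>.schemaId = <schema_id>", wrap it in a
//                              one-element CNF clause, and append it to the lfta's WHERE clause.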
2076                                 colref_t *schid_cr = new colref_t("schemaId");
2077                                 schid_cr->schema_ref = schema_ref;
2078                                 schid_cr->table_name = rvar_name;
2079                                 schid_cr->tablevar_ref = 0;
2080                                 schid_cr->default_table = false;
2081                                 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
2082                                 data_type *schid_dt = new data_type("uint");
2083                                 schid_se->dt = schid_dt;
2084
2085                                 string schid_str = int_to_string(schema_id);
2086                                 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2087                                 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2088                                 lit_se->dt = schid_dt;
2089
2090                                 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2091                                 vector<cnf_elem *> clist;
2092                                 make_cnf_from_pr(schid_pr, clist);
2093                                 analyze_cnf(clist[0]);
2094                                 clist[0]->cost = 1;     // cheap one comparison
2095 // cnf built, now insert it.
2096                                 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2097
2098 // Specialized processing 
2099 //              filter join, get two schemaid preds
2100                                 string node_type = split_queries[l]->query_plan[0]->node_type();
2101                                 if(node_type == "filter_join"){
2102                                         filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2103                                         if(irv==0){
2104                                                 fj->pred_t0.push_back(clist[0]);
2105                                         }else{
2106                                                 fj->pred_t1.push_back(clist[0]);
2107                                         }
2108                                         schemaid_preds.push_back(schid_pr);
2109                                 }
2110 //              watchlist join, get the first schemaid pred
2111                                 if(node_type == "watch_join"){
2112                                         watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2113                                         if(irv==0){
2114                                                 fj->pred_t0.push_back(clist[0]);
2115                                                 schemaid_preds.push_back(schid_pr);
2116                                         }
2117                                 }
2118                         }
2119                 }
2120 // Specialized processing, currently filter join.
2121                 if(schemaid_preds.size()>1){
2122                         string node_type = split_queries[l]->query_plan[0]->node_type();
2123                         if(node_type == "filter_join"){
2124                                 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2125                                 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2126                                 vector<cnf_elem *> clist;
2127                                 make_cnf_from_pr(filter_pr, clist);
2128                                 analyze_cnf(clist[0]);
2129                                 clist[0]->cost = 1;     // cheap one comparison
2130                                 fj->shared_pred.push_back(clist[0]);
2131                         }
2132                 }
2133                         
2134
2135
2136
2137
2138
2139
2140 //                      Set the ht size from the recommendation, if there is one in the rec file
2141                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2142                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2143                 }
2144
2145
2146                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2147                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
2148                 lfta_list.push_back(split_queries[l]);
2149                 lfta_mach_lists[lmach].push_back(split_queries[l]);
2150
2151 //                      The following is a hack,
2152 //                      as I should be generating LFTA code through
2153 //                      the stream_query object.
2154
2155                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2156
2157 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2158
2159 /*
2160 //                              Create query description to embed in lfta.c
2161                 string lfta_schema_str = split_queries[l]->make_schema();
2162                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2163
2164 //                              get NIC capabilities.
2165                 int erri;
2166                 nic_property *nicprop = NULL;
2167                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2168                 if(iface_codegen_type.size()){
2169                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2170                         if(!nicprop){
2171                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2172                                         exit(1);
2173                         }
2174                 }
2175
2176                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2177 */
2178
2179                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
2180                 snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
2181
2182 // STOPPED HERE: need to figure out how to generate the code that Vlad needs
2183 //              from snap_position
2184
2185 // TODO NOTE : I'd like it to be the case that registration_query_names
2186 //      are the queries to be registered for subscription,
2187 //      but there is some complex bookkeeping here.
2188                 registration_query_names.push_back(split_queries[l]->query_name);
2189                 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2190 //                      NOTE: I will assume a 1-1 correspondence between
2191 //                      mach_query_names[lmach] and lfta_mach_lists[lmach],
2192 //                      where mach_query_names[lmach][i] contains the index into
2193 //                      registration_query_names, which names the lfta, and
2194 //                      lfta_mach_lists[lmach][i] is the stream_query * of the
2195 //                      corresponding lfta.
2196 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists
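//                      Worked example (names and positions are illustrative): if host "host1" runs
//                      two lftas whose names sit at positions 4 and 7 of registration_query_names,
//                      then mach_query_names["host1"] == {4, 7} and lfta_mach_lists["host1"] holds
//                      the two corresponding stream_query pointers in the same order.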
2197
2198
2199
2200                 // check if lfta is reusable
2201                 // An FTA is reusable if the reusable option is set explicitly or if it has no parameters
2202
2203                 bool lfta_reusable = false;
2204                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2205                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2206                         lfta_reusable = true;
2207                 }
2208                 lfta_reuse_options.push_back(lfta_reusable);
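                // Example: a selection lfta with no query parameters is marked reusable even
                // without an explicit define, because its parameter table is empty; a parameterized
                // query must set the "reusable" define to "yes" to get the same treatment.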
2209
2210                 // The LFTA will inherit the liveness timeout specification from the containing query.
2211                 // This is overly conservative, as an lfta is expected to spend less time per tuple
2212                 // than the full query.
2213
2214                 // extract liveness timeout from query definition
2215                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2216                 if (!liveness_timeout) {
2217 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2218 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2219                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2220                 }
2221                 lfta_liveness_timeouts.push_back(liveness_timeout);
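                // Example (assuming the usual DEFINE-block syntax for query definitions): a query
                // with a define such as liveness_timeout '20' yields 20 here; when the define is
                // absent, atoi() returns 0 and DEFAULT_UDOP_LIVENESS_TIMEOUT is used instead.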
2222
2223 //                      Add it to the schema
2224                 table_def *td = split_queries[l]->get_output_tabledef();
2225                 Schema->append_table(td);
2226 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2227
2228           }
2229          }
2230
2231 //                              If the output is lfta-only, dump out the query name.
2232       if(split_queries.size() == 1 && !hfta_returned){
2233         if(output_query_names ){
2234            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2235                 }
2236 /*
2237 else{
2238            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2239                 }
2240 */
2241
2242       }
2243 //                              output schema summary
2244           if(output_schema_summary){
2245                 for(int o=0;o<split_queries.size(); ++o){
2246                         dump_summary(split_queries[o]);
2247                 }
2248           }
2249
2250
2251           if(hfta_returned){            // query also has an HFTA component
2252                 int hfta_nbr = split_queries.size()-1;
2253
2254                         hfta_list.push_back(split_queries[hfta_nbr]);
2255
2256 //                                      report on generated query names
2257         if(output_query_names){
2258                         string hfta_name =split_queries[hfta_nbr]->query_name;
2259                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2260                         for(l=0;l<hfta_nbr;++l){
2261                                 string lfta_name =split_queries[l]->query_name;
2262                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2263                         }
2264                 }
2265 //              else{
2266 //              fprintf(stderr,"query names are ");
2267 //                      for(l=0;l<hfta_nbr;++l){
2268 //                              if(l>0) fprintf(stderr,",");
2269 //                              string fta_name =split_queries[l]->query_name;
2270 //                      fprintf(stderr," %s",fta_name.c_str());
2271 //                      }
2272 //                      fprintf(stderr,"\n");
2273 //              }
2274           }
2275
2276   }else{
2277           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2278           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2279           exit(1);
2280   }
2281  }
2282 }
2283
2284
2285 //-----------------------------------------------------------------
2286 //              Compute and propagate the SEs (scalar expressions) that compute PROTOCOL fields.
2287 //-----------------------------------------------------------------
2288
2289 for(i=0;i<lfta_list.size();i++){
2290         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2291         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2292 }
2293 for(i=0;i<hfta_list.size();i++){
2294         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2295         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2296 }
2297
2298
2299
2300 //------------------------------------------------------------------------
2301 //              Perform  individual FTA optimizations
2302 //-----------------------------------------------------------------------
2303
2304 if (partitioned_mode) {
2305
2306         // open partition definition file
2307         string part_fname = config_dir_path + "partition.txt";
2308
2309         FILE* partfd = fopen(part_fname.c_str(), "r");
2310         if (!partfd) {
2311                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2312                 exit(1);
2313         }
2314         PartnParser_setfileinput(partfd);
2315         if (PartnParserparse()) {
2316                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2317                 exit(1);
2318         }
2319         fclose(partfd);
2320 }
2321
2322
2323 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2324
2325 int num_hfta = hfta_list.size();
2326 for(i=0; i < hfta_list.size(); ++i){
2327         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2328 }
2329
2330 //                      Add all new hftas to schema
2331 for(i=num_hfta; i < hfta_list.size(); ++i){
2332                 table_def *td = hfta_list[i]->get_output_tabledef();
2333                 Schema->append_table(td);
2334 }
2335
2336 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2337
2338
2339
2340 //------------------------------------------------------------------------
2341 //              Do global (cross-fta) optimization
2342 //-----------------------------------------------------------------------
2343
2344
2345
2346
2347
2348
2349 set<string> extra_external_libs;
2350
2351 for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
2352
2353                 if(! debug_only){
2354 //                                      build hfta file name, create output
2355            if(numeric_hfta_flname){
2356             sprintf(tmpstr,"hfta_%d",hfta_count);
2357                         hfta_names.push_back(tmpstr);
2358             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2359          }else{
2360             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2361                         hfta_names.push_back(tmpstr);
2362             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2363           }
2364                   FILE *hfta_fl = fopen(tmpstr,"w");
2365                   if(hfta_fl == NULL){
2366                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2367                         exit(1);
2368                   }
2369                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2370
2371 //                      If there is a field verifier, warn about
2372 //                      lack of compatibility
2373 //                              NOTE : this code assumes that visible non-lfta queries
2374 //                              are those at the root of a stream query.
2375                   string hfta_comment;
2376                   string hfta_title;
2377                   string hfta_namespace;
2378                   if(hfta_list[i]->defines.count("comment")>0)
2379                         hfta_comment = hfta_list[i]->defines["comment"];
2380                   if(hfta_list[i]->defines.count("Comment")>0)
2381                         hfta_comment = hfta_list[i]->defines["Comment"];
2382                   if(hfta_list[i]->defines.count("COMMENT")>0)
2383                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2384                   if(hfta_list[i]->defines.count("title")>0)
2385                         hfta_title = hfta_list[i]->defines["title"];
2386                   if(hfta_list[i]->defines.count("Title")>0)
2387                         hfta_title = hfta_list[i]->defines["Title"];
2388                   if(hfta_list[i]->defines.count("TITLE")>0)
2389                         hfta_title = hfta_list[i]->defines["TITLE"];
2390                   if(hfta_list[i]->defines.count("namespace")>0)
2391                         hfta_namespace = hfta_list[i]->defines["namespace"];
2392                   if(hfta_list[i]->defines.count("Namespace")>0)
2393                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2394                   if(hfta_list[i]->defines.count("NAMESPACE")>0)
2395                         hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2396
2397                   if(field_verifier != NULL){
2398                         string warning_str;
2399                         if(hfta_comment == "")
2400                                 warning_str += "\tcomment not found.\n";
2401
2402 // Obsolete stuff that Carsten wanted
2403 //                      if(hfta_title == "")
2404 //                              warning_str += "\ttitle not found.\n";
2405 //                      if(hfta_namespace == "")
2406 //                              warning_str += "\tnamespace not found.\n";
2407
2408 //      There is a get_tbl_keys method implemented for qp_nodes;
2409 //      integrate it into stream_query, then call it to find keys,
2410 //      and annotate fields with their key-ness.
2411 //      If there is a "keys" property in the defines block, override anything returned
2412 //      from the automated analysis.
2413
2414                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2415                         int fi;
2416                         for(fi=0;fi<flds.size();fi++){
2417                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2418                         }
2419                         if(warning_str != "")
2420                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2421                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2422                   }
2423
2424 // Get the fields in this query
2425                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2426
2427 // do key processing
2428                   string hfta_keys_s = "";
2429                   if(hfta_list[i]->defines.count("keys")>0)
2430                         hfta_keys_s = hfta_list[i]->defines["keys"];
2431                   if(hfta_list[i]->defines.count("Keys")>0)
2432                         hfta_keys_s = hfta_list[i]->defines["Keys"];
2433                   if(hfta_list[i]->defines.count("KEYS")>0)
2434                         hfta_keys_s = hfta_list[i]->defines["KEYS"];
2435                   string xtra_keys_s = "";
2436                   if(hfta_list[i]->defines.count("extra_keys")>0)
2437                         xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2438                   if(hfta_list[i]->defines.count("Extra_Keys")>0)
2439                         xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2440                   if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2441                         xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2442 // get the keys
2443                   vector<string> hfta_keys;
2444                   vector<string> partial_keys;
2445                   vector<string> xtra_keys;
2446                   if(hfta_keys_s==""){
2447                                 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2448                                 if(xtra_keys_s.size()>0){
2449                                         xtra_keys = split_string(xtra_keys_s, ',');
2450                                 }
2451                                 for(int xi=0;xi<xtra_keys.size();++xi){
2452                                         if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2453                                                 hfta_keys.push_back(xtra_keys[xi]);
2454                                         }
2455                                 }
2456                   }else{
2457                                 hfta_keys = split_string(hfta_keys_s, ',');
2458                   }
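//                      Behavior sketch (define names from the lookups above, example values
//                      hypothetical): keys 'srcIP,dstIP' is split on commas and used verbatim,
//                      and any extra_keys define is then ignored; with no keys define, the keys
//                      inferred by get_tbl_keys() are extended with each extra_keys entry that is
//                      not already present.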
2459 // validate that all of the keys exist in the output.
2460 //      (exit on error, as it's a bad specification)
2461                   vector<string> missing_keys;
2462                   for(int ki=0;ki<hfta_keys.size(); ++ki){
2463                         int fi;
2464                         for(fi=0;fi<flds.size();++fi){
2465                                 if(hfta_keys[ki] == flds[fi]->get_name())
2466                                         break;
2467                         }
2468                         if(fi==flds.size())
2469                                 missing_keys.push_back(hfta_keys[ki]);
2470                   }
2471                   if(missing_keys.size()>0){
2472                         fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2473                         for(int hi=0; hi<missing_keys.size(); ++hi){
2474                                 fprintf(stderr," %s", missing_keys[hi].c_str());
2475                         }
2476                         fprintf(stderr,"\n");
2477                         exit(1);
2478                   }
2479
2480                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2481                   if(hfta_comment != "")
2482                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2483                   if(hfta_title != "")
2484                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2485                   if(hfta_namespace != "")
2486                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2487                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2488                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2489
2490 //                              write info about fields to qtree.xml
2491                   int fi;
2492                   for(fi=0;fi<flds.size();fi++){
2493                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2494                         if(flds[fi]->get_modifier_list()->size()){
2495                                 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2496                         }
2497                         fprintf(qtree_output," />\n");
2498                   }
2499 // info about keys
2500                   for(int hi=0;hi<hfta_keys.size();++hi){
2501                         fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2502                   }
2503                   for(int hi=0;hi<partial_keys.size();++hi){
2504                         fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2505                   }
2506                   for(int hi=0;hi<xtra_keys.size();++hi){
2507                         fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2508                   }
2509
2510
2511                   // extract liveness timeout from query definition
2512                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2513                   if (!liveness_timeout) {
2514 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2515 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2516                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2517                   }
2518                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2519
2520                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2521                   int itv;
2522                   for(itv=0;itv<tmp_tv.size();++itv){
2523                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2524                   }
2525                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2526                   if(ifrs != ""){
2527                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2528                   }
2529                   fprintf(qtree_output,"\t</HFTA>\n");
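//                Hedged sketch of the <HFTA> element written above (names, types, and values are
//                hypothetical; the real ones come from the query and its output table):
//                      <HFTA name='tcp_summary'>
//                              <FileName value='hfta_0.cc' />
//                              <Rate value='100' />
//                              <Field name='srcIP' pos='0' type='UINT' />
//                              <Key value='srcIP'/>
//                              <LivenessTimeout value='10' />
//                              <ReadsFrom value='base_pkts' />
//                      </HFTA>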
2530
2531                   fclose(hfta_fl);
2532                 }else{
2533 //                                      debug only -- do code generation to catch generation-time errors.
2534                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2535                 }
2536
2537                 hfta_count++;   // for hfta file names with numeric suffixes
2538
2539                 hfta_list[i]->get_external_libs(extra_external_libs);
2540
2541           }
2542
2543 string ext_lib_string;
2544 set<string>::iterator ssi_el;
2545 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2546         ext_lib_string += (*ssi_el)+" ";
2547
2548
2549
2550 //                      Report on the set of operator views
2551   for(i=0;i<opviews.size();++i){
2552         opview_entry *opve = opviews.get_entry(i);
2553         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2554         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2555         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2556         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2557         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2558
2559         if (!opve->liveness_timeout) {
2560 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2561 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2562                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2563         }
2564         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2565     int j;
2566         for(j=0;j<opve->subq_names.size();j++)
2567                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2568         fprintf(qtree_output,"\t</UDOP>\n");
2569   }
2570
2571
2572 //-----------------------------------------------------------------
2573
2574 //                      Create interface-specific meta code files.
2575 //                              first, open and parse the interface resources file.
2576         ifaces_db = new ifq_t();
2577     ierr = "";
2578         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2579                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2580                                 ifx_fname.c_str(), ierr.c_str());
2581                 exit(1);
2582         }
2583
2584         map<string, vector<stream_query *> >::iterator svsi;
2585         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2586                 string lmach = (*svsi).first;
2587
2588         //              For this machine, create a set of lftas per interface.
2589                 vector<stream_query *> mach_lftas = (*svsi).second;
2590                 map<string, vector<stream_query *> > lfta_iface_lists;
2591                 int li;
2592                 for(li=0;li<mach_lftas.size();++li){
2593                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2594                         string lfta_iface = "_local_";
2595                         if(tvec.size()>0){
2596                                 lfta_iface = tvec[0]->get_interface();  // assign to the outer variable (was shadowed by a redeclaration)
2597                         }
2598                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2599                 }
2600
2601                 map<string, vector<stream_query *> >::iterator lsvsi;
2602                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2603                         int erri;
2604                         string liface = (*lsvsi).first;
2605                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2606                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2607                         if(iface_codegen_type.size()){
2608                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2609                                 if(!nicprop){
2610                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2611                                         exit(1);
2612                                 }
2613                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2614                                 string mcf_flnm;
2615                                 if(lmach != "")
2616                                   mcf_flnm = lmach + "_"+liface+".mcf";
2617                                 else
2618                                   mcf_flnm = hostname + "_"+liface+".mcf";
2619                                 FILE *mcf_fl ;
2620                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2621                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2622                                         exit(1);
2623                                 }
2624                                 fprintf(mcf_fl,"%s",mcs.c_str());
2625                                 fclose(mcf_fl);
2626 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2627 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2628                         }
2629                 }
2630
2631
2632         }
2633
2634
2635
2636 //-----------------------------------------------------------------
2637
2638
2639 //                      Find common filter predicates in the LFTAs.
2640 //                      In addition, generate structs to store the
2641 //                      temporal attributes unpacked by the prefilter,
2642 //                      and compute & provide an interface for per-interface
2643 //                      record extraction properties.
2644         
2645         map<string, vector<stream_query *> >::iterator ssqi;
2646         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2647
2648                 string lmach = (*ssqi).first;
2649                 bool packed_return = false;
2650                 int li, erri;
2651
2652
2653 //      The LFTAs of this machine.
2654                 vector<stream_query *> mach_lftas = (*ssqi).second;
2655 //      break up on a per-interface basis.
2656                 map<string, vector<stream_query *> > lfta_iface_lists;
2657                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2658                                                         // for fta_init
2659                 for(li=0;li<mach_lftas.size();++li){
2660                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2661                         string lfta_iface = "_local_";
2662                         if(tvec.size()>0){
2663                                 lfta_iface = tvec[0]->get_interface();
2664                         }
2665                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2666                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2667                 }
2668
2669
2670 //      Are the return values "packed"?
2671 //      This should be done on a per-interface basis.
2672 //      But this is defunct code for gs-lite
2673                 for(li=0;li<mach_lftas.size();++li){
2674                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2675                   string liface = "_local_";
2676                   if(tvec.size()>0){
2677                          liface = tvec[0]->get_interface();
2678                   }
2679                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2680                   if(iface_codegen_type.size()){
2681                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2682                         packed_return = true;
2683                         }
2684                   }
2685                 }
2686
2687
2688 // Separate lftas by interface, collect results on a per-interface basis.
2689
2690                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2691                 map<string, vector<cnf_set *> > prefilter_preds;
2692                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2693                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2694                         string liface = (*mvsi).first;
2695                         vector<cnf_set *> empty_list;
2696                         prefilter_preds[liface] = empty_list;
2697                         if(! packed_return){
2698                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2699                         }
2700
2701 //                              get NIC capabilities.  (Is this needed?)
2702                         nic_property *nicprop = NULL;
2703                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2704                         if(iface_codegen_type.size()){
2705                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2706                                 if(!nicprop){
2707                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2708                                         exit(1);
2709                                 }
2710                         }
2711                 }
2712
2713
2714 //              Now that we know the prefilter preds, generate the lfta code.
2715 //      Do this for all lftas in this machine.
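//      How the packed predicate ids are decoded just below: each entry of pred_ids carries an
//      lfta index in the bits above bit 16 and the predicate's position within the shared
//      prefilter in the low 16 bits, so (pid>>16)==li picks out this lfta's entries and
//      (pid & 0xffff) recovers the prefilter predicate number it can treat as subsumed.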
2716                 for(li=0;li<mach_lftas.size();++li){
2717                         set<unsigned int> subsumed_preds;
2718                         set<unsigned int>::iterator sii;
2719 #ifdef PREFILTER_OK
2720                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2721                                 int pid = (*sii);
2722                                 if((pid>>16) == li){
2723                                         subsumed_preds.insert(pid & 0xffff);
2724                                 }
2725                         }
2726 #endif
2727                         string lfta_schema_str = mach_lftas[li]->make_schema();
2728                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2729                         nic_property *nicprop = NULL; // no NIC properties?
2730                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2731                 }
2732
2733
2734 //                      generate structs to store the temporal attributes
2735 //                      unpacked by prefilter
2736                 col_id_set temp_cids;
2737                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2738                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2739
2740 //                      Compute the lfta bit signatures and the lfta colrefs
2741 //      do this on a per-interface basis
2742 #ifdef PREFILTER_OK
2743                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2744 #endif
2745                 map<string, vector<long long int> > lfta_sigs; // used again later
2746                 map<string, int> lfta_snap_pos; // optimize csv parsing
2747                                                                         // compute now, use in get_iface_properties
2748                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2749                         string liface = (*mvsi).first;
2750                         vector<long long int> empty_list;
2751                         lfta_sigs[liface] = empty_list;
2752                         lfta_snap_pos[liface] = -1;
2753
2754                         vector<col_id_set> lfta_cols;
2755                         vector<int> lfta_snap_length;
2756                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2757                                 unsigned long long int mask=0, bpos=1;
2758                                 int f_pos;
2759                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2760                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2761                                                 mask |= bpos;
2762                                         bpos = bpos << 1;
2763                                 }
2764                                 lfta_sigs[liface].push_back(mask);
2765                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2766                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
2767                                 int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
2768                                 if(this_snap_pos > lfta_snap_pos[liface])
2769                                         lfta_snap_pos[liface]  = this_snap_pos;
2770                         }
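//                      Example (illustrative): if prefilter predicates 0 and 2 of this interface
//                      have lfta_id sets containing li, the loop above sets bits 0 and 2, giving
//                      lfta_sigs[liface][li] == 0x5 for that lfta.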
2771
2772 //for(li=0;li<mach_lftas.size();++li){
2773 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2774 //col_id_set::iterator tcisi;
2775 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2776 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2777 //}
2778 //}
2779
2780
2781 //                      generate the prefilter
2782 //      Do this on a per-interface basis, except for the #define
2783 #ifdef PREFILTER_OK
2784 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2785                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2786 #else
2787                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2788
2789 #endif
2790                 }
2791
2792 //                      Generate interface parameter lookup function
2793           lfta_val[lmach] += "// lookup interface properties by name\n";
2794           lfta_val[lmach] += "// multiple values of a given property are returned in the same string, separated by commas\n";
2795           lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2796           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2797
2798 //        collect a list of interface names used by queries running on this host
2799           set<std::string> iface_names;
2800           for(i=0;i<mach_query_names[lmach].size();i++){
2801                 int mi = mach_query_names[lmach][i];
2802                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2803
2804                 if(interface_names[mi]=="")
2805                         iface_names.insert("DEFAULTDEV");
2806                 else
2807                         iface_names.insert(interface_names[mi]);
2808           }
2809
2810 //        generate interface property lookup code for every interface
2811           set<std::string>::iterator sir;
2812           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2813                 if (sir == iface_names.begin())
2814                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2815                 else
2816                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2817
2818                 // iterate through interface properties
2819                 vector<string> iface_properties;
2820                 if(*sir!="_local_"){    // dummy watchlist interface, don't process.
2821                         iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2822                 }
2823                 if (erri) {
2824                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2825                         exit(1);
2826                 }
2827                 if (iface_properties.empty())
2828                         lfta_val[lmach] += "\t\treturn NULL;\n";
2829                 else {
2830                         for (int i = 0; i < iface_properties.size(); ++i) {
2831                                 if (i == 0)
2832                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2833                                 else
2834                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2835
2836                                 // combine all values for the interface property using comma separator
2837                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2838                                 lfta_val[lmach] += "\t\t\treturn \"";
2839                                 for (int j = 0; j < vals.size(); ++j) {
2840                                         lfta_val[lmach] +=  vals[j];
2841                                         if (j != vals.size()-1)
2842                                                 lfta_val[lmach] += ",";
2843                                 }
2844                                 lfta_val[lmach] += "\";\n";
2845                         }
2846                         lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
2847                         lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
2848                         lfta_val[lmach] += "\t\t} else\n";
2849                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2850                 }
2851           }
2852           lfta_val[lmach] += "\t} else\n";
2853           lfta_val[lmach] += "\t\treturn NULL;\n";
2854           lfta_val[lmach] += "}\n\n";
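//        Hedged sketch of the generated lookup (interface/property names and values are
//        hypothetical; the real ones come from the interface resource file):
//              gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {
//                      if (!strcmp(iface_name, "eth0")) {
//                              if (!strcmp(property_name, "InterfaceType")) {
//                                      return "CSV";
//                              }else if(!strcmp(property_name, "_max_csv_pos")){
//                                      return "7";
//                              } else
//                                      return NULL;
//                      } else
//                              return NULL;
//              }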
2855
2856
2857 //                      Generate a full list of FTAs for clearinghouse reference
2858           lfta_val[lmach] += "// list of FTAs which the clearinghouse expects to register themselves\n";
2859           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2860
2861           bool first = true;
2862           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2863                 string liface = (*mvsi).first;
2864                 if(liface != "_local_"){        // these don't register themselves
2865                         vector<stream_query *> lfta_list = (*mvsi).second;
2866                         for(i=0;i<lfta_list.size();i++){
2867                                 int mi = lfta_iface_qname_ix[liface][i];
2868                                 if(first) first = false;
2869                                 else      lfta_val[lmach] += ", ";
2870                                 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2871                         }
2872                 }
2873           }
2874 //        for (i = 0; i < registration_query_names.size(); ++i) {
2875 //                 if (i)
2876 //                        lfta_val[lmach] += ", ";
2877 //                 lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2878 //        }
2879
2880           for (i = 0; i < hfta_list.size(); ++i) {
2881                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2882           }
2883           lfta_val[lmach] += ", NULL};\n\n";
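//        e.g. (names are hypothetical) the emitted list looks like:
//              const gs_sp_t fta_names[] = {"lfta_query_1", "lfta_query_2", "hfta_query_1", NULL};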
2884
2885
2886 //                      Add the initialization function to lfta.c
2887 //      Change to accept the interface name, and 
2888 //      set the prefilter function accordingly.
2889 //      see the example in demo/err2
2890           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2891           lfta_val[lmach] += "//        note: the last parameter in fta_register is the prefilter signature\n";
2892
2893 //        for(i=0;i<mach_query_names[lmach].size();i++)
2894 //              int mi = mach_query_names[lmach][i];
2895 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2896
2897           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2898                 string liface = (*mvsi).first;
2899                 vector<stream_query *> lfta_list = (*mvsi).second;
2900                 for(i=0;i<lfta_list.size();i++){
2901                         stream_query *lfta_sq = lfta_list[i];
2902                         int mi = lfta_iface_qname_ix[liface][i];
2903
2904                         if(liface == "_local_"){
2905 //  Don't register an init function, do the init code inline
2906                                 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2907                                 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2908                                 continue;
2909                         }
2910                 
2911                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2912
2913                         string this_iface = "DEFAULTDEV";
2914                         if(interface_names[mi]!="")
2915                                 this_iface = '"'+interface_names[mi]+'"';
2916                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2917                 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2918 //              if(interface_names[mi]=="")
2919 //                              lfta_val[lmach]+="DEFAULTDEV";
2920 //              else
2921 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';
2922                         lfta_val[lmach] += this_iface;
2923
2924
2925                 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2926                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2927                         +"\n#endif\n";
2928                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2929                         lfta_val[lmach] += tmpstr;
2930
2931 //                      unsigned long long int mask=0, bpos=1;
2932 //                      int f_pos;
2933 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2934 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2935 //                                      mask |= bpos;
2936 //                              bpos = bpos << 1;
2937 //                      }
2938
2939 #ifdef PREFILTER_OK
2940 //                      sprintf(tmpstr,",%lluull",mask);
2941                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2942                         lfta_val[lmach]+=tmpstr;
2943 #else
2944                         lfta_val[lmach] += ",0ull";
2945 #endif
2946
2947                         lfta_val[lmach] += ");\n";
2948
2949
2950
2951 //    End of lfta prefilter stuff
2952 // --------------------------------------------------
2953
2954 //                      If there is a field verifier, warn about
2955 //                      lack of compatibility
2956                   string lfta_comment;
2957                   string lfta_title;
2958                   string lfta_namespace;
2959                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2960                   if(ldefs.count("comment")>0)
2961                         lfta_comment = lfta_sq->defines["comment"];
2962                   if(ldefs.count("Comment")>0)
2963                         lfta_comment = lfta_sq->defines["Comment"];
2964                   if(ldefs.count("COMMENT")>0)
2965                         lfta_comment = lfta_sq->defines["COMMENT"];
2966                   if(ldefs.count("title")>0)
2967                         lfta_title = lfta_sq->defines["title"];
2968                   if(ldefs.count("Title")>0)
2969                         lfta_title = lfta_sq->defines["Title"];
2970                   if(ldefs.count("TITLE")>0)
2971                         lfta_title = lfta_sq->defines["TITLE"];
2972                   if(ldefs.count("NAMESPACE")>0)
2973                         lfta_namespace = lfta_sq->defines["NAMESPACE"];
2974                   if(ldefs.count("Namespace")>0)
2975                         lfta_namespace = lfta_sq->defines["Namespace"];
2976                   if(ldefs.count("namespace")>0)
2977                         lfta_namespace = lfta_sq->defines["namespace"];
2978
2979                   string lfta_ht_size;
2980                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2981                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2982                   if(ldefs.count("aggregate_slots")>0){
2983                         lfta_ht_size = ldefs["aggregate_slots"];
2984                   }
2985
2986 //                      NOTE : I'm assuming that visible lftas do not have names starting with an underscore.
2987 //                              -- this will fail for non-visible simple selection queries.
2988                 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2989                         string warning_str;
2990                         if(lfta_comment == "")
2991                                 warning_str += "\tcomment not found.\n";
2992 // Obsolete stuff that Carsten wanted
2993 //                      if(lfta_title == "")
2994 //                              warning_str += "\ttitle not found.\n";
2995 //                      if(lfta_namespace == "")
2996 //                              warning_str += "\tnamespace not found.\n";
2997
2998                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2999                         int fi;
3000                         for(fi=0;fi<flds.size();fi++){
3001                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
3002                         }
3003                         if(warning_str != "")
3004                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
3005                                         registration_query_names[mi].c_str(),warning_str.c_str());
3006                 }
3007
3008
3009 //                      Create qtree output
3010                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
3011         if(lfta_comment != "")
3012               fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
3013         if(lfta_title != "")
3014               fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
3015         if(lfta_namespace != "")
3016               fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
3017         if(lfta_ht_size != "")
3018               fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
3019                 if(lmach != "")
3020                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
3021                 else
3022                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
3023                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
3024                 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
3025                 for(int t=0;t<itbls.size();++t){
3026                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
3027                 }
3028 //              fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
3029                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
3030                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
3031 //                              write info about fields to qtree.xml
3032                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
3033                   int fi;
3034                   for(fi=0;fi<flds.size();fi++){
3035                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
3036                         if(flds[fi]->get_modifier_list()->size()){
3037                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
3038                         }
3039                         fprintf(qtree_output," />\n");
3040                   }
3041                 fprintf(qtree_output,"\t</LFTA>\n");
3042
3043
3044             }
3045           }
3046
3047           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
3048                         string liface = (*mvsi).first;
3049                         lfta_val[lmach] += 
3050 "       if (!strcmp(device, \""+liface+"\")) \n"
3051 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
3052 ;
3053                 }
3054                 lfta_val[lmach] += 
3055 "       if(lfta_prefilter == NULL){\n"
3056 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
3057 "               exit(1);\n"
3058 "       }\n"
3059 ;
3060
3061
3062
3063           lfta_val[lmach] += "}\n\n";
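//        Hedged sketch of the emitted fta_init() for one registered lfta (all names and the snap
//        length are hypothetical; the prefilter signature is 0ull unless PREFILTER_OK is defined):
//              void fta_init(gs_sp_t device){
//                      if(!strcmp(device,"eth0"))
//                              fta_register("lfta_query_1", 1, "eth0", alloc_lfta_query_1
//              #ifndef LFTA_IN_NIC
//                              ,schema_lfta_query_1
//              #endif
//                              ,128,0ull);
//                      if (!strcmp(device, "eth0"))
//                              lfta_prefilter = &lfta_prefilter_eth0;
//                      if(lfta_prefilter == NULL){ ... exit(1); }
//              }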
3064
3065       if(!(debug_only || hfta_only) ){
3066                 string lfta_flnm;
3067                 if(lmach != "")
3068                   lfta_flnm = lmach + "_lfta.c";
3069                 else
3070                   lfta_flnm = hostname + "_lfta.c";
3071                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
3072                         fprintf(stderr,"Can't open output file %s\n%s\n",lfta_flnm.c_str(),strerror(errno));
3073                         exit(1);
3074                 }
3075               fprintf(lfta_out,"%s",lfta_header.c_str());
3076               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
3077               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
3078                 fclose(lfta_out);
3079           }
3080         }
3081
3082 //              Report the external operators which must execute
3083         if(opviews.size()>0)
3084                 fprintf(stderr,"The queries use the following external operators:\n");
3085         for(i=0;i<opviews.size();++i){
3086                 opview_entry *opv = opviews.get_entry(i);
3087                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
3088         }
3089
3090         if(create_makefile)
3091                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
3092                 machine_names, schema_file_name,
3093                 interface_names,
3094                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
3095
3096
3097         fprintf(qtree_output,"</QueryNodes>\n");
3098
3099         return(0);
3100 }
3101
3102 ////////////////////////////////////////////////////////////
3103
3104 void generate_makefile(vector<string> &input_file_names, int nfiles,
3105                                            vector<string> &hfta_names, opview_set &opviews,
3106                                                 vector<string> &machine_names,
3107                                                 string schema_file_name,
3108                                                 vector<string> &interface_names,
3109                                                 ifq_t *ifdb, string &config_dir_path,
3110                                                 bool use_pads,
3111                                                 string extra_libs,
3112                                                 map<string, vector<int> > &rts_hload
3113                                          ){
3114         int i,j;
3115
3116         if(config_dir_path != ""){
3117                 config_dir_path = "-C "+config_dir_path;
3118         }
3119
3120         struct stat sb;
3121         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3122         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3123
3124 //      if(libz_exists && !libast_exists){
3125 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
3126 //              exit(1);
3127 //      }
3128
3129 //                      Get set of operator executable files to run
3130         set<string> op_fls;
3131         set<string>::iterator ssi;
3132         for(i=0;i<opviews.size();++i){
3133                 opview_entry *opv = opviews.get_entry(i);
3134                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3135         }
3136
3137         FILE *outfl = fopen("Makefile", "w");
3138         if(outfl==NULL){
3139                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
3140                 exit(0);
3141         }
3142
3143         fputs(
3144 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
3145 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3146 ).c_str(), outfl
3147 );
3148         if(generate_stats)
3149                 fprintf(outfl,"  -DLFTA_STATS");
3150
3151 //              Gather the set of interfaces
3152 //              Also, gather "base interface names" for use in computing
3153 //              the hash splitting to virtual interfaces.
3154 //              TODO : must update to handle machines
3155         set<string> ifaces;
3156         set<string> base_vifaces;       // base interfaces of virtual interfaces
3157         map<string, string> ifmachines;
3158         map<string, string> ifattrs;
3159         for(i=0;i<interface_names.size();++i){
3160                 ifaces.insert(interface_names[i]);
3161                 ifmachines[interface_names[i]] = machine_names[i];
3162
3163                 size_t Xpos = interface_names[i].find_last_of("X");
3164                 if(Xpos!=string::npos){
3165                         string iface = interface_names[i].substr(0,Xpos);
3166                         base_vifaces.insert(iface);
3167                 }
3168                 // get interface attributes and add them to the list
3169         }
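//              Example (illustrative): a virtual interface named "eth0X2" contributes the base
//              interface "eth0" to base_vifaces (the name is split at the last 'X'), which is
//              used later when computing the hash splitting across virtual interfaces.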
3170
3171 //              Do we need to include protobuf libraries?
3172 //              TODO Move to the interface library: get the libraries to include 
3173 //              for an interface type
3174
3175         bool use_proto = false;
3176         bool use_bsa = false;
3177         bool use_kafka = false;
3178         bool use_ssl = false;   
3179         int erri;
3180         string err_str;
3181         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3182                 string ifnm = (*ssi);
3183                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3184                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3185                         if(ift[ift_i]=="PROTO"){
3186 #ifdef PROTO_ENABLED                            
3187                                 use_proto = true;
3188 #else
3189                                 fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
3190                                 exit(0);
3191 #endif
3192                         }
3193                 }
3194                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3195                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3196                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3197 #ifdef BSA_ENABLED                              
3198                                 use_bsa = true;
3199 #else
3200                                 fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
3201                                 exit(0);
3202 #endif                                  
3203                         }
3204                 }
3205                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3206                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3207                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3208 #ifdef KAFKA_ENABLED                            
3209                                 use_kafka = true;
3210 #else
3211                                 fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
3212                                 exit(1);
3213 #endif  
3214                         }
3215                 }
3216                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
3217                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3218                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3219 #ifdef SSL_ENABLED                              
3220                                 use_ssl = true;
3221 #else
3222                                 fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
3223                                 exit(1);
3224 #endif  
3225                         }
3226                 }               
3227         }
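//              Descriptive note on the checks above: the use_* flags only record that some
//              interface needs a feature, and the loop aborts when the matching support was
//              not compiled in.  The corresponding link flags added to the rts rule below
//              are controlled by the same gsoptions.h defines at the time translate_fta
//              itself was built:
//                      InterfaceType PROTO  -> PROTO_ENABLED,  -lprotobuf-c
//                      BSA TRUE             -> BSA_ENABLED,    -lbsa_stream
//                      KAFKA TRUE           -> KAFKA_ENABLED,  -lrdkafka
//                      ENCRYPTED TRUE       -> SSL_ENABLED,    -lssl -lcrypto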
3228
3229         fprintf(outfl,
3230 "\n"
3231 "\n"
3232 "all: rts");
3233         for(i=0;i<hfta_names.size();++i)
3234                 fprintf(outfl," %s",hfta_names[i].c_str());
3235         fputs(
3236 ("\n"
3237 "\n"
3238 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
3239 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3240         if(use_pads)
3241                 fprintf(outfl,"-L. ");
3242         fputs(
3243 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3244         if(use_pads)
3245                 fprintf(outfl,"-lgscppads -lpads ");
3246         fprintf(outfl,
3247 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
3248         if(use_pads)
3249                 fprintf(outfl, " -lpz -lz -lbz ");
3250         if(libz_exists && libast_exists)
3251                 fprintf(outfl," -last ");
3252         if(use_pads)
3253                 fprintf(outfl, " -ldll -ldl ");
3254
3255 #ifdef PROTO_ENABLED    
3256         fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3257 #endif
3258 #ifdef BSA_ENABLED      
3259         fprintf(outfl, " -lbsa_stream ");
3260 #endif
3261 #ifdef KAFKA_ENABLED    
3262         fprintf(outfl, " -lrdkafka ");
3263 #endif
3264 #ifdef SSL_ENABLED      
3265         fprintf(outfl, " -lssl -lcrypto ");
3266 #endif
3267         fprintf(outfl," -lgscpaux");
3268 #ifdef GCOV
3269         fprintf(outfl," -fprofile-arcs");
3270 #endif
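//              Illustrative sketch of the rules emitted above (hypothetical values: one
//              hfta named hfta_0, root_path /usr/local/gscp, pads/protobuf/BSA/Kafka/SSL
//              and gcov all disabled):
//                      all: rts hfta_0
//
//                      rts: lfta.o /usr/local/gscp/lib/libgscphost.a /usr/local/gscp/lib/libgscplftaaux.a /usr/local/gscp/lib/libgscprts.a  /usr/local/gscp/lib/libclearinghouse.a
//                              g++ -O3 -g -o rts lfta.o -L/usr/local/gscp/lib -lgscplftaaux -lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt -lgscpaux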
3271         fprintf(outfl,
3272 "\n"
3273 "\n"
3274 "lfta.o: %s_lfta.c\n"
3275 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3276 "\n"
3277 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3278         for(i=0;i<nfiles;++i)
3279                 fprintf(outfl," %s",input_file_names[i].c_str());
3280         if(hostname == ""){
3281                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3282         }else{
3283                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3284         }
3285         for(i=0;i<nfiles;++i)
3286                 fprintf(outfl," %s",input_file_names[i].c_str());
3287         fprintf(outfl,"\n");
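//              Illustrative sketch of the lfta rules emitted above (hypothetical values:
//              host srv1, schema file packet_schema.txt, one query file query1.gsql,
//              <config_dir_path> left as a placeholder):
//                      lfta.o: srv1_lfta.c
//                              $(CC) -o lfta.o -c srv1_lfta.c
//
//                      srv1_lfta.c: external_fcns.def packet_schema.txt  query1.gsql
//                              /usr/local/gscp/bin/translate_fta -h srv1 <config_dir_path> packet_schema.txt  query1.gsql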
3288
3289         for(i=0;i<hfta_names.size();++i)
3290                 fprintf(outfl,
3291 ("%s: %s.o\n"
3292 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3293 "\n"
3294 "%s.o: %s.cc\n"
3295 "\t$(CPP) -o %s.o -c %s.cc\n"
3296 "\n"
3297 "\n").c_str(),
3298     hfta_names[i].c_str(), hfta_names[i].c_str(),
3299         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3300         hfta_names[i].c_str(), hfta_names[i].c_str(),
3301         hfta_names[i].c_str(), hfta_names[i].c_str()
3302                 );
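//              Illustrative sketch of one generated hfta rule (hypothetical values: hfta
//              named hfta_0, root_path /usr/local/gscp, empty extra_libs):
//                      hfta_0: hfta_0.o
//                              $(CPP) -o hfta_0 hfta_0.o -L/usr/local/gscp/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux
//
//                      hfta_0.o: hfta_0.cc
//                              $(CPP) -o hfta_0.o -c hfta_0.cc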
3303
3304         fprintf(outfl,
3305 ("\n"
3306 "packet_schema.txt:\n"
3307 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3308 "\n"
3309 "external_fcns.def:\n"
3310 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3311 "\n"
3312 "clean:\n"
3313 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3314         for(i=0;i<hfta_names.size();++i)
3315                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3316         fprintf(outfl,"\n");
3317
3318         fclose(outfl);
3319
3320
3321
3322 //              Generate the runit startup script using the gathered set of interfaces
3323 //              TODO : must update to handle machines
3324 //              TODO : lookup interface attributes and add them as a parameter to rts process
3325         outfl = fopen("runit", "w");
3326         if(outfl==NULL){
3327                 fprintf(stderr,"Can't open runit for write, exiting.\n");
3328                 exit(1);
3329         }
3330
3331
3332         fputs(
3333 ("#!/bin/sh\n"
3334 "./stopit\n"
3335 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3336 "sleep 5\n"
3337 "if [ ! -f gshub.log ]\n"
3338 "then\n"
3339 "\techo \"Failed to start bin/gshub.py\"\n"
3340 "\texit -1\n"
3341 "fi\n"
3342 "ADDR=`cat gshub.log`\n"
3343 "ps opgid= $! >> gs.pids\n"
3344 "./rts $ADDR default ").c_str(), outfl);
3345 //      int erri;
3346 //      string err_str;
3347         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3348                 string ifnm = (*ssi);
3349                 // suppress internal _local_ interface
3350                 if (ifnm == "_local_")
3351                         continue;
3352                 fprintf(outfl, "%s ",ifnm.c_str());
3353                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3354                 for(j=0;j<ifv.size();++j)
3355                         fprintf(outfl, "%s ",ifv[j].c_str());
3356         }
3357         fprintf(outfl, " &\n");
3358         fprintf(outfl, "echo $! >> gs.pids\n");
3359         for(i=0;i<hfta_names.size();++i)
3360                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3361
3362         for(j=0;j<opviews.opview_list.size();++j){
3363                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3364         }
3365
3366         fclose(outfl);
3367         system("chmod +x runit");
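//              Illustrative tail of the generated runit script (hypothetical values: one
//              interface eth0 with no Command attribute, one hfta hfta_0, no output views);
//              the fixed header above is written verbatim:
//                      ADDR=`cat gshub.log`
//                      ps opgid= $! >> gs.pids
//                      ./rts $ADDR default eth0  &
//                      echo $! >> gs.pids
//                      ./hfta_0 $ADDR default &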
3368
3369         outfl = fopen("stopit", "w");
3370         if(outfl==NULL){
3371                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3372                 exit(1);
3373         }
3374
3375         fprintf(outfl,"#!/bin/sh\n"
3376 "rm -f gshub.log\n"
3377 "if [ ! -f gs.pids ]\n"
3378 "then\n"
3379 "exit\n"
3380 "fi\n"
3381 "for pgid in `cat gs.pids`\n"
3382 "do\n"
3383 "kill -TERM -$pgid\n"
3384 "done\n"
3385 "sleep 1\n"
3386 "for pgid in `cat gs.pids`\n"
3387 "do\n"
3388 "kill -9 -$pgid\n"
3389 "done\n"
3390 "rm gs.pids\n");
3391
3392         fclose(outfl);
3393         system("chmod +x stopit");
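//              Descriptive note: stopit treats every entry in gs.pids as a process-group id
//              (kill with a negative id), signalling first with TERM and then, one second
//              later, with KILL, so that rts, the hftas and gshub are all torn down.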
3394
3395 //-----------------------------------------------
3396
3397 /* For now, support for virtual interfaces is disabled
3398         outfl = fopen("set_vinterface_hash.bat", "w");
3399         if(outfl==NULL){
3400                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3401                 exit(0);
3402         }
3403
3404 //              The format should be determined by an entry in the ifres.xml file,
3405 //              but for now hardcode the only example I have.
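//              Illustrative sketch of the intended output (hypothetical values): for base
//              interface dag0 with rts_hload["dag0"] == {8,16,24,32}, the loop below would
//              emit
//                      dagconfig -d0 -S hat_range=0-8:8-16:16-24:24-32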
3406         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3407                 if(rts_hload.count((*ssi))){
3408                         string iface_name = (*ssi);
3409                         string iface_number = "";
3410                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3411                                 if(isdigit(iface_name[j])){
3412                                         iface_number = iface_name[j];
3413                                         if(j>0 && isdigit(iface_name[j-1]))
3414                                                 iface_number = iface_name[j-1] + iface_number;
3415                                 }
3416                         }
3417
3418                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3419                         vector<int> halloc = rts_hload[iface_name];
3420                         int prev_limit = 0;
3421                         for(j=0;j<halloc.size();++j){
3422                                 if(j>0)
3423                                         fprintf(outfl,":");
3424                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3425                                 prev_limit = halloc[j];
3426                         }
3427                         fprintf(outfl,"\n");
3428                 }
3429         }
3430         fclose(outfl);
3431         system("chmod +x set_vinterface_hash.bat");
3432 */
3433 }
3434
3435 //              Code for implementing a local schema
3436 /*
3437                 table_list qpSchema;
3438
3439 //                              Load the schemas of any LFTAs.
3440                 int l;
3441                 for(l=0;l<hfta_nbr;++l){
3442                         stream_query *sq0 = split_queries[l];
3443                         table_def *td = sq0->get_output_tabledef();
3444                         qpSchema.append_table(td);
3445                 }
3446 //                              load the schemas of any other ref'd tables.
3447 //                              (e.g., hftas)
3448                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3449                 int ti;
3450                 for(ti=0;ti<input_tbl_names.size();++ti){
3451                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3452                         if(tbl_ref < 0){
3453                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3454                                 if(tbl_ref < 0){
3455                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3456                                         exit(1);
3457                                 }
3458                                 qpSchema.append_table(Schema->get_table(tbl_ref));
3459                         }
3460                 }
3461 */
3462
3463 //              Functions related to parsing.
3464
3465 /*
3466 static int split_string(char *instr,char sep, char **words,int max_words){
3467    char *loc;
3468    char *str;
3469    int nwords = 0;
3470
3471    str = instr;
3472    words[nwords++] = str;
3473    while( (loc = strchr(str,sep)) != NULL){
3474         *loc = '\0';
3475         str = loc+1;
3476         if(nwords >= max_words){
3477                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3478                 nwords = max_words-1;
3479         }
3480         words[nwords++] = str;
3481    }
3482
3483    return(nwords);
3484 }
3485
3486 */
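//              Usage sketch for the disabled helper above (hypothetical input): splitting
//              the writable buffer "a,b,c" on ',' rewrites it in place to "a\0b\0c", sets
//              words[0..2] to point at "a", "b" and "c", and returns 3.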
3487