Improvements to aggregation code and function library
[com/gs-lite.git] / src / ftacmp / translate_fta.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33 #include<glob.h>
34 #include<string.h>
35
36 #include<list>
37
38 //              for the scandir
39      #include <sys/types.h>
40      #include <dirent.h>
41
42
43 #include<errno.h>
44
45 //              to verify that some files exist.
46      #include <sys/types.h>
47      #include <sys/stat.h>
48
49 #include "parse_partn.h"
50
51 #include "print_plan.h"
52
53 //              Interface to the xml parser
54
55 #include"xml_t.h"
56 #include"field_list.h"
57
58 #include "gsconfig.h"
59
60 extern int xmlParserparse(void);
61 extern FILE *xmlParserin;
62 extern int xmlParserdebug;
63
64 std::vector<std::string> xml_attr_vec;
65 std::vector<std::string> xml_val_vec;
66 std::string xml_a, xml_v;
67 xml_t *xml_leaves = NULL;
68
69 //      Interface to the field list verifier
70 field_list *field_verifier = NULL;
71
72 #define TMPSTRLEN 1000
73
74 #ifndef PATH_DELIM
75   #define PATH_DELIM '/'
76 #endif
77
78 char tmp_schema_str[10000];
79
80 // maximum delay between two heartbeats produced
81 // by a UDOP, used when it is not explicitly
82 // provided in the udop definition
83 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
84
85 //              Default lfta hash table size, must be power of 2.
86 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
87
88 //              Interface to FTA definition lexer and parser ...
89
90 extern int FtaParserparse(void);
91 extern FILE *FtaParserin;
92 extern int FtaParserdebug;
93
94 fta_parse_t *fta_parse_result;
95 var_defs_t *fta_parse_defines;
96
97
98
99 //              Interface to external function lexer and parser ...
100
101 extern int Ext_fcnsParserparse(void);
102 extern FILE *Ext_fcnsParserin;
103 extern int Ext_fcnsParserdebug;
104
105 ext_fcn_list *Ext_fcns;
106
107
108 //              Interface to partition definition parser
109 extern int PartnParserparse();
110 partn_def_list_t *partn_parse_result = NULL;
111
112
113
114 using namespace std;
115 //extern int errno;
116
117
118 //              forward declaration of local utility function
119 void generate_makefile(vector<string> &input_file_names, int nfiles,
120                                            vector<string> &hfta_names, opview_set &opviews,
121                                                 vector<string> &machine_names,
122                                                 string schema_file_name,
123                                                 vector<string> &interface_names,
124                                                 ifq_t *ifdb, string &config_dir_path,
125                                                 bool use_pads,
126                                                 string extra_libs,
127                                                 map<string, vector<int> > &rts_hload
128                                         );
129
130 //static int split_string(char *instr,char sep, char **words,int max_words);
131 #define MAXFLDS 100
132
133   FILE *schema_summary_output = NULL;           // schema summary: query name, field names, field types
134
135 //                      Dump schema summary
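//                      Output format: the query name on one line, followed by a
//                      '|'-separated line of field names and a '|'-separated line
//                      of field types.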
136 void dump_summary(stream_query *str){
137         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
138
139         table_def *sch = str->get_output_tabledef();
140
141         vector<field_entry *> flds = sch->get_fields();
142         int f;
143         for(f=0;f<flds.size();++f){
144                 if(f>0) fprintf(schema_summary_output,"|");
145                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
146         }
147         fprintf(schema_summary_output,"\n");
148         for(f=0;f<flds.size();++f){
149                 if(f>0) fprintf(schema_summary_output,"|");
150                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
151         }
152         fprintf(schema_summary_output,"\n");
153 }
154
155 //              Globals
156 string hostname;                // name of current host.
157 int hostname_len;
158 bool generate_stats = false;
159 string root_path = "../..";
160
161
162 int main(int argc, char **argv){
163   char tmpstr[TMPSTRLEN];
164   string err_str;
165   int q,s,h,f;
166
167   set<int>::iterator si;
168
169   vector<string> registration_query_names;                      // for lfta.c registration
170   map<string, vector<int> > mach_query_names;   // list queries of machine
171   vector<int> snap_lengths;                             // for lfta.c registration
172   vector<int> snap_position;                            // for lfta.c registration
173   vector<string> interface_names;                       // for lfta.c registration
174   vector<string> machine_names;                 // machine of interface
175   vector<bool> lfta_reuse_options;                      // for lfta.c registration
176   vector<int> lfta_liveness_timeouts;           // for qtree.xml generation
177   vector<string> hfta_names;                    // hfta source code names, for
178                                                                                 // creating make file.
179   vector<string> qnames;                                // ensure unique names
180   map<string, int> lfta_names;                  // keep track of unique lftas.
181
182
183 //                              set these to 1 to debug the parser
184   FtaParserdebug = 0;
185   Ext_fcnsParserdebug = 0;
186
187   FILE *lfta_out;                               // lfta.c output.
188   FILE *fta_in;                                 // input file
189   FILE *table_schemas_in;               // source tables definition file
190   FILE *query_name_output;              // query names
191   FILE *qtree_output;                   // interconnections of query nodes
192
193   // -------------------------------
194   // Handling of Input Arguments
195   // -------------------------------
196     char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
197         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
198                 "\t[-B] : debug only (don't create output files)\n"
199                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
200                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
201                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
202                 "\t[-C] : use <config directory> for definition files\n"
203                 "\t[-l] : use <library directory> for library queries\n"
204                 "\t[-N] : output query names in query_names.txt\n"
205                 "\t[-H] : create HFTA only (no schema_file)\n"
206                 "\t[-Q] : use query name for hfta suffix\n"
207                 "\t[-M] : generate make file and runit, stopit scripts\n"
208                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
209                 "\t[-f] : Output schema summary to schema_summary.txt\n"
210                 "\t[-P] : link with PADS\n"
211                 "\t[-h] : override host name.\n"
212                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
213                 "\t[-R] : path to root of GS-lite\n"
214 ;
215
216 //              parameters gathered from command line processing
217         string external_fcns_path;
218 //      string internal_fcn_path;
219         string config_dir_path;
220         string library_path = "./";
221         vector<string> input_file_names;
222         string schema_file_name;
223         bool debug_only = false;
224         bool hfta_only = false;
225         bool output_query_names = false;
226         bool output_schema_summary=false;
227         bool numeric_hfta_flname = true;
228         bool create_makefile = false;
229         bool distributed_mode = false;
230         bool partitioned_mode = false;
231         bool use_live_hosts_file = false;
232         bool use_pads = false;
233         bool clean_make = false;
234         int n_virtual_interfaces = 1;
235
236    char chopt;
237    while((chopt = getopt(argc,argv,optstr)) != -1){
238                 switch(chopt){
239                 case 'B':
240                         debug_only = true;
241                         break;
242                 case 'D':
243                         distributed_mode = true;
244                         break;
245                 case 'p':
246                         partitioned_mode = true;
247                         break;
248                 case 'L':
249                         use_live_hosts_file = true;
250                         break;
251                 case 'C':
252                                 if(optarg != NULL)
253                                  config_dir_path = string(optarg) + string("/");
254                         break;
255                 case 'l':
256                                 if(optarg != NULL)
257                                  library_path = string(optarg) + string("/");
258                         break;
259                 case 'N':
260                         output_query_names = true;
261                         break;
262                 case 'Q':
263                         numeric_hfta_flname = false;
264                         break;
265                 case 'H':
266                         if(schema_file_name == ""){
267                                 hfta_only = true;
268                         }
269                         break;
270                 case 'f':
271                         output_schema_summary=true;
272                         break;
273                 case 'M':
274                         create_makefile=true;
275                         break;
276                 case 'S':
277                         generate_stats=true;
278                         break;
279                 case 'P':
280                         use_pads = true;
281                         break;
282                 case 'c':
283                         clean_make = true;
284                         break;
285                 case 'h':
286                         if(optarg != NULL)
287                                 hostname = optarg;
288                         break;
289                 case 'R':
290                         if(optarg != NULL)
291                                 root_path = optarg;
292                         break;
293                 case 'n':
294                         if(optarg != NULL){
295                                 n_virtual_interfaces = atoi(optarg);
296                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
297                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
298                                         n_virtual_interfaces = 1;
299                                 }
300                         }
301                         break;
302                 case '?':
303                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
304                         fprintf(stderr,"%s\n", usage_str);
305                         exit(1);
306                 default:
307                         fprintf(stderr, "Argument was %c\n", optopt);
308                         fprintf(stderr,"Invalid arguments\n");
309                         fprintf(stderr,"%s\n", usage_str);
310                         exit(1);
311                 }
312         }
313         argc -= optind;
314         argv += optind;
315         for (int i = 0; i < argc; ++i) {
316                 if((schema_file_name == "") && !hfta_only){
317                         schema_file_name = argv[i];
318                 }else{
319                         input_file_names.push_back(argv[i]);
320                 }
321         }
322
323         if(input_file_names.size() == 0){
324                 fprintf(stderr,"%s\n", usage_str);
325                 exit(1);
326         }
327
328         if(clean_make){
329                 string clean_cmd = "rm Makefile hfta_*.cc";
330                 int clean_ret = system(clean_cmd.c_str());
331                 if(clean_ret){
332                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
333                 }
334         }
335
336
337         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
338
339 //                      Open globally used file names.
340
341         // prepend config directory to schema file
342         schema_file_name = config_dir_path + schema_file_name;
343         external_fcns_path = config_dir_path + string("external_fcns.def");
344     string ifx_fname = config_dir_path + string("ifres.xml");
345
346 //              Find interface query file(s).
347         if(hostname == ""){
348                 gethostname(tmpstr,TMPSTRLEN);
349                 hostname = tmpstr;
350         }
351         hostname_len = hostname.length();       // tmpstr may be unset when -h overrides the host name
352     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
353         vector<string> ifq_fls;
354
355                 ifq_fls.push_back(ifq_fname);
356
357
358 //                      Get the field list, if it exists
359         string flist_fl = config_dir_path + "field_list.xml";
360         FILE *flf_in = NULL;
361         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
362                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
363                 xml_leaves = new xml_t();
364                 xmlParser_setfileinput(flf_in);
365                 if(xmlParserparse()){
366                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
367                 }else{
368                         field_verifier = new field_list(xml_leaves);
369                 }
370         }
371
372         if(!hfta_only){
373           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
374                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
375                 exit(1);
376           }
377         }
378
379 /*
380         if(!(debug_only || hfta_only)){
381           if((lfta_out = fopen("lfta.c","w")) == NULL){
382                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
383                 exit(1);
384           }
385         }
386 */
387
388 //              Get the output specification file.
389 //              format is
390 //                      query, operator, operator_param, directory, bucketwidth, partitioning_fields, n_partitions
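//                      A hypothetical example line (names and values are illustrative only):
//                              my_query,file,none,./output,300,srcIP,1
//                      Lines beginning with '#' are treated as comments and skipped.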
391         string ospec_fl = "output_spec.cfg";
392         FILE *osp_in = NULL;
393         vector<ospec_str *> output_specs;
394         multimap<string, int> qname_to_ospec;
395         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
396                 char *flds[MAXFLDS];
397                 int o_lineno = 0;
398                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
399                         o_lineno++;
400                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
401                         if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
402                                 if(nflds == 7){
403 //              make operator type lowercase
404                                         char *tmpc;
405                                         for(tmpc=flds[1];*tmpc!='\0';++tmpc)
406                                                 *tmpc = tolower(*tmpc);
407         
408                                         ospec_str *tmp_ospec = new ospec_str();
409                                         tmp_ospec->query = flds[0];
410                                         tmp_ospec->operator_type = flds[1];
411                                         tmp_ospec->operator_param = flds[2];
412                                         tmp_ospec->output_directory = flds[3];
413                                         tmp_ospec->bucketwidth = atoi(flds[4]);
414                                         tmp_ospec->partitioning_flds = flds[5];
415                                         tmp_ospec->n_partitions = atoi(flds[6]);
416                                         qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
417                                         output_specs.push_back(tmp_ospec);
418                                 }else{
419                                         fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
420                                 }
421                         }
422                 }
423                 fclose(osp_in);
424         }else{
425                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
426                 exit(1);
427         }
428
429 //              hfta parallelism
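//              Each line of hfta_parallelism.cfg is expected to have the form
//                      <hfta name>,<parallelism>
//              where the parallelism must lie between 1 and the number of virtual
//              interfaces and must divide it evenly.  Hypothetical example:
//                      hfta_my_query,2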
430         string pspec_fl = "hfta_parallelism.cfg";
431         FILE *psp_in = NULL;
432         map<string, int> hfta_parallelism;
433         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
434                 char *flds[MAXFLDS];
435                 int o_lineno = 0;
436                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
437                         bool good_entry = true;
438                         o_lineno++;
439                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
440                         if(nflds == 2){
441                                 string hname = flds[0];
442                                 int par = atoi(flds[1]);
443                                 if(par <= 0 || par > n_virtual_interfaces){
444                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
445                                         good_entry = false;
446                                 }
447                                 if(good_entry && n_virtual_interfaces % par != 0){
448                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
449                                         good_entry = false;
450                                 }
451                                 if(good_entry)
452                                         hfta_parallelism[hname] = par;
453                         }
454                 }
455         }else{
456                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
457         }
458
459
460 //              LFTA hash table sizes
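//              Each line of lfta_htsize.cfg is expected to have the form
//                      <lfta name>,<hash table size>
//              with a size greater than zero (hash table sizes are normally a power
//              of 2; the default is 4096).  Hypothetical example:
//                      my_lfta,8192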
461         string htspec_fl = "lfta_htsize.cfg";
462         FILE *htsp_in = NULL;
463         map<string, int> lfta_htsize;
464         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
465                 char *flds[MAXFLDS];
466                 int o_lineno = 0;
467                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
468                         bool good_entry = true;
469                         o_lineno++;
470                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
471                         if(nflds == 2){
472                                 string lfta_name = flds[0];
473                                 int htsz = atoi(flds[1]);
474                                 if(htsz>0){
475                                         lfta_htsize[lfta_name] = htsz;
476                                 }else{
477                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
478                                 }
479                         }
480                 }
481         }else{
482                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
483         }
484
485 //              LFTA virtual interface hash split
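//              Each line of rts_load.cfg names an interface followed by one or more
//              positive integer weights, e.g. (hypothetical):
//                      eth0,1,1,2
//              The weights are accumulated into cumulative totals describing how the
//              hash space is split across that interface's virtual copies.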
486         string rtlspec_fl = "rts_load.cfg";
487         FILE *rtl_in = NULL;
488         map<string, vector<int> > rts_hload;
489         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
490                 char *flds[MAXFLDS];
491                 int r_lineno = 0;
492                 string iface_name;
493                 vector<int> hload;
494                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
495                         bool good_entry = true;
496                         r_lineno++;
497                         iface_name = "";
498                         hload.clear();
499                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
500                         if(nflds >1){
501                                 iface_name = flds[0];
502                                 int cumm_h = 0;
503                                 int j;
504                                 for(j=1;j<nflds;++j){
505                                         int h = atoi(flds[j]);
506                                         if(h<=0)
507                                                 good_entry = false;
508                                         cumm_h += h;
509                                         hload.push_back(cumm_h);
510                                 }
511                         }else{
512                                 good_entry = false;
513                         }
514                         if(good_entry){
515                                 rts_hload[iface_name] = hload;
516                         }else{
517                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
518                         }
519                 }
520         }
521
522
523
524         if(output_query_names){
525           if((query_name_output = fopen("query_names.txt","w")) == NULL){
526                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
527                 exit(1);
528           }
529         }
530
531         if(output_schema_summary){
532           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
533                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
534                 exit(1);
535           }
536         }
537
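//              qtree.xml records the interconnections of the query nodes (see
//              qtree_output above); an xml-stylesheet reference to qtree.xsl is
//              emitted so the file can be styled when viewed.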
538         if((qtree_output = fopen("qtree.xml","w")) == NULL){
539                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
540                 exit(1);
541         }
542         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
543         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
544         fprintf(qtree_output,"<QueryNodes>\n");
545
546
547 //                      Get an initial Schema
548         table_list *Schema;
549         if(!hfta_only){
550 //                      Parse the table schema definitions.
551           fta_parse_result = new fta_parse_t();
552           FtaParser_setfileinput(table_schemas_in);
553           if(FtaParserparse()){
554                 fprintf(stderr,"Table schema parse failed.\n");
555                 exit(1);
556           }
557           if(fta_parse_result->parse_type != TABLE_PARSE){
558                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
559                 exit(1);
560           }
561           Schema = fta_parse_result->tables;
562
563 //                      Ensure that all schema_ids, if set, are distinct.
564 //  Obsolete? There is code elsewhere to ensure that schema IDs are
565 //  distinct on a per-interface basis.
566 /*
567           set<int> found_ids;
568           set<int> dup_ids;
569           for(int t=0;t<Schema->size();++t){
570                 int sch_id = Schema->get_table(t)->get_schema_id();
571                 if(sch_id> 0){
572                         if(found_ids.find(sch_id) != found_ids.end()){
573                                 dup_ids.insert(sch_id);
574                         }else{
575                                 found_ids.insert(sch_id);
576                         }
577                 }
578                 if(dup_ids.size()>0){
579                         fprintf(stderr, "Error, the schema has duplicate schema_ids:");
580                         for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
581                                 fprintf(stderr," %d",(*dit));
582                         fprintf(stderr,"\n");
583                         exit(1);
584                 }
585           }
586 */
587
588
589 //                      Process schema field inheritance
590           int retval;
591           retval = Schema->unroll_tables(err_str);
592           if(retval){
593                 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
594                 exit(1);
595           }
596         }else{
597 //                      hfta only => we will try to fetch schemas from the registry.
598 //                      therefore, start off with an empty schema.
599           Schema = new table_list();
600         }
601
602
603 //                      Open and parse the external functions file.
604         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
605         if(Ext_fcnsParserin == NULL){
606                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
607                 Ext_fcns = new ext_fcn_list();
608         }else{
609                 if(Ext_fcnsParserparse()){
610                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
611                         Ext_fcns = new ext_fcn_list();
612                 }
613         }
614         if(Ext_fcns->validate_fcns(err_str)){
615                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
616                 exit(1);
617         }
618
619 //              Open and parse the interface resources file.
620 //      ifq_t *ifaces_db = new ifq_t();
621 //   string ierr;
622 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
623 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
624 //                              ifx_fname.c_str(), ierr.c_str());
625 //              exit(1);
626 //      }
627 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
628 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
629 //                              ifq_fname.c_str(), ierr.c_str());
630 //              exit(1);
631 //      }
632
633
634 //                      The LFTA code string.
635 //                      Put the standard preamble here.
636 //                      NOTE: the hash macros, fcns should go into the run time
637   map<string, string> lfta_val;
638   map<string, string> lfta_prefilter_val;
639
640   string lfta_header =
641 "#include <limits.h>\n"
642 "#include \"rts.h\"\n"
643 "#include \"fta.h\"\n"
644 "#include \"lapp.h\"\n"
645 "#include \"rts_udaf.h\"\n"
646 "#include<stdio.h>\n"
647 "#include <float.h>\n"
648 "#include \"rdtsc.h\"\n"
649 "#include \"watchlist.h\"\n\n"
650
651 ;
652 // Get any locally defined parsing headers
653     glob_t glob_result;
654     memset(&glob_result, 0, sizeof(glob_result));
655
656     // do the glob operation TODO should be from GSROOT
657     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
658         if(return_value == 0){
659                 lfta_header += "\n";
660         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
661                         char *flds[1000];
662                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
663                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
664         }
665                 lfta_header += "\n";
666         }else{
667                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
668         }
669
670 /*
671 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
672 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
673 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
674 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
675 */
676
677         lfta_header += 
678 "\n"
679 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
680 "\n"
681 "#define SLOT_FILLED 0x04\n"
682 "#define SLOT_GEN_BITS 0x03\n"
683 "#define SLOT_HASH_BITS 0xfffffff8\n"
684 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
685 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
686 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
687 "\n\n"
688
689 "#define lfta_BOOL_to_hash(x) (x)\n"
690 "#define lfta_USHORT_to_hash(x) (x)\n"
691 "#define lfta_UINT_to_hash(x) (x)\n"
692 "#define lfta_IP_to_hash(x) (x)\n"
693 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
694 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
695 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
696 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
697 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
698 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
699 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
700 "       for(i=0;i<x.length;++i){\n"
701 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
702 "               if((i%4) == 3){\n"
703 "                       ret ^= tmp_sum;\n"
704 "                       tmp_sum = 0;\n"
705 "               }\n"
706 "       }\n"
707 "       if((i%4)!=0) ret ^=tmp_sum;\n"
708 "       return(ret);\n"
709 "}\n\n\n";
710
711
712
713 //////////////////////////////////////////////////////////////////
714 /////                   Get all of the query parse trees
715
716
717   int i,p;
718   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
719
720 //---------------------------
721 //              Global info needed for post processing.
722
723 //                      Set of operator views ref'd in the query set.
724         opview_set opviews;
725 //                      lftas on a per-machine basis.
726         map<string, vector<stream_query *> > lfta_mach_lists;
727         int nfiles = input_file_names.size();
728         vector<stream_query *> hfta_list;               // list of hftas.
729         map<string, stream_query *> sq_map;             // map from query name to stream query.
730
731
732 //////////////////////////////////////////
733
734 //              Open and parse the interface resources file.
735         ifq_t *ifaces_db = new ifq_t();
736     string ierr;
737         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
738                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
739                                 ifx_fname.c_str(), ierr.c_str());
740                 exit(1);
741         }
742         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
743                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
744                                 ifq_fls[0].c_str(), ierr.c_str());
745                 exit(1);
746         }
747
748   map<string, string> qname_to_flname;  // for detecting duplicate query names
749
750
751
752 //                      Parse the files to create a vector of parse trees.
753 //                      Load qnodes with information to perform a topo sort
754 //                      based on query dependencies.
755   vector<query_node *> qnodes;                          // for topo sort.
756   map<string,int> name_node_map;                        // map query name to qnodes entry
757   for(i=0;i<input_file_names.size();i++){
758
759           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
760                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
761                   continue;
762           }
763 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
764
765 //                      Parse the FTA query
766           fta_parse_result = new fta_parse_t();
767           FtaParser_setfileinput(fta_in);
768           if(FtaParserparse()){
769                 fprintf(stderr,"FTA parse failed.\n");
770                 exit(1);
771           }
772           if(fta_parse_result->parse_type != QUERY_PARSE){
773                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
774                 exit(1);
775           }
776
777 //                      returns a list of parse trees
778           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
779           for(p=0;p<qlist.size();++p){
780             table_exp_t *fta_parse_tree = qlist[p];
781 //              query_parse_trees.push_back(fta_parse_tree);
782
783 //                      compute the default name -- extract it from the input file name
784                 strcpy(tmpstr,input_file_names[i].c_str());
785                 char *qname = strrchr(tmpstr,PATH_DELIM);
786                 if(qname == NULL)
787                         qname = tmpstr;
788                 else
789                         qname++;
790                 char *qname_end = strchr(qname,'.');
791                 if(qname_end != NULL) *qname_end = '\0';
792                 string qname_str = qname;
793                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
794
795 //                      Determine visibility.  Should I be attaching all of the output methods?
796                 if(qname_to_ospec.count(imputed_qname)>0)
797                         fta_parse_tree->set_visible(true);
798                 else
799                         fta_parse_tree->set_visible(false);
800
801
802 //                              Create a manipulable representation of the parse tree.
803 //                              the qnode inherits the visibility assigned to the parse tree.
804             int pos = qnodes.size();
805                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
806                 name_node_map[ qnodes[pos]->name ] = pos;
807 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
808 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
809 //              qfiles.push_back(i);
810
811 //                      Check for duplicate query names
812 //                                      NOTE : in hfta-only generation, I should
813 //                                      also check with the names of the registered queries.
814                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
815                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
816                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
817                         exit(1);
818                 }
819                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
820                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
821                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
822                         exit(1);
823                 }
824                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
825
826
827         }
828   }
829
830 //              Add the library queries
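//              A referenced table that is not otherwise defined and whose name contains
//              a '/' is treated as a library query: it is loaded from
//              <library_path><name>.gsql, and any queries it depends on within that
//              library file are pulled in through the worklist below.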
831
832   int pos;
833   for(pos=0;pos<qnodes.size();++pos){
834         int fi;
835         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
836                 string src_tbl = qnodes[pos]->refd_tbls[fi];
837                 if(qname_to_flname.count(src_tbl) == 0){
838                         int last_sep = src_tbl.find_last_of('/');
839                         if(last_sep != string::npos){
840 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
841                                 string target_qname = src_tbl.substr(last_sep+1);
842                                 string qpathname = library_path + src_tbl + ".gsql";
843                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
844                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
845                                         exit(1);
846                                         fprintf(stderr,"After exit\n");
847                                 }
848 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
849 //                      Parse the FTA query
850                                 fta_parse_result = new fta_parse_t();
851                                 FtaParser_setfileinput(fta_in);
852                                 if(FtaParserparse()){
853                                         fprintf(stderr,"FTA parse failed.\n");
854                                         exit(1);
855                                 }
856                                 if(fta_parse_result->parse_type != QUERY_PARSE){
857                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
858                                         exit(1);
859                                 }
860
861                                 map<string, int> local_query_map;
862                                 vector<string> local_query_names;
863                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
864                                 for(p=0;p<qlist.size();++p){
865                                 table_exp_t *fta_parse_tree = qlist[p];
866                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
867                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
868                                         if(imputed_qname == target_qname)
869                                                 imputed_qname = src_tbl;
870                                         if(local_query_map.count(imputed_qname)>0){
871                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
872                                                 exit(1);
873                                         }
874                                         local_query_map[ imputed_qname ] = p;
875                                         local_query_names.push_back(imputed_qname);
876                                 }
877
878                                 if(local_query_map.count(src_tbl)==0){
879                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
880                                         exit(1);
881                                 }
882
883                                 vector<int> worklist;
884                                 set<int> added_queries;
885                                 vector<query_node *> new_qnodes;
886                                 worklist.push_back(local_query_map[target_qname]);
887                                 added_queries.insert(local_query_map[target_qname]);
888                                 int qq;
889                                 int qpos = qnodes.size();
890                                 for(qq=0;qq<worklist.size();++qq){
891                                         int q_id = worklist[qq];
892                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
893                                         new_qnodes.push_back( new_qnode);
894                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
895                                         int ff;
896                                         for(ff = 0;ff<refd_tbls.size();++ff){
897                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
898
899                                                         if(name_node_map.count(refd_tbls[ff])>0){
900                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
901                                                                 exit(1);
902                                                         }else{
903                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
904                                                         }
905                                                 }
906                                         }
907                                 }
908
909                                 for(qq=0;qq<new_qnodes.size();++qq){
910                                         int qpos = qnodes.size();
911                                         qnodes.push_back(new_qnodes[qq]);
912                                         name_node_map[qnodes[qpos]->name ] = qpos;
913                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
914                                 }
915                         }
916                 }
917         }
918   }
919
920
921
922
923
924
925
926
927 //---------------------------------------
928
929
930 //              Add the UDOPS.
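//              A referenced table whose schema entry is an OPERATOR_VIEW_SCHEMA is a
//              user-defined operator (UDOP); a query node is created for it here and
//              its source tables are checked against the query set.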
931
932   string udop_missing_sources;
933   for(i=0;i<qnodes.size();++i){
934         int fi;
935         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
936                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
937                 if(sid >= 0){
938                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
939                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
940                                 int pos = qnodes.size();
941                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
942                                         name_node_map[ qnodes[pos]->name ] = pos;
943                                         qnodes[pos]->is_externally_visible = false;   // not visible by default; marked visible later if reachable from a visible query
944         //                                      Need to mark the source queries as visible.
945                                         int si;
946                                         string missing_sources = "";
947                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
948                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
949                                                 if(name_node_map.count(src_tbl)==0){
950                                                         missing_sources += src_tbl + " ";
951                                                 }
952                                         }
953                                         if(missing_sources != ""){
954                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
955                                         }
956                                 }
957                         }
958                 }
959         }
960   }
961   if(udop_missing_sources != ""){
962         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
963         exit(1);
964   }
965
966
967
968 ////////////////////////////////////////////////////////////////////
969 ///                             Check parse trees to verify that some
970 ///                             global properties are met :
971 ///                             if q1 reads from q2, then
972 ///                               q2 is processed before q1
973 ///                               q1 can supply q2's parameters
974 ///                             Verify there is no cycle in the reads-from graph.
975
976 //                      Compute an order in which to process the
977 //                      queries.
978
979 //                      Start by building the reads-from lists.
980 //
981
982   for(i=0;i<qnodes.size();++i){
983         int qi, fi;
984         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
985         for(fi = 0;fi<refd_tbls.size();++fi){
986                 if(name_node_map.count(refd_tbls[fi])>0){
987 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
988                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
989                 }
990         }
991   }
992
993
994 //              If one query reads the result of another,
995 //              check for parameter compatibility.  Currently it must
996 //              be an exact match.  I will move to requiring
997 //              containment after re-ordering, but will require
998 //              some analysis for code generation which is not
999 //              yet in place.
1000 //printf("There are %d query nodes.\n",qnodes.size());
1001
1002
1003   for(i=0;i<qnodes.size();++i){
1004         vector<var_pair_t *> target_params  = qnodes[i]->params;
1005         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1006                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
1007                 if(target_params.size() != source_params.size()){
1008                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1009                         exit(1);
1010                 }
1011                 int p;
1012                 for(p=0;p<target_params.size();++p){
1013                         if(! (target_params[p]->name == source_params[p]->name &&
1014                               target_params[p]->val == source_params[p]->val ) ){
1015                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1016                                 exit(1);
1017                         }
1018                 }
1019         }
1020   }
1021
1022
1023 //              Find the roots.
1024 //              Start by counting inedges.
1025   for(i=0;i<qnodes.size();++i){
1026         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1027                 qnodes[(*si)]->n_consumers++;
1028         }
1029   }
1030
1031 //              The roots are the nodes with indegree zero.
1032   set<int> roots;
1033   for(i=0;i<qnodes.size();++i){
1034         if(qnodes[i]->n_consumers == 0){
1035                 if(qnodes[i]->is_externally_visible == false){
1036                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1037                 }
1038                 roots.insert(i);
1039         }
1040   }
1041
1042 //              Remove the parts of the subtree that produce no output.
1043   set<int> valid_roots;
1044   set<int> discarded_nodes;
1045   set<int> candidates;
1046   while(roots.size() >0){
1047         for(si=roots.begin();si!=roots.end();++si){
1048                 if(qnodes[(*si)]->is_externally_visible){
1049                         valid_roots.insert((*si));
1050                 }else{
1051                         discarded_nodes.insert((*si));
1052                         set<int>::iterator sir;
1053                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1054                                 qnodes[(*sir)]->n_consumers--;
1055                                 if(qnodes[(*sir)]->n_consumers == 0)
1056                                         candidates.insert( (*sir));
1057                         }
1058                 }
1059         }
1060         roots = candidates;
1061         candidates.clear();
1062   }
1063   roots = valid_roots;
1064   if(discarded_nodes.size()>0){
1065         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1066         int di = 0;
1067         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1068                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1069                 di++;
1070                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1071         }
1072         fprintf(stderr,"\n");
1073   }
1074
1075 //              Compute the sources_to set, ignoring discarded nodes.
1076   for(i=0;i<qnodes.size();++i){
1077         if(discarded_nodes.count(i)==0)
1078                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1079                         qnodes[(*si)]->sources_to.insert(i);
1080         }
1081   }
1082
1083
1084 //              Find the nodes that are shared by multiple visible subtrees.
1085 //              The roots become inferred visible nodes.
1086
1087 //              Find the visible nodes.
1088         vector<int> visible_nodes;
1089         for(i=0;i<qnodes.size();i++){
1090                 if(qnodes[i]->is_externally_visible){
1091                         visible_nodes.push_back(i);
1092                 }
1093         }
1094
1095 //              Find UDOPs referenced by visible nodes.
1096   list<int> workq;
1097   for(i=0;i<visible_nodes.size();++i){
1098         workq.push_back(visible_nodes[i]);
1099   }
1100   while(!workq.empty()){
1101         int node = workq.front();
1102         workq.pop_front();
1103         set<int>::iterator children;
1104         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1105                 qnodes[node]->is_externally_visible = true;
1106                 visible_nodes.push_back(node);
1107                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1108                         if(qnodes[(*children)]->is_externally_visible == false){
1109                                 qnodes[(*children)]->is_externally_visible = true;
1110                                 visible_nodes.push_back((*children));
1111                         }
1112                 }
1113         }
1114         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1115                 workq.push_back((*children));
1116         }
1117   }
1118
1119         bool done = false;
1120         while(!done){
1121 //      reset the nodes
1122                 for(i=0;i<qnodes.size();i++){
1123                         qnodes[i]->subtree_roots.clear();
1124                 }
1125
1126 //              Walk the tree defined by a visible node, not descending into
1127 //              subtrees rooted by a visible node.  Mark the node visited with
1128 //              the visible node ID.
1129                 for(i=0;i<visible_nodes.size();++i){
1130                         set<int> vroots;
1131                         vroots.insert(visible_nodes[i]);
1132                         while(vroots.size()>0){
1133                                 for(si=vroots.begin();si!=vroots.end();++si){
1134                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1135
1136                                         set<int>::iterator sir;
1137                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1138                                                 if(! qnodes[(*sir)]->is_externally_visible){
1139                                                         candidates.insert( (*sir));
1140                                                 }
1141                                         }
1142                                 }
1143                                 vroots = candidates;
1144                                 candidates.clear();
1145                         }
1146                 }
1147 //              Find the nodes in multiple visible node subtrees, but with no parent
1148 //              that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
1149                 done = true;    // until proven otherwise
1150                 for(i=0;i<qnodes.size();i++){
1151                         if(qnodes[i]->subtree_roots.size()>1){
1152                                 bool is_new_root = true;
1153                                 set<int>::iterator sir;
1154                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1155                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1156                                                 is_new_root = false;
1157                                 }
1158                                 if(is_new_root){
1159                                         qnodes[i]->is_externally_visible = true;
1160                                         qnodes[i]->inferred_visible_node = true;
1161                                         visible_nodes.push_back(i);
1162                                         done = false;
1163                                 }
1164                         }
1165                 }
1166         }
1167
1168
1169
1170
1171
1172 //              get visible nodes in topo ordering.
1173 //  for(i=0;i<qnodes.size();i++){
1174 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1175 //  }
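//              Kahn-style topological peel: repeatedly emit the current zero-consumer
//              roots and decrement the consumer counts of the queries they read from.
//              Any non-discarded query whose count never reaches zero is part of a
//              reads-from cycle, reported below.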
1176   vector<int> process_order;
1177   while(roots.size() >0){
1178         for(si=roots.begin();si!=roots.end();++si){
1179                 if(discarded_nodes.count((*si))==0){
1180                         process_order.push_back( (*si) );
1181                 }
1182                 set<int>::iterator sir;
1183                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1184                         qnodes[(*sir)]->n_consumers--;
1185                         if(qnodes[(*sir)]->n_consumers == 0)
1186                                 candidates.insert( (*sir));
1187                 }
1188         }
1189         roots = candidates;
1190         candidates.clear();
1191   }
1192
1193
1194 //printf("process_order.size() =%d\n",process_order.size());
1195
1196 //              Search for cyclic dependencies
1197   string found_dep;
1198   for(i=0;i<qnodes.size();++i){
1199         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1200                 if(found_dep.size() != 0) found_dep += ", ";
1201                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1202         }
1203   }
1204   if(found_dep.size()>0){
1205         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1206         exit(1);
1207   }
1208
1209 //              Get a list of query sets, in the order to be processed.
1210 //              Start at visible root and do bfs.
1211 //              The query set includes queries referenced indirectly,
1212 //              as sources for user-defined operators.  These are needed
1213 //              to ensure that they are added to the schema, but are not part
1214 //              of the query tree.
1215
1216 //              stream_node_sets contains queries reachable only through the
1217 //              FROM clause, so I can tell which queries to add to the stream
1218 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1219
1220 //                      NOTE: this code works because in order for data to be
1221 //                      read by multiple hftas, the node must be externally visible.
1222 //                      But visible nodes define roots of process sets.
1223 //                      Internally visible nodes can feed data only
1224 //                      to other nodes in the same query file.
1225 //                      Therefore, any access can be restricted to a file;
1226 //                      hfta output sharing is done only on roots,
1227 //                      never on interior nodes.
1228
1229
1230
1231
1232 //              Compute the base collection of hftas.
1233   vector<hfta_node *> hfta_sets;
1234   map<string, int> hfta_name_map;
1235 //  vector< vector<int> > process_sets;
1236 //  vector< set<int> > stream_node_sets;
1237   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.
1238                                                                                                                 // i.e. process leaves 1st.
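//              Each externally visible query becomes the root of an hfta; the hfta also
//              contains every non-visible query reachable from that root through
//              reads_from edges (collected by the BFS below).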
1239   for(i=0;i<process_order.size();++i){
1240         if(qnodes[process_order[i]]->is_externally_visible == true){
1241 //printf("Visible.\n");
1242                 int root = process_order[i];
1243                 hfta_node *hnode = new hfta_node();
1244                 hnode->name = qnodes[root]-> name;
1245                 hnode->source_name = qnodes[root]-> name;
1246                 hnode->is_udop = qnodes[root]->is_udop;
1247                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1248
1249                 vector<int> proc_list;  proc_list.push_back(root);
1250 //                      Ensure that nodes are added only once.
1251                 set<int> proc_set;      proc_set.insert(root);
1252                 roots.clear();                  roots.insert(root);
1253                 candidates.clear();
1254                 while(roots.size()>0){
1255                         for(si=roots.begin();si!=roots.end();++si){
1256 //printf("Processing root %d\n",(*si));
1257                                 set<int>::iterator sir;
1258                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1259 //printf("reads fom %d\n",(*sir));
1260                                         if(qnodes[(*sir)]->is_externally_visible==false){
1261                                                 candidates.insert( (*sir) );
1262                                                 if(proc_set.count( (*sir) )==0){
1263                                                         proc_set.insert( (*sir) );
1264                                                         proc_list.push_back( (*sir) );
1265                                                 }
1266                                         }
1267                                 }
1268                         }
1269                         roots = candidates;
1270                         candidates.clear();
1271                 }
1272
1273                 reverse(proc_list.begin(), proc_list.end());
1274                 hnode->query_node_indices = proc_list;
1275                 hfta_name_map[hnode->name] = hfta_sets.size();
1276                 hfta_sets.push_back(hnode);
1277         }
1278   }
1279
1280 //              Compute the reads_from / sources_to graphs for the hftas.
1281
1282   for(i=0;i<hfta_sets.size();++i){
1283         hfta_node *hnode = hfta_sets[i];
1284         for(q=0;q<hnode->query_node_indices.size();q++){
1285                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1286                 for(s=0;s<qnode->refd_tbls.size();++s){
1287                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1288                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1289                                 hnode->reads_from.insert(other_hfta);
1290                                 hfta_sets[other_hfta]->sources_to.insert(i);
1291                         }
1292                 }
1293         }
1294   }
1295
1296 //              Compute a topological sort of the hfta_sets.
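//              (Kahn-style: seed the work queue with hftas that have no consumers, so
//              hfta_topsort lists each hfta before the hftas it reads from.)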
1297
1298   vector<int> hfta_topsort;
1299   workq.clear();
1300   int hnode_srcs[hfta_sets.size()];
1301   for(i=0;i<hfta_sets.size();++i){
1302         hnode_srcs[i] = 0;
1303         if(hfta_sets[i]->sources_to.size() == 0)
1304                 workq.push_back(i);
1305   }
1306
1307   while(! workq.empty()){
1308         int     node = workq.front();
1309         workq.pop_front();
1310         hfta_topsort.push_back(node);
1311         set<int>::iterator stsi;
1312         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1313                 int parent = (*stsi);
1314                 hnode_srcs[parent]++;
1315                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1316                         workq.push_back(parent);
1317                 }
1318         }
1319   }
1320
1321 //              Decorate hfta nodes with the level of parallelism given as input.
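//              (hfta_parallelism is read from hfta_parallelism.cfg; entries that do not
//              name a known hfta are reported, here and in the dangling-entry check below.)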
1322
1323   map<string, int>::iterator msii;
1324   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1325         string hfta_name = (*msii).first;
1326         int par = (*msii).second;
1327         if(hfta_name_map.count(hfta_name) > 0){
1328                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1329         }else{
1330                 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but it's not an hfta.\n",hfta_name.c_str());
1331         }
1332   }
1333
1334 //              Propagate levels of parallelism: children should have a level of parallelism
1335 //              as large as any of their parents.  Adjust children upwards to compensate.
1336 //              Start at parents and adjust children; propagation cascades automatically.
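//              e.g., if an hfta with n_parallel=4 reads from an hfta with n_parallel=1,
//              the source's n_parallel is raised to 4 so that each copy of the reader
//              can be paired with a copy of its source during the replication pass below.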
1337
1338   for(i=hfta_sets.size()-1;i>=0;i--){
1339         set<int>::iterator stsi;
1340         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1341                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1342                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1343                 }
1344         }
1345   }
1346
1347 //              Before all the name mangling, check if there are any output_spec.cfg
1348 //              or hfta_parallelism.cfg entries that do not have a matching query.
1349
1350         string dangling_ospecs = "";
1351         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1352                 string oq = (*msii).first;
1353                 if(hfta_name_map.count(oq) == 0){
1354                         dangling_ospecs += " "+(*msii).first;
1355                 }
1356         }
1357         if(dangling_ospecs!=""){
1358                 fprintf(stderr,"ERROR, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1359                 exit(1);
1360         }
1361
1362         string dangling_par = "";
1363         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1364                 string oq = (*msii).first;
1365                 if(hfta_name_map.count(oq) == 0){
1366                         dangling_par += " "+(*msii).first;
1367                 }
1368         }
1369         if(dangling_par!=""){
1370                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1371         }
1372
1373
1374
1375 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1376 //              FROM clauses: retarget any name which is an internal node, and
1377 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes
1378 //              when the source hfta has more parallelism than the target node.
1379 //              Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
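//              e.g., with n_parallel=2, hfta "foo" is replaced by "foo__copy0" and
//              "foo__copy1"; FROM references to nodes inside the hfta get the same
//              "__copyX" suffix, while references to external parallelized hftas are
//              either renamed or routed through a generated merge node.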
1380
1381
1382   int n_original_hfta_sets = hfta_sets.size();
1383   for(i=0;i<n_original_hfta_sets;++i){
1384         if(hfta_sets[i]->n_parallel > 1){
1385                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1386                 set<string> local_nodes;                // names of query nodes in the hfta.
1387                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1388                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1389                 }
1390
1391                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1392                         string mangler = "__copy"+int_to_string(p);
1393                         hfta_node *par_hfta  = new hfta_node();
1394                         par_hfta->name = hfta_sets[i]->name + mangler;
1395                         par_hfta->source_name = hfta_sets[i]->name;
1396                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1397                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1398                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1399                         par_hfta->parallel_idx = p;
1400
1401                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1402
1403 //      Is it a UDOP?
1404                         if(hfta_sets[i]->is_udop){
1405                                 int root = hfta_sets[i]->query_node_indices[0];
1406
1407                                 string unequal_par_sources;
1408                                 set<int>::iterator rfsii;
1409                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1410                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1411                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1412                                         }
1413                                 }
1414                                 if(unequal_par_sources != ""){
1415                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1416                                         exit(1);
1417                                 }
1418
1419                                 int rti;
1420                                 vector<string> new_sources;
1421                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1422                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1423                                 }
1424
1425                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1426                                 new_qn->name += mangler;
1427                                 new_qn->mangler = mangler;
1428                                 new_qn->refd_tbls = new_sources;
1429                                 par_hfta->query_node_indices.push_back(qnodes.size());
1430                                 par_qnode_map[new_qn->name] = qnodes.size();
1431                                 name_node_map[ new_qn->name ] = qnodes.size();
1432                                 qnodes.push_back(new_qn);
1433                         }else{
1434 //              regular query node
1435                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1436                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1437                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1438 //                                      rehome the from clause on mangled names.
1439 //                                      create merge nodes as needed for external sources.
1440                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1441                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1442                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1443                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1444 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
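//                      e.g., if the source hfta has n_parallel=4 and this hfta has
//                      n_parallel=2, then stride=2 and copy p reads source copies
//                      2p and 2p+1 through a generated __merge node.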
1445                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1446                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1447                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1448                                                 }else{
1449                                                         vector<string> src_tbls;
1450                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1451                                                         if(stride == 0){
1452                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1453                                                                 exit(1);
1454                                                         }
1455                                                         for(s=0;s<stride;++s){
1456                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1457                                                                 src_tbls.push_back(ext_src_name);
1458                                                         }
1459                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1460                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1461                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1462 //                                      Make a qnode to represent the new merge node
1463                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1464                                                         qn_pt->refd_tbls = src_tbls;
1465                                                         qn_pt->is_udop  = false;
1466                                                         qn_pt->is_externally_visible = false;
1467                                                         qn_pt->inferred_visible_node  = false;
1468                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1469                                                         par_qnode_map[merge_node_name] = qnodes.size();
1470                                                         name_node_map[ merge_node_name ] = qnodes.size();
1471                                                         qnodes.push_back(qn_pt);
1472                                                 }
1473                                         }
1474                                 }
1475                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1476                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1477                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1478                                 }
1479                                 new_qn->params = qnodes[hqn_idx]->params;
1480                                 new_qn->is_udop = false;
1481                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1482                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1483                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1484                                 par_qnode_map[new_qn->name] = qnodes.size();
1485                                 name_node_map[ new_qn->name ] = qnodes.size();
1486                                 qnodes.push_back(new_qn);
1487                           }
1488                         }
1489                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1490                         hfta_sets.push_back(par_hfta);
1491                 }
1492         }else{
1493 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1494 //              hfta sources.
1495                 if(!hfta_sets[i]->is_udop){
1496                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1497                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1498                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1499                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1500 //                      Ref's an external HFTA.  If the source hfta is parallelized, its output must be combined: create a merge node over all of its copies.  Otherwise leave the reference as is.
1501                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1502                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1503                                                 vector<string> src_tbls;
1504                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1505                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1506                                                         src_tbls.push_back(ext_src_name);
1507                                                 }
1508                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1509                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1510                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1511 //                                      Make a qnode to represent the new merge node
1512                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1513                                                 qn_pt->refd_tbls = src_tbls;
1514                                                 qn_pt->is_udop  = false;
1515                                                 qn_pt->is_externally_visible = false;
1516                                                 qn_pt->inferred_visible_node  = false;
1517                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1518                                                 name_node_map[ merge_node_name ] = qnodes.size();
1519                                                 qnodes.push_back(qn_pt);
1520                                         }
1521                                 }
1522                         }
1523                 }
1524           }
1525         }
1526   }
1527
1528 //                      Rebuild the reads_from / sources_to lists in the qnodes
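//                      (Needed because the replication pass above added __copyX and __merge
//                      query nodes; the dependency edges are recomputed from refd_tbls.)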
1529   for(q=0;q<qnodes.size();++q){
1530         qnodes[q]->reads_from.clear();
1531         qnodes[q]->sources_to.clear();
1532   }
1533   for(q=0;q<qnodes.size();++q){
1534         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1535                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1536                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1537                         qnodes[q]->reads_from.insert(rf);
1538                         qnodes[rf]->sources_to.insert(q);
1539                 }
1540         }
1541   }
1542
1543 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1544   for(q=0;q<hfta_sets.size();++q){
1545         hfta_sets[q]->reads_from.clear();
1546         hfta_sets[q]->sources_to.clear();
1547   }
1548   for(q=0;q<hfta_sets.size();++q){
1549         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1550                 int node = hfta_sets[q]->query_node_indices[s];
1551                 set<int>::iterator rfsii;
1552                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1553                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1554                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1555                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1556                         }
1557                 }
1558         }
1559   }
1560
1561 /*
1562 for(q=0;q<qnodes.size();++q){
1563  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1564  set<int>::iterator rsii;
1565  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1566   printf(" %d",(*rsii));
1567   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1568  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1569   printf(" %d",(*rsii));
1570  printf("\n");
1571 }
1572
1573 for(q=0;q<hfta_sets.size();++q){
1574  if(hfta_sets[q]->do_generation==false)
1575         continue;
1576  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1577  set<int>::iterator rsii;
1578  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1579   printf(" %d",(*rsii));
1580   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1581  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1582   printf(" %d",(*rsii));
1583  printf("\n");
1584 }
1585 */
1586
1587
1588
1589 //              Re-topo sort the hftas
1590   hfta_topsort.clear();
1591   workq.clear();
1592   int hnode_srcs_2[hfta_sets.size()];
1593   for(i=0;i<hfta_sets.size();++i){
1594         hnode_srcs_2[i] = 0;
1595         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1596                 workq.push_back(i);
1597         }
1598   }
1599
1600   while(workq.empty() == false){
1601         int     node = workq.front();
1602         workq.pop_front();
1603         hfta_topsort.push_back(node);
1604         set<int>::iterator stsii;
1605         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1606                 int child = (*stsii);
1607                 hnode_srcs_2[child]++;
1608                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1609                         workq.push_back(child);
1610                 }
1611         }
1612   }
1613
1614 //              Ensure that all of the query_node_indices in hfta_sets are topologically
1615 //              sorted; don't rely on the assumption that all transforms maintain some kind of order.
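//              (Per-hfta Kahn sort over the reads_from edges; edges to query nodes
//              outside this hfta are counted as already satisfied.)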
1616   for(i=0;i<hfta_sets.size();++i){
1617         if(hfta_sets[i]->do_generation){
1618                 map<int,int> n_accounted;
1619                 vector<int> new_order;
1620                 workq.clear();
1621                 vector<int>::iterator vii;
1622                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1623                         n_accounted[(*vii)]= 0;
1624                 }
1625                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1626                         set<int>::iterator rfsii;
1627                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1628                                 if(n_accounted.count((*rfsii)) == 0){
1629                                         n_accounted[(*vii)]++;
1630                                 }
1631                         }
1632                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1633                                 workq.push_back((*vii));
1634                         }
1635                 }
1636
1637                 while(workq.empty() == false){
1638                         int node = workq.front();
1639                         workq.pop_front();
1640                         new_order.push_back(node);
1641                         set<int>::iterator stsii;
1642                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1643                                 if(n_accounted.count((*stsii))){
1644                                         n_accounted[(*stsii)]++;
1645                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1646                                                 workq.push_back((*stsii));
1647                                         }
1648                                 }
1649                         }
1650                 }
1651                 hfta_sets[i]->query_node_indices = new_order;
1652         }
1653   }
1654
1655
1656
1657
1658
1659 ///                     Global checking is done; start the analysis and translation
1660 ///                     of the query parse trees in the order specified by process_order
1661
1662
1663 //                      Get a list of the LFTAs for global lfta optimization
1664 //                              TODO: separate building operators from splitting lftas,
1665 //                                      that will make optimizations such as predicate pushing easier.
1666         vector<stream_query *> lfta_list;
1667         stream_query *rootq;
1668     int qi,qj;
1669
1670         map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1671
1672         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1673
1674         int hfta_id = hfta_topsort[qi];
1675     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1676
1677
1678
1679 //              Two possibilities: either it's a UDOP, or it's a collection of queries.
1680 //      if(qnodes[curr_list.back()]->is_udop)
1681         if(hfta_sets[hfta_id]->is_udop){
1682                 int node_id = curr_list.back();
1683                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1684                 opview_entry *opv = new opview_entry();
1685
1686 //                      Many of the UDOP properties aren't currently used.
1687                 opv->parent_qname = "no_parent";
1688                 opv->root_name = qnodes[node_id]->name;
1689                 opv->view_name = qnodes[node_id]->file;
1690                 opv->pos = qi;
1691                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1692                 opv->udop_alias = tmpstr;
1693                 opv->mangler = qnodes[node_id]->mangler;
1694
1695                 if(opv->mangler != ""){
1696                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1697                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1698                 }
1699
1700 //                      This piece of code makes each hfta which refers to the same udop
1701 //                      reference a distinct running udop.  Do this at query optimization time?
1702 //              fmtbl->set_udop_alias(opv->udop_alias);
1703
1704                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1705                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1706
1707                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1708                 int s,f,q;
1709                 for(s=0;s<subq.size();++s){
1710 //                              Validate that the fields match.
1711                         subquery_spec *sqs = subq[s];
1712                         string subq_name = sqs->name + opv->mangler;
1713                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1714                         if(flds.size() == 0){
1715                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1716                                 return(1);
1717                         }
1718                         if(flds.size() < sqs->types.size()){
1719                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1720                                 return(1);
1721                         }
1722                         bool failed = false;
1723                         for(f=0;f<sqs->types.size();++f){
1724                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1725                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1726                                 if(! dte.subsumes_type(&dtf) ){
1727                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1728                                         failed = true;
1729                                 }
1730 /*
1731                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1732                                         string pstr = dte.get_temporal_string();
1733                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1734                                         failed = true;
1735                                 }
1736 */
1737                         }
1738                         if(failed)
1739                                 return(1);
1740 ///                             Validation done, find the subquery, make a copy of the
1741 ///                             parse tree, and add it to the return list.
1742                         for(q=0;q<qnodes.size();++q)
1743                                 if(qnodes[q]->name == subq_name)
1744                                         break;
1745                         if(q==qnodes.size()){
1746                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1747                                 return(1);
1748                         }
1749
1750                 }
1751
1752 //                      Cross-link this opview to the FROM entry(s) in all sourced-to queries.
1753                 set<int>::iterator sii;
1754                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1755 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1756                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1757                         int ii;
1758                         for(ii=0;ii<tblvars.size();++ii){
1759                                 if(tblvars[ii]->schema_name == opv->root_name){
1760                                         tblvars[ii]->set_opview_idx(opviews.size());
1761                                 }
1762
1763                         }
1764                 }
1765
1766                 opviews.append(opv);
1767         }else{
1768
1769 //                      Analyze the parse trees in this query,
1770 //                      put them in rootq
1771 //      vector<int> curr_list = process_sets[qi];
1772
1773
1774 ////////////////////////////////////////
1775
1776           rootq = NULL;
1777 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1778           for(qj=0;qj<curr_list.size();++qj){
1779                 i = curr_list[qj];
1780         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1781
1782 //                      Select the current query parse tree
1783                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1784
1785 //                      if hfta only, try to fetch any missing schemas
1786 //                      from the registry (using the print_schema program).
1787 //                      Here I use a hack to avoid analyzing the query -- all referenced
1788 //                      tables must be in the FROM clause.
1789 //                      If there is a problem loading any table, just issue a warning.
1790 //
1791                 tablevar_list_t *fm = fta_parse_tree->get_from();
1792                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1793 //                      iterate over all referenced tables
1794                 int t;
1795                 for(t=0;t<refd_tbls.size();++t){
1796                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1797
1798                   if(tbl_ref < 0){      // if this table is not in the Schema
1799
1800                         if(hfta_only){
1801                                 string cmd="print_schema "+refd_tbls[t];
1802                                 FILE *schema_in = popen(cmd.c_str(), "r");
1803                                 if(schema_in == NULL){
1804                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1805                                 }else{
1806                                   string schema_instr;
1807                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1808                                         schema_instr += tmpstr;
1809                                   }
1810                           fta_parse_result = new fta_parse_t();
1811                                   strcpy(tmp_schema_str,schema_instr.c_str());
1812                                   FtaParser_setstringinput(tmp_schema_str);
1813                           if(FtaParserparse()){
1814                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1815                           }else{
1816                                         if( fta_parse_result->tables != NULL){
1817                                                 int tl;
1818                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1819                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1820                                                 }
1821                                         }else{
1822                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1823                                         }
1824                                 }
1825                         }
1826                   }else{
1827                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1828                                 exit(1);
1829                   }
1830
1831                 }
1832           }
1833
1834
1835 //                              Analyze the query.
1836           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1837           if(qs == NULL){
1838                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1839                 exit(1);
1840           }
1841
1842           stream_query new_sq(qs, Schema);
1843           if(new_sq.error_code){
1844                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1845                         exit(1);
1846           }
1847
1848 //                      Add it to the Schema
1849           table_def *output_td = new_sq.get_output_tabledef();
1850           Schema->add_table(output_td);
1851
1852 //                      Create a query plan from the analyzed parse tree.
1853 //                      If it's a query referenced via FROM, add it to the stream query.
1854           if(rootq){
1855                 rootq->add_query(new_sq);
1856           }else{
1857                 rootq = new stream_query(new_sq);
1858 //                      have the stream query object inherit properties from the analyzed
1859 //                      hfta_node object.
1860                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1861                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1862           }
1863
1864
1865     }
1866
1867 //              This stream query has all its parts
1868 //              Build and optimize it.
1869 //printf("translate_fta: generating plan.\n");
1870         if(rootq->generate_plan(Schema)){
1871                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1872                 continue;
1873         }
1874
1875 //      We've found the query plan head, so now add the output operators
1876         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1877                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1878                 multimap<string, int>::iterator mmsi;
1879                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1880                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1881                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1882                 }
1883         }
1884
1885
1886
1887 //                              Perform query splitting if necessary.
1888         bool hfta_returned;
1889     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1890
1891         int l;
1892 //for(l=0;l<split_queries.size();++l){
1893 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1894 //}
1895
1896
1897
1898
1899     if(split_queries.size() > 0){       // should be at least one component.
1900
1901 //                              Compute the number of LFTAs.
1902           int n_lfta = split_queries.size();
1903           if(hfta_returned) n_lfta--;
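//                              (split_query returns the lfta components first; when an hfta
//                              component is produced it is the last entry of split_queries
//                              and is handled separately below.)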
1904 //                              Check if a schemaId constraint needs to be inserted.
1905
1906 //                              Process the LFTA components.
1907           for(l=0;l<n_lfta;++l){
1908            if(lfta_names.count(split_queries[l]->query_name) == 0){
1909 //                              Grab the lfta for global optimization.
1910                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1911                 string liface = "_local_";
1912 //              string lmach = "";
1913                 string lmach = hostname;
1914                 if(tvec.size()>0){
1915                         liface = tvec[0]->get_interface();      // iface queries have been resolved
1916                         if(tvec[0]->get_machine() != ""){
1917                                 lmach = tvec[0]->get_machine();
1918                         }else{
1919                                 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n",  split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
1920                         }
1921                 } // else{
1922                         interface_names.push_back(liface);
1923                         machine_names.push_back(lmach);
1924 //              }
1925
1926                 vector<predicate_t *> schemaid_preds;
1927                 for(int irv=0;irv<tvec.size();++irv){
1928
1929                         string schema_name = tvec[irv]->get_schema_name();
1930                         string rvar_name = tvec[irv]->get_var_name();
1931                         int schema_ref = tvec[irv]->get_schema_ref();
1932                         if (lmach == "")
1933                                 lmach = hostname;
1934 //                      interface_names.push_back(liface);
1935 //                      machine_names.push_back(lmach);
1936
1937 //printf("Machine is %s\n",lmach.c_str());
1938
1939 //                              Check if a schemaId constraint needs to be inserted.
1940                         if(schema_ref<0){ // can result from some kinds of splits
1941                                 schema_ref = Schema->get_table_ref(schema_name);
1942                         }
1943                         int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
1944                         int errnum = 0;
1945                         string if_error;
1946                         iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1947                         if(iface==NULL){
1948                                 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1949                                 exit(1);
1950                         }       
1951
1952
1953                         if(tvec[irv]->get_interface() != "_local_"){
1954                          if(iface->has_multiple_schemas()){
1955                                 if(schema_id<0){        // invalid schema_id
1956                                         fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1957                                         exit(1);
1958                                 }
1959                                 vector<string> iface_schemas = iface->get_property("Schemas");
1960                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1961                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1962                                         exit(1);
1963                                 }
1964 // Ensure that in liface, schema_id is used for only one schema
1965                                 if(schema_of_schemaid.count(liface)==0){
1966                                         map<int, string> empty_map;
1967                                         schema_of_schemaid[liface] = empty_map;
1968                                 }
1969                                 if(schema_of_schemaid[liface].count(schema_id)==0){
1970                                         schema_of_schemaid[liface][schema_id] = schema_name;
1971                                 }else{
1972                                         if(schema_of_schemaid[liface][schema_id] != schema_name){
1973                                                 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1974                                                 exit(1);
1975                                         }
1976                                 }
1977                          }else{ // single-schema interface
1978                                 schema_id = -1; // don't generate schema_id predicate
1979                                 vector<string> iface_schemas = iface->get_property("Schemas");
1980                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1981                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1982                                         exit(1);
1983                                 }
1984                                 if(iface_schemas.size()>1){
1985                                         fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1986                                         exit(1);
1987                                 }
1988                          }                      
1989                         }else{
1990                                 schema_id = -1;
1991                         }
1992
1993 // If we need to check the schema_id, insert a predicate into the lfta.
1994 //       TODO not just schema_id, the full all_schema_ids set.
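//                      The code below builds the CNF for the predicate
//                      <rvar>.schemaId = <schema_id> and appends it to the lfta's WHERE
//                      clause (and to the per-table predicate lists of filter_join /
//                      watch_join nodes).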
1995                         if(schema_id>=0){
1996                                 colref_t *schid_cr = new colref_t("schemaId");
1997                                 schid_cr->schema_ref = schema_ref;
1998                                 schid_cr->table_name = rvar_name;
1999                                 schid_cr->tablevar_ref = 0;
2000                                 schid_cr->default_table = false;
2001                                 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
2002                                 data_type *schid_dt = new data_type("uint");
2003                                 schid_se->dt = schid_dt;
2004
2005                                 string schid_str = int_to_string(schema_id);
2006                                 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2007                                 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2008                                 lit_se->dt = schid_dt;
2009
2010                                 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2011                                 vector<cnf_elem *> clist;
2012                                 make_cnf_from_pr(schid_pr, clist);
2013                                 analyze_cnf(clist[0]);
2014                                 clist[0]->cost = 1;     // cheap one comparison
2015 // cnf built, now insert it.
2016                                 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2017
2018 // Specialized processing 
2019 //              filter join, get two schemaid preds
2020                                 string node_type = split_queries[l]->query_plan[0]->node_type();
2021                                 if(node_type == "filter_join"){
2022                                         filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2023                                         if(irv==0){
2024                                                 fj->pred_t0.push_back(clist[0]);
2025                                         }else{
2026                                                 fj->pred_t1.push_back(clist[0]);
2027                                         }
2028                                         schemaid_preds.push_back(schid_pr);
2029                                 }
2030 //              watchlist join, get the first schemaid pred
2031                                 if(node_type == "watch_join"){
2032                                         watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2033                                         if(irv==0){
2034                                                 fj->pred_t0.push_back(clist[0]);
2035                                                 schemaid_preds.push_back(schid_pr);
2036                                         }
2037                                 }
2038                         }
2039                 }
2040 // Specialized processing, currently filter join.
2041                 if(schemaid_preds.size()>1){
2042                         string node_type = split_queries[l]->query_plan[0]->node_type();
2043                         if(node_type == "filter_join"){
2044                                 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2045                                 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2046                                 vector<cnf_elem *> clist;
2047                                 make_cnf_from_pr(filter_pr, clist);
2048                                 analyze_cnf(clist[0]);
2049                                 clist[0]->cost = 1;     // cheap one comparison
2050                                 fj->shared_pred.push_back(clist[0]);
2051                         }
2052                 }
2053                         
2054
2055
2056
2057
2058
2059
2060 //                      Set the ht size from the recommendation, if there is one in the rec file
2061                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2062                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2063                 }
2064
2065
2066                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2067                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
2068                 lfta_list.push_back(split_queries[l]);
2069                 lfta_mach_lists[lmach].push_back(split_queries[l]);
2070
2071 //                      The following is a hack,
2072 //                      as I should be generating LFTA code through
2073 //                      the stream_query object.
2074
2075                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2076
2077 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2078
2079 /*
2080 //                              Create query description to embed in lfta.c
2081                 string lfta_schema_str = split_queries[l]->make_schema();
2082                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2083
2084 //                              get NIC capabilities.
2085                 int erri;
2086                 nic_property *nicprop = NULL;
2087                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2088                 if(iface_codegen_type.size()){
2089                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2090                         if(!nicprop){
2091                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2092                                         exit(1);
2093                         }
2094                 }
2095
2096                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2097 */
2098
2099                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
2100                 snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
2101
2102 // STOPPED HERE need to figure out how to generate the code that Vlad needs
2103 //              from snap_position
2104
2105 // TODO NOTE : I'd like it to be the case that registration_query_names
2106 //      are the queries to be registered for subscription,
2107 //      but there is some complex bookkeeping here.
2108                 registration_query_names.push_back(split_queries[l]->query_name);
2109                 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2110 //                      NOTE: I will assume a 1-1 correspondence between
2111 //                      mach_query_names[lmach] and lfta_mach_lists[lmach],
2112 //                      where mach_query_names[lmach][i] contains the index into
2113 //                      query_names, which names the lfta, and
2114 //                      lfta_mach_lists[lmach][i] is the stream_query * of the
2115 //                      corresponding lfta.
2116 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2117
2118
2119
2120                 // check if lfta is reusable
2121                 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2122
2123                 bool lfta_reusable = false;
2124                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2125                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2126                         lfta_reusable = true;
2127                 }
2128                 lfta_reuse_options.push_back(lfta_reusable);
2129
2130                 // LFTA will inherit the liveness timeout specification from the containing query
2131                 // it is too conservative, as lftas are expected to spend less time per tuple
2132                 // than the full query
2133
2134                 // extract liveness timeout from query definition
2135                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2136                 if (!liveness_timeout) {
2137 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2138 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2139                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2140                 }
2141                 lfta_liveness_timeouts.push_back(liveness_timeout);
2142
2143 //                      Add it to the schema
2144                 table_def *td = split_queries[l]->get_output_tabledef();
2145                 Schema->append_table(td);
2146 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2147
2148           }
2149          }
2150
2151 //                              If the output is lfta-only, dump out the query name.
2152       if(split_queries.size() == 1 && !hfta_returned){
2153         if(output_query_names ){
2154            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2155                 }
2156 /*
2157 else{
2158            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2159                 }
2160 */
2161
2162 /*
2163 //                              output schema summary
2164                 if(output_schema_summary){
2165                         dump_summary(split_queries[0]);
2166                 }
2167 */
2168       }
2169
2170
2171           if(hfta_returned){            // query also has an HFTA component
2172                 int hfta_nbr = split_queries.size()-1;
2173
2174                         hfta_list.push_back(split_queries[hfta_nbr]);
2175
2176 //                                      report on generated query names
2177         if(output_query_names){
2178                         string hfta_name =split_queries[hfta_nbr]->query_name;
2179                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2180                         for(l=0;l<hfta_nbr;++l){
2181                                 string lfta_name =split_queries[l]->query_name;
2182                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2183                         }
2184                 }
2185 //              else{
2186 //              fprintf(stderr,"query names are ");
2187 //                      for(l=0;l<hfta_nbr;++l){
2188 //                              if(l>0) fprintf(stderr,",");
2189 //                              string fta_name =split_queries[l]->query_name;
2190 //                      fprintf(stderr," %s",fta_name.c_str());
2191 //                      }
2192 //                      fprintf(stderr,"\n");
2193 //              }
2194           }
2195
2196   }else{
2197           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2198           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2199           exit(1);
2200   }
2201  }
2202 }
2203
2204
2205 //-----------------------------------------------------------------
2206 //              Compute and propagate the SEs which compute PROTOCOL fields.
2207 //-----------------------------------------------------------------
2208
2209 for(i=0;i<lfta_list.size();i++){
2210         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2211         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2212 }
2213 for(i=0;i<hfta_list.size();i++){
2214         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2215         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2216 }
2217
2218
2219
2220 //------------------------------------------------------------------------
2221 //              Perform  individual FTA optimizations
2222 //-----------------------------------------------------------------------
2223
2224 if (partitioned_mode) {
2225
2226         // open partition definition file
2227         string part_fname = config_dir_path + "partition.txt";
2228
2229         FILE* partfd = fopen(part_fname.c_str(), "r");
2230         if (!partfd) {
2231                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2232                 exit(1);
2233         }
2234         PartnParser_setfileinput(partfd);
2235         if (PartnParserparse()) {
2236                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2237                 exit(1);
2238         }
2239         fclose(partfd);
2240 }
2241
2242
2243 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2244
2245 int num_hfta = hfta_list.size();
2246 for(i=0; i < hfta_list.size(); ++i){
2247         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2248 }
2249
2250 //                      Add all new hftas to schema
2251 for(i=num_hfta; i < hfta_list.size(); ++i){
2252                 table_def *td = hfta_list[i]->get_output_tabledef();
2253                 Schema->append_table(td);
2254 }
2255
2256 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2257
2258
2259
2260 //------------------------------------------------------------------------
2261 //              Do global (cross-fta) optimization
2262 //-----------------------------------------------------------------------
2263
2264
2265
2266
2267
2268
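//              extra_external_libs accumulates, across all hftas, the external
//              libraries reported by get_external_libs(); they are joined into
//              ext_lib_string below and passed to generate_makefile, where they
//              appear on the hfta link lines.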
2269 set<string> extra_external_libs;
2270
2271 for(i=0;i<hfta_list.size();++i){                // generate code and qtree metadata for each HFTA
2272
2273                 if(! debug_only){
2274 //                                      build hfta file name, create output
2275            if(numeric_hfta_flname){
2276             sprintf(tmpstr,"hfta_%d",hfta_count);
2277                         hfta_names.push_back(tmpstr);
2278             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2279          }else{
2280             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2281                         hfta_names.push_back(tmpstr);
2282             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2283           }
2284                   FILE *hfta_fl = fopen(tmpstr,"w");
2285                   if(hfta_fl == NULL){
2286                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2287                         exit(1);
2288                   }
2289                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2290
2291 //                      If there is a field verifier, warn about
2292 //                      lack of compatibility.
2293 //                              NOTE : this code assumes that visible non-lfta queries
2294 //                              are those at the root of a stream query.
2295                   string hfta_comment;
2296                   string hfta_title;
2297                   string hfta_namespace;
2298                   if(hfta_list[i]->defines.count("comment")>0)
2299                         hfta_comment = hfta_list[i]->defines["comment"];
2300                   if(hfta_list[i]->defines.count("Comment")>0)
2301                         hfta_comment = hfta_list[i]->defines["Comment"];
2302                   if(hfta_list[i]->defines.count("COMMENT")>0)
2303                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2304                   if(hfta_list[i]->defines.count("title")>0)
2305                         hfta_title = hfta_list[i]->defines["title"];
2306                   if(hfta_list[i]->defines.count("Title")>0)
2307                         hfta_title = hfta_list[i]->defines["Title"];
2308                   if(hfta_list[i]->defines.count("TITLE")>0)
2309                         hfta_title = hfta_list[i]->defines["TITLE"];
2310                   if(hfta_list[i]->defines.count("namespace")>0)
2311                         hfta_namespace = hfta_list[i]->defines["namespace"];
2312                   if(hfta_list[i]->defines.count("Namespace")>0)
2313                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2314                   if(hfta_list[i]->defines.count("NAMESPACE")>0)
2315                         hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2316
2317                   if(field_verifier != NULL){
2318                         string warning_str;
2319                         if(hfta_comment == "")
2320                                 warning_str += "\tcomment not found.\n";
2321
2322 // Obsolete stuff that Carsten wanted
2323 //                      if(hfta_title == "")
2324 //                              warning_str += "\ttitle not found.\n";
2325 //                      if(hfta_namespace == "")
2326 //                              warning_str += "\tnamespace not found.\n";
2327
2328 //      There is a get_tbl_keys method implemented for qp_nodes;
2329 //      integrate it into stream_query, then call it to find keys,
2330 //      and annotate fields with their key-ness.
2331 //      If there is a "keys" property in the defines block, override anything returned
2332 //      from the automated analysis.
2333
2334                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2335                         int fi;
2336                         for(fi=0;fi<flds.size();fi++){
2337                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2338                         }
2339                         if(warning_str != "")
2340                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2341                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2342                   }
2343
2344 // Get the fields in this query
2345                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2346
2347 // do key processing
2348                   string hfta_keys_s = "";
2349                   if(hfta_list[i]->defines.count("keys")>0)
2350                         hfta_keys_s = hfta_list[i]->defines["keys"];
2351                   if(hfta_list[i]->defines.count("Keys")>0)
2352                         hfta_keys_s = hfta_list[i]->defines["Keys"];
2353                   if(hfta_list[i]->defines.count("KEYS")>0)
2354                         hfta_keys_s = hfta_list[i]->defines["KEYS"];
2355                   string xtra_keys_s = "";
2356                   if(hfta_list[i]->defines.count("extra_keys")>0)
2357                         xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2358                   if(hfta_list[i]->defines.count("Extra_Keys")>0)
2359                         xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2360                   if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2361                         xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
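//                Resolve the key set: an explicit "keys" define takes precedence;
//                otherwise use the automated get_tbl_keys() analysis and append any
//                "extra_keys" entries that are not already present.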
2362 // get the keys
2363                   vector<string> hfta_keys;
2364                   vector<string> partial_keys;
2365                   vector<string> xtra_keys;
2366                   if(hfta_keys_s==""){
2367                                 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2368                                 if(xtra_keys_s.size()>0){
2369                                         xtra_keys = split_string(xtra_keys_s, ',');
2370                                 }
2371                                 for(int xi=0;xi<xtra_keys.size();++xi){
2372                                         if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2373                                                 hfta_keys.push_back(xtra_keys[xi]);
2374                                         }
2375                                 }
2376                   }else{
2377                                 hfta_keys = split_string(hfta_keys_s, ',');
2378                   }
2379 // validate that all of the keys exist in the output.
2380 //      (exit on error, as it's a bad specification)
2381                   vector<string> missing_keys;
2382                   for(int ki=0;ki<hfta_keys.size(); ++ki){
2383                         int fi;
2384                         for(fi=0;fi<flds.size();++fi){
2385                                 if(hfta_keys[ki] == flds[fi]->get_name())
2386                                         break;
2387                         }
2388                         if(fi==flds.size())
2389                                 missing_keys.push_back(hfta_keys[ki]);
2390                   }
2391                   if(missing_keys.size()>0){
2392                         fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2393                         for(int hi=0; hi<missing_keys.size(); ++hi){
2394                                 fprintf(stderr," %s", missing_keys[hi].c_str());
2395                         }
2396                         fprintf(stderr,"\n");
2397                         exit(1);
2398                   }
2399
2400                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2401                   if(hfta_comment != "")
2402                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2403                   if(hfta_title != "")
2404                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2405                   if(hfta_namespace != "")
2406                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2407                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2408                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2409
2410 //                              write info about fields to qtree.xml
2411                   int fi;
2412                   for(fi=0;fi<flds.size();fi++){
2413                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2414                         if(flds[fi]->get_modifier_list()->size()){
2415                                 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2416                         }
2417                         fprintf(qtree_output," />\n");
2418                   }
2419 // info about keys
2420                   for(int hi=0;hi<hfta_keys.size();++hi){
2421                         fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2422                   }
2423                   for(int hi=0;hi<partial_keys.size();++hi){
2424                         fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2425                   }
2426                   for(int hi=0;hi<xtra_keys.size();++hi){
2427                         fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2428                   }
2429
2430
2431                   // extract liveness timeout from query definition
2432                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2433                   if (!liveness_timeout) {
2434 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2435 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2436                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2437                   }
2438                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2439
2440                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2441                   int itv;
2442                   for(itv=0;itv<tmp_tv.size();++itv){
2443                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2444                   }
2445                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2446                   if(ifrs != ""){
2447                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2448                   }
2449                   fprintf(qtree_output,"\t</HFTA>\n");
2450
2451                   fclose(hfta_fl);
2452                 }else{
2453 //                                      debug only -- do code generation to catch generation-time errors.
2454                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2455                 }
2456
2457                 hfta_count++;   // for hfta file names with numeric suffixes
2458
2459                 hfta_list[i]->get_external_libs(extra_external_libs);
2460
2461           }
2462
2463 string ext_lib_string;
2464 set<string>::iterator ssi_el;
2465 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2466         ext_lib_string += (*ssi_el)+" ";
2467
2468
2469
2470 //                      Report on the set of operator views
2471   for(i=0;i<opviews.size();++i){
2472         opview_entry *opve = opviews.get_entry(i);
2473         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2474         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2475         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2476         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2477         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2478
2479         if (!opve->liveness_timeout) {
2480 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2481 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2482                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2483         }
2484         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2485     int j;
2486         for(j=0;j<opve->subq_names.size();j++)
2487                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2488         fprintf(qtree_output,"\t</UDOP>\n");
2489   }
2490
2491
2492 //-----------------------------------------------------------------
2493
2494 //                      Create interface-specific meta code files.
2495 //                              first, open and parse the interface resources file.
2496         ifaces_db = new ifq_t();
2497     ierr = "";
2498         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2499                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2500                                 ifx_fname.c_str(), ierr.c_str());
2501                 exit(1);
2502         }
2503
2504         map<string, vector<stream_query *> >::iterator svsi;
2505         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2506                 string lmach = (*svsi).first;
2507
2508         //              For this machine, create a set of lftas per interface.
2509                 vector<stream_query *> mach_lftas = (*svsi).second;
2510                 map<string, vector<stream_query *> > lfta_iface_lists;
2511                 int li;
2512                 for(li=0;li<mach_lftas.size();++li){
2513                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2514                         string lfta_iface = "_local_";
2515                         if(tvec.size()>0){
2516                                 lfta_iface = tvec[0]->get_interface();
2517                         }
2518                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2519                 }
2520
2521                 map<string, vector<stream_query *> >::iterator lsvsi;
2522                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2523                         int erri;
2524                         string liface = (*lsvsi).first;
2525                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2526                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2527                         if(iface_codegen_type.size()){
2528                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2529                                 if(!nicprop){
2530                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2531                                         exit(1);
2532                                 }
2533                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2534                                 string mcf_flnm;
2535                                 if(lmach != "")
2536                                   mcf_flnm = lmach + "_"+liface+".mcf";
2537                                 else
2538                                   mcf_flnm = hostname + "_"+liface+".mcf";
2539                                 FILE *mcf_fl ;
2540                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2541                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2542                                         exit(1);
2543                                 }
2544                                 fprintf(mcf_fl,"%s",mcs.c_str());
2545                                 fclose(mcf_fl);
2546 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2547 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2548                         }
2549                 }
2550
2551
2552         }
2553
2554
2555
2556 //-----------------------------------------------------------------
2557
2558
2559 //                      Find common filter predicates in the LFTAs.
2560 //                      In addition, generate structs to store the
2561 //                      temporal attributes unpacked by the prefilter,
2562 //                      and compute & provide an interface for per-interface
2563 //                      record extraction properties.
2564         
2565         map<string, vector<stream_query *> >::iterator ssqi;
2566         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2567
2568                 string lmach = (*ssqi).first;
2569                 bool packed_return = false;
2570                 int li, erri;
2571
2572
2573 //      The LFTAs of this machine.
2574                 vector<stream_query *> mach_lftas = (*ssqi).second;
2575 //      break up on a per-interface basis.
2576                 map<string, vector<stream_query *> > lfta_iface_lists;
2577                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2578                                                         // for fta_init
2579                 for(li=0;li<mach_lftas.size();++li){
2580                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2581                         string lfta_iface = "_local_";
2582                         if(tvec.size()>0){
2583                                 lfta_iface = tvec[0]->get_interface();
2584                         }
2585                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2586                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2587                 }
2588
2589
2590 //      Are the return values "packed"?
2591 //      This should be done on a per-interface basis.
2592 //      But this is defunct code for gs-lite
2593                 for(li=0;li<mach_lftas.size();++li){
2594                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2595                   string liface = "_local_";
2596                   if(tvec.size()>0){
2597                          liface = tvec[0]->get_interface();
2598                   }
2599                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2600                   if(iface_codegen_type.size()){
2601                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2602                                 packed_return = true;
2603                         }
2604                   }
2605                 }
2606
2607
2608 // Separate lftas by interface, collect results on a per-interface basis.
2609
2610                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2611                 map<string, vector<cnf_set *> > prefilter_preds;
2612                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2613                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2614                         string liface = (*mvsi).first;
2615                         vector<cnf_set *> empty_list;
2616                         prefilter_preds[liface] = empty_list;
2617                         if(! packed_return){
2618                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2619                         }
2620
2621 //                              get NIC capabilities.  (Is this needed?)
2622                         nic_property *nicprop = NULL;
2623                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2624                         if(iface_codegen_type.size()){
2625                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2626                                 if(!nicprop){
2627                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2628                                         exit(1);
2629                                 }
2630                         }
2631                 }
2632
2633
2634 //              Now that we know the prefilter preds, generate the lfta code.
2635 //      Do this for all lftas in this machine.
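//      The loop below treats each id collected into pred_ids by
//      get_common_lfta_filter as packing an lfta index in its high 16 bits
//      and the predicate's bit position in the prefilter in its low 16 bits,
//      and uses that encoding to recover the predicates subsumed by lfta li.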
2636                 for(li=0;li<mach_lftas.size();++li){
2637                         set<unsigned int> subsumed_preds;
2638                         set<unsigned int>::iterator sii;
2639 #ifdef PREFILTER_OK
2640                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2641                                 int pid = (*sii);
2642                                 if((pid>>16) == li){
2643                                         subsumed_preds.insert(pid & 0xffff);
2644                                 }
2645                         }
2646 #endif
2647                         string lfta_schema_str = mach_lftas[li]->make_schema();
2648                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2649                         nic_property *nicprop = NULL; // no NIC properties?
2650                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2651                 }
2652
2653
2654 //                      generate structs to store the temporal attributes
2655 //                      unpacked by prefilter
2656                 col_id_set temp_cids;
2657                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2658                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2659
2660 //                      Compute the lfta bit signatures and the lfta colrefs
2661 //      do this on a per-interface basis
2662 #ifdef PREFILTER_OK
2663                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2664 #endif
2665                 map<string, vector<long long int> > lfta_sigs; // used again later
2666                 map<string, int> lfta_snap_pos; // optimize csv parsing
2667                                                                         // compute now, use in get_iface_properties
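//              For each interface, lfta_sigs[liface][li] is a bitmask over that
//              interface's prefilter predicates: bit f is set when
//              prefilter_preds[liface][f] applies to lfta li (its lfta_id set
//              contains li).  The mask is later emitted as the prefilter
//              signature argument of fta_register.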
2668                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2669                         string liface = (*mvsi).first;
2670                         vector<long long int> empty_list;
2671                         lfta_sigs[liface] = empty_list;
2672                         lfta_snap_pos[liface] = -1;
2673
2674                         vector<col_id_set> lfta_cols;
2675                         vector<int> lfta_snap_length;
2676                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2677                                 unsigned long long int mask=0, bpos=1;
2678                                 int f_pos;
2679                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2680                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2681                                                 mask |= bpos;
2682                                         bpos = bpos << 1;
2683                                 }
2684                                 lfta_sigs[liface].push_back(mask);
2685                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2686                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
2687                                 int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
2688                                 if(this_snap_pos > lfta_snap_pos[liface])
2689                                         lfta_snap_pos[liface]  = this_snap_pos;
2690                         }
2691
2692 //for(li=0;li<mach_lftas.size();++li){
2693 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2694 //col_id_set::iterator tcisi;
2695 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2696 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2697 //}
2698 //}
2699
2700
2701 //                      generate the prefilter
2702 //      Do this on a per-interface basis, except for the #define
2703 #ifdef PREFILTER_OK
2704 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2705                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2706 #else
2707                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2708
2709 #endif
2710                 }
2711
2712 //                      Generate interface parameter lookup function
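//              The emitted get_iface_properties() is a chain of strcmp tests over
//              the interface name and then the property name, returning the
//              property's values as one comma-separated string, plus the internal
//              "_max_csv_pos" property derived from lfta_snap_pos.  A sketch of
//              the generated shape (interface/property names and values below are
//              illustrative only):
//
//                      gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {
//                              if (!strcmp(iface_name, "eth0")) {
//                                      if (!strcmp(property_name, "InterfaceType")) {
//                                              return "...";
//                                      }else if(!strcmp(property_name, "_max_csv_pos")){
//                                              return "...";
//                                      } else
//                                              return NULL;
//                              } else
//                                      return NULL;
//                      }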
2713           lfta_val[lmach] += "// lookup interface properties by name\n";
2714           lfta_val[lmach] += "// multiple values of a given property are returned in the same string, separated by commas\n";
2715           lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2716           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2717
2718 //        collect a list of interface names used by queries running on this host
2719           set<std::string> iface_names;
2720           for(i=0;i<mach_query_names[lmach].size();i++){
2721                 int mi = mach_query_names[lmach][i];
2722                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2723
2724                 if(interface_names[mi]=="")
2725                         iface_names.insert("DEFAULTDEV");
2726                 else
2727                         iface_names.insert(interface_names[mi]);
2728           }
2729
2730 //        generate interface property lookup code for every interface
2731           set<std::string>::iterator sir;
2732           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2733                 if (sir == iface_names.begin())
2734                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2735                 else
2736                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2737
2738                 // iterate through interface properties
2739                 vector<string> iface_properties;
2740                 if(*sir!="_local_"){    // dummy watchlist interface, don't process.
2741                         iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2742                 }
2743                 if (erri) {
2744                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2745                         exit(1);
2746                 }
2747                 if (iface_properties.empty())
2748                         lfta_val[lmach] += "\t\treturn NULL;\n";
2749                 else {
2750                         for (int i = 0; i < iface_properties.size(); ++i) {
2751                                 if (i == 0)
2752                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2753                                 else
2754                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2755
2756                                 // combine all values for the interface property using comma separator
2757                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2758                                 lfta_val[lmach] += "\t\t\treturn \"";
2759                                 for (int j = 0; j < vals.size(); ++j) {
2760                                         lfta_val[lmach] +=  vals[j];
2761                                         if (j != vals.size()-1)
2762                                                 lfta_val[lmach] += ",";
2763                                 }
2764                                 lfta_val[lmach] += "\";\n";
2765                         }
2766                         lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
2767                         lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
2768                         lfta_val[lmach] += "\t\t} else\n";
2769                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2770                 }
2771           }
2772           lfta_val[lmach] += "\t} else\n";
2773           lfta_val[lmach] += "\t\treturn NULL;\n";
2774           lfta_val[lmach] += "}\n\n";
2775
2776
2777 //                      Generate a full list of FTAs for clearinghouse reference
2778           lfta_val[lmach] += "// list of FTAs the clearinghouse expects to register themselves\n";
2779           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2780
2781           bool first = true;
2782           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2783                 string liface = (*mvsi).first;
2784                 if(liface != "_local_"){        // these don't register themselves
2785                         vector<stream_query *> lfta_list = (*mvsi).second;
2786                         for(i=0;i<lfta_list.size();i++){
2787                                 int mi = lfta_iface_qname_ix[liface][i];
2788                                 if(first) first = false;
2789                                 else      lfta_val[lmach] += ", ";
2790                                 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2791                         }
2792                 }
2793           }
2794 //        for (i = 0; i < registration_query_names.size(); ++i) {
2795 //                 if (i)
2796 //                        lfta_val[lmach] += ", ";
2797 //                 lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2798 //        }
2799
2800           for (i = 0; i < hfta_list.size(); ++i) {
2801                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2802           }
2803           lfta_val[lmach] += ", NULL};\n\n";
2804
2805
2806 //                      Add the initialization function to lfta.c
2807 //      Change to accept the interface name, and 
2808 //      set the prefilter function accordingly.
2809 //      see the example in demo/err2
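//              For each non-local lfta the loop below emits, into fta_init(), a
//              guarded registration call of roughly this shape (names and values
//              are illustrative; the alloc/schema identifiers come from
//              generate_alloc_name / generate_schema_string_name):
//
//                      if(!strcmp(device,"eth0"))
//                              fta_register("query_x", 1, "eth0", <alloc_fcn>
//                      #ifndef LFTA_IN_NIC
//                              ,<schema_string>
//                      #endif
//                              ,1518,0ull);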
2810           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2811           lfta_val[lmach] += "//        note: the last parameter in fta_register is the prefilter signature\n";
2812
2813 //        for(i=0;i<mach_query_names[lmach].size();i++)
2814 //              int mi = mach_query_names[lmach][i];
2815 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2816
2817           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2818                 string liface = (*mvsi).first;
2819                 vector<stream_query *> lfta_list = (*mvsi).second;
2820                 for(i=0;i<lfta_list.size();i++){
2821                         stream_query *lfta_sq = lfta_list[i];
2822                         int mi = lfta_iface_qname_ix[liface][i];
2823
2824                         if(liface == "_local_"){
2825 //  Don't register an init function, do the init code inline
2826                                 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2827                                 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2828                                 continue;
2829                         }
2830                 
2831                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2832
2833                         string this_iface = "DEFAULTDEV";
2834                         if(interface_names[mi]!="")
2835                                 this_iface = '"'+interface_names[mi]+'"';
2836                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2837                 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2838 //              if(interface_names[mi]=="")
2839 //                              lfta_val[lmach]+="DEFAULTDEV";
2840 //              else
2841 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';
2842                         lfta_val[lmach] += this_iface;
2843
2844
2845                 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2846                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2847                         +"\n#endif\n";
2848                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2849                         lfta_val[lmach] += tmpstr;
2850
2851 //                      unsigned long long int mask=0, bpos=1;
2852 //                      int f_pos;
2853 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2854 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2855 //                                      mask |= bpos;
2856 //                              bpos = bpos << 1;
2857 //                      }
2858
2859 #ifdef PREFILTER_OK
2860 //                      sprintf(tmpstr,",%lluull",mask);
2861                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2862                         lfta_val[lmach]+=tmpstr;
2863 #else
2864                         lfta_val[lmach] += ",0ull";
2865 #endif
2866
2867                         lfta_val[lmach] += ");\n";
2868
2869
2870
2871 //    End of lfta prefilter stuff
2872 // --------------------------------------------------
2873
2874 //                      If there is a field verifier, warn about
2875 //                      lack of compatibility.
2876                   string lfta_comment;
2877                   string lfta_title;
2878                   string lfta_namespace;
2879                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2880                   if(ldefs.count("comment")>0)
2881                         lfta_comment = lfta_sq->defines["comment"];
2882                   if(ldefs.count("Comment")>0)
2883                         lfta_comment = lfta_sq->defines["Comment"];
2884                   if(ldefs.count("COMMENT")>0)
2885                         lfta_comment = lfta_sq->defines["COMMENT"];
2886                   if(ldefs.count("title")>0)
2887                         lfta_title = lfta_sq->defines["title"];
2888                   if(ldefs.count("Title")>0)
2889                         lfta_title = lfta_sq->defines["Title"];
2890                   if(ldefs.count("TITLE")>0)
2891                         lfta_title = lfta_sq->defines["TITLE"];
2892                   if(ldefs.count("NAMESPACE")>0)
2893                         lfta_namespace = lfta_sq->defines["NAMESPACE"];
2894                   if(ldefs.count("Namespace")>0)
2895                         lfta_namespace = lfta_sq->defines["Namespace"];
2896                   if(ldefs.count("namespace")>0)
2897                         lfta_namespace = lfta_sq->defines["namespace"];
2898
2899                   string lfta_ht_size;
2900                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2901                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2902                   if(ldefs.count("aggregate_slots")>0){
2903                         lfta_ht_size = ldefs["aggregate_slots"];
2904                   }
2905
2906 //                      NOTE : I'm assuming that visible lftas do not start with _fta.
2907 //                              -- will fail for non-visible simple selection queries.
2908                 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2909                         string warning_str;
2910                         if(lfta_comment == "")
2911                                 warning_str += "\tcomment not found.\n";
2912 // Obsolete stuff that Carsten wanted
2913 //                      if(lfta_title == "")
2914 //                              warning_str += "\ttitle not found.\n";
2915 //                      if(lfta_namespace == "")
2916 //                              warning_str += "\tnamespace not found.\n";
2917
2918                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2919                         int fi;
2920                         for(fi=0;fi<flds.size();fi++){
2921                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2922                         }
2923                         if(warning_str != "")
2924                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2925                                         registration_query_names[mi].c_str(),warning_str.c_str());
2926                 }
2927
2928
2929 //                      Create qtree output
2930                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2931         if(lfta_comment != "")
2932               fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2933         if(lfta_title != "")
2934               fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2935         if(lfta_namespace != "")
2936               fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2937         if(lfta_ht_size != "")
2938               fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2939                 if(lmach != "")
2940                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2941                 else
2942                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2943                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
2944                 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2945                 for(int t=0;t<itbls.size();++t){
2946                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2947                 }
2948 //              fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2949                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2950                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2951 //                              write info about fields to qtree.xml
2952                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2953                   int fi;
2954                   for(fi=0;fi<flds.size();fi++){
2955                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2956                         if(flds[fi]->get_modifier_list()->size()){
2957                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2958                         }
2959                         fprintf(qtree_output," />\n");
2960                   }
2961                 fprintf(qtree_output,"\t</LFTA>\n");
2962
2963
2964             }
2965           }
2966
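//              Emit the tail of fta_init(): select the per-interface prefilter
//              function pointer (lfta_prefilter_<iface>) based on the device
//              name, and exit if the device name is not recognized.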
2967           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2968                         string liface = (*mvsi).first;
2969                         lfta_val[lmach] += 
2970 "       if (!strcmp(device, \""+liface+"\")) \n"
2971 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2972 ;
2973                 }
2974                 lfta_val[lmach] += 
2975 "       if(lfta_prefilter == NULL){\n"
2976 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2977 "               exit(1);\n"
2978 "       }\n"
2979 ;
2980
2981
2982
2983           lfta_val[lmach] += "}\n\n";
2984
2985       if(!(debug_only || hfta_only) ){
2986                 string lfta_flnm;
2987                 if(lmach != "")
2988                   lfta_flnm = lmach + "_lfta.c";
2989                 else
2990                   lfta_flnm = hostname + "_lfta.c";
2991                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2992                         fprintf(stderr,"Can't open output file %s\n%s\n",lfta_flnm.c_str(),strerror(errno));
2993                         exit(1);
2994                 }
2995               fprintf(lfta_out,"%s",lfta_header.c_str());
2996               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2997               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2998                 fclose(lfta_out);
2999           }
3000         }
3001
3002 //              Report the external operators which must execute
3003         if(opviews.size()>0)
3004                 fprintf(stderr,"The queries use the following external operators:\n");
3005         for(i=0;i<opviews.size();++i){
3006                 opview_entry *opv = opviews.get_entry(i);
3007                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
3008         }
3009
3010         if(create_makefile)
3011                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
3012                 machine_names, schema_file_name,
3013                 interface_names,
3014                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
3015
3016
3017         fprintf(qtree_output,"</QueryNodes>\n");
3018
3019         return(0);
3020 }
3021
3022 ////////////////////////////////////////////////////////////
3023
3024 void generate_makefile(vector<string> &input_file_names, int nfiles,
3025                                            vector<string> &hfta_names, opview_set &opviews,
3026                                                 vector<string> &machine_names,
3027                                                 string schema_file_name,
3028                                                 vector<string> &interface_names,
3029                                                 ifq_t *ifdb, string &config_dir_path,
3030                                                 bool use_pads,
3031                                                 string extra_libs,
3032                                                 map<string, vector<int> > &rts_hload
3033                                          ){
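//      Writes three artifacts into the current directory: a Makefile that
//      builds the rts (from <host>_lfta.c plus the runtime libraries) and one
//      executable per hfta, and the runit / stopit control scripts emitted at
//      the end of this function.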
3034         int i,j;
3035
3036         if(config_dir_path != ""){
3037                 config_dir_path = "-C "+config_dir_path;
3038         }
3039
3040         struct stat sb;
3041         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3042         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3043
3044 //      if(libz_exists && !libast_exists){
3045 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
3046 //              exit(1);
3047 //      }
3048
3049 //                      Get set of operator executable files to run
3050         set<string> op_fls;
3051         set<string>::iterator ssi;
3052         for(i=0;i<opviews.size();++i){
3053                 opview_entry *opv = opviews.get_entry(i);
3054                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3055         }
3056
3057         FILE *outfl = fopen("Makefile", "w");
3058         if(outfl==NULL){
3059                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
3060                 exit(0);
3061         }
3062
3063         fputs(
3064 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
3065 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3066 ).c_str(), outfl
3067 );
3068         if(generate_stats)
3069                 fprintf(outfl,"  -DLFTA_STATS");
3070
3071 //              Gather the set of interfaces
3072 //              Also, gather "base interface names" for use in computing
3073 //              the hash splitting to virtual interfaces.
3074 //              TODO : must update to handle machines
3075         set<string> ifaces;
3076         set<string> base_vifaces;       // base interfaces of virtual interfaces
3077         map<string, string> ifmachines;
3078         map<string, string> ifattrs;
3079         for(i=0;i<interface_names.size();++i){
3080                 ifaces.insert(interface_names[i]);
3081                 ifmachines[interface_names[i]] = machine_names[i];
3082
3083                 size_t Xpos = interface_names[i].find_last_of("X");
3084                 if(Xpos!=string::npos){
3085                         string iface = interface_names[i].substr(0,Xpos);
3086                         base_vifaces.insert(iface);
3087                 }
3088                 // get interface attributes and add them to the list
3089         }
3090
3091 //              Do we need to include protobuf libraries?
3092 //              TODO Move to the interface library: get the libraries to include 
3093 //              for an interface type
3094
3095         bool use_proto = false;
3096         bool use_bsa = false;
3097         bool use_kafka = false;
3098         bool use_ssl = false;   
3099         int erri;
3100         string err_str;
3101         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3102                 string ifnm = (*ssi);
3103                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3104                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3105                         if(ift[ift_i]=="PROTO"){
3106 #ifdef PROTO_ENABLED                            
3107                                 use_proto = true;
3108 #else
3109                                 fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
3110                                 exit(0);
3111 #endif
3112                         }
3113                 }
3114                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3115                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3116                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3117 #ifdef BSA_ENABLED                              
3118                                 use_bsa = true;
3119 #else
3120                                 fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
3121                                 exit(0);
3122 #endif                                  
3123                         }
3124                 }
3125                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3126                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3127                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3128 #ifdef KAFKA_ENABLED                            
3129                                 use_kafka = true;
3130 #else
3131                                 fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
3132                                 exit(0);
3133 #endif  
3134                         }
3135                 }
3136                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
3137                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3138                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3139 #ifdef SSL_ENABLED                              
3140                                 use_ssl = true;
3141 #else
3142                                 fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
3143                                 exit(0);
3144 #endif  
3145                         }
3146                 }               
3147         }
3148
3149         fprintf(outfl,
3150 "\n"
3151 "\n"
3152 "all: rts");
3153         for(i=0;i<hfta_names.size();++i)
3154                 fprintf(outfl," %s",hfta_names[i].c_str());
3155         fputs(
3156 ("\n"
3157 "\n"
3158 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
3159 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3160         if(use_pads)
3161                 fprintf(outfl,"-L. ");
3162         fputs(
3163 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3164         if(use_pads)
3165                 fprintf(outfl,"-lgscppads -lpads ");
3166         fprintf(outfl,
3167 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
3168         if(use_pads)
3169                 fprintf(outfl, " -lpz -lz -lbz ");
3170         if(libz_exists && libast_exists)
3171                 fprintf(outfl," -last ");
3172         if(use_pads)
3173                 fprintf(outfl, " -ldll -ldl ");
3174
3175 #ifdef PROTO_ENABLED    
3176         fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3177 #endif
3178 #ifdef BSA_ENABLED      
3179         fprintf(outfl, " -lbsa_stream ");
3180 #endif
3181 #ifdef KAFKA_ENABLED    
3182         fprintf(outfl, " -lrdkafka ");
3183 #endif
3184 #ifdef SSL_ENABLED      
3185         fprintf(outfl, " -lssl -lcrypto ");
3186 #endif
3187         fprintf(outfl," -lgscpaux");
3188 #ifdef GCOV
3189         fprintf(outfl," -fprofile-arcs");
3190 #endif
3191         fprintf(outfl,
3192 "\n"
3193 "\n"
3194 "lfta.o: %s_lfta.c\n"
3195 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3196 "\n"
3197 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3198         for(i=0;i<nfiles;++i)
3199                 fprintf(outfl," %s",input_file_names[i].c_str());
3200         if(hostname == ""){
3201                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3202         }else{
3203                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3204         }
3205         for(i=0;i<nfiles;++i)
3206                 fprintf(outfl," %s",input_file_names[i].c_str());
3207         fprintf(outfl,"\n");
3208
3209         for(i=0;i<hfta_names.size();++i)
3210                 fprintf(outfl,
3211 ("%s: %s.o\n"
3212 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3213 "\n"
3214 "%s.o: %s.cc\n"
3215 "\t$(CPP) -o %s.o -c %s.cc\n"
3216 "\n"
3217 "\n").c_str(),
3218     hfta_names[i].c_str(), hfta_names[i].c_str(),
3219         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3220         hfta_names[i].c_str(), hfta_names[i].c_str(),
3221         hfta_names[i].c_str(), hfta_names[i].c_str()
3222                 );
3223
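//		Remaining rules: symlink packet_schema.txt and external_fcns.def from the
//		config directory, plus a "clean" target that also removes the generated
//		hfta sources.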
3224         fprintf(outfl,
3225 ("\n"
3226 "packet_schema.txt:\n"
3227 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3228 "\n"
3229 "external_fcns.def:\n"
3230 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3231 "\n"
3232 "clean:\n"
3233 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3234         for(i=0;i<hfta_names.size();++i)
3235                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3236         fprintf(outfl,"\n");
3237
3238         fclose(outfl);
3239
3240
3241
3242 //              Gather the set of interfaces
3243 //		TODO : must update to handle machines
3244 //		TODO : lookup interface attributes and add them as parameters to the rts process
3245         outfl = fopen("runit", "w");
3246         if(outfl==NULL){
3247                 fprintf(stderr,"Can't open runit for write, exiting.\n");
3248                 exit(0);
3249         }
3250
3251
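//		runit: restart from a clean state, launch gshub.py and wait for it to
//		write its address to gshub.log, record process groups/pids in gs.pids,
//		then start rts (passing each interface name and its Command parameters),
//		every hfta, and any output view processes against that address.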
3252         fputs(
3253 ("#!/bin/sh\n"
3254 "./stopit\n"
3255 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3256 "sleep 5\n"
3257 "if [ ! -f gshub.log ]\n"
3258 "then\n"
3259 "\techo \"Failed to start bin/gshub.py\"\n"
3260 "\texit -1\n"
3261 "fi\n"
3262 "ADDR=`cat gshub.log`\n"
3263 "ps opgid= $! >> gs.pids\n"
3264 "./rts $ADDR default ").c_str(), outfl);
3265 //      int erri;
3266 //      string err_str;
3267         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3268                 string ifnm = (*ssi);
3269                 // suppress internal _local_ interface
3270                 if (ifnm == "_local_")
3271                         continue;
3272                 fprintf(outfl, "%s ",ifnm.c_str());
3273                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3274                 for(j=0;j<ifv.size();++j)
3275                         fprintf(outfl, "%s ",ifv[j].c_str());
3276         }
3277         fprintf(outfl, " &\n");
3278         fprintf(outfl, "echo $! >> gs.pids\n");
3279         for(i=0;i<hfta_names.size();++i)
3280                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3281
3282         for(j=0;j<opviews.opview_list.size();++j){
3283                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3284         }
3285
3286         fclose(outfl);
3287         system("chmod +x runit");
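//		For illustration only (interface "eth0" and hfta "hfta_0" are
//		hypothetical), the interface and hfta launch lines in runit resemble:
//
//			./rts $ADDR default eth0 <Command args>  &
//			echo $! >> gs.pids
//			./hfta_0 $ADDR default &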
3288
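//		stopit: send SIGTERM to every process group recorded in gs.pids, wait a
//		second, then SIGKILL any survivors and remove the pid file.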
3289         outfl = fopen("stopit", "w");
3290         if(outfl==NULL){
3291                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3292                 exit(0);
3293         }
3294
3295         fprintf(outfl,"#!/bin/sh\n"
3296 "rm -f gshub.log\n"
3297 "if [ ! -f gs.pids ]\n"
3298 "then\n"
3299 "exit\n"
3300 "fi\n"
3301 "for pgid in `cat gs.pids`\n"
3302 "do\n"
3303 "kill -TERM -$pgid\n"
3304 "done\n"
3305 "sleep 1\n"
3306 "for pgid in `cat gs.pids`\n"
3307 "do\n"
3308 "kill -9 -$pgid\n"
3309 "done\n"
3310 "rm gs.pids\n");
3311
3312         fclose(outfl);
3313         system("chmod +x stopit");
3314
3315 //-----------------------------------------------
3316
3317 /* For now disable support for virtual interfaces
3318         outfl = fopen("set_vinterface_hash.bat", "w");
3319         if(outfl==NULL){
3320                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3321                 exit(0);
3322         }
3323
3324 //              The format should be determined by an entry in the ifres.xml file,
3325 //		but for now we hardcode the only example available.
3326         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3327                 if(rts_hload.count((*ssi))){
3328                         string iface_name = (*ssi);
3329                         string iface_number = "";
3330                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3331                                 if(isdigit(iface_name[j])){
3332                                         iface_number = iface_name[j];
3333                                         if(j>0 && isdigit(iface_name[j-1]))
3334                                                 iface_number = iface_name[j-1] + iface_number;
3335                                 }
3336                         }
3337
3338                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3339                         vector<int> halloc = rts_hload[iface_name];
3340                         int prev_limit = 0;
3341                         for(j=0;j<halloc.size();++j){
3342                                 if(j>0)
3343                                         fprintf(outfl,":");
3344                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3345                                 prev_limit = halloc[j];
3346                         }
3347                         fprintf(outfl,"\n");
3348                 }
3349         }
3350         fclose(outfl);
3351         system("chmod +x set_vinterface_hash.bat");
3352 */
3353 }
3354
3355 //              Code for implementing a local schema
3356 /*
3357                 table_list qpSchema;
3358
3359 //                              Load the schemas of any LFTAs.
3360                 int l;
3361                 for(l=0;l<hfta_nbr;++l){
3362                         stream_query *sq0 = split_queries[l];
3363                         table_def *td = sq0->get_output_tabledef();
3364                         qpSchema.append_table(td);
3365                 }
3366 //                              load the schemas of any other ref'd tables.
3367 //                              (e.g., hftas)
3368                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3369                 int ti;
3370                 for(ti=0;ti<input_tbl_names.size();++ti){
3371                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3372                         if(tbl_ref < 0){
3373                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3374                                 if(tbl_ref < 0){
3375                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3376                                         exit(1);
3377                                 }
3378                                 qpSchema.append_table(Schema->get_table(tbl_ref));
3379                         }
3380                 }
3381 */
3382
3383 //              Functions related to parsing.
3384
3385 /*
3386 static int split_string(char *instr,char sep, char **words,int max_words){
3387    char *loc;
3388    char *str;
3389    int nwords = 0;
3390
3391    str = instr;
3392    words[nwords++] = str;
3393    while( (loc = strchr(str,sep)) != NULL){
3394         *loc = '\0';
3395         str = loc+1;
3396         if(nwords >= max_words){
3397                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3398                 nwords = max_words-1;
3399         }
3400         words[nwords++] = str;
3401    }
3402
3403    return(nwords);
3404 }
3405
3406 */
3407