fix int_to_string_map bug; allow blank lines and comments in output_spec.cfg
src/ftacmp/translate_fta.cc (com/gs-lite.git)
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33 #include<glob.h>
34 #include<string.h>
35
36 #include<list>
37
38 //              for the scandir
39      #include <sys/types.h>
40      #include <dirent.h>
41
42
43 #include<errno.h>
44
45 //              to verify that some files exist.
46      #include <sys/types.h>
47      #include <sys/stat.h>
48
49 #include "parse_partn.h"
50
51 #include "print_plan.h"
52
53 //              Interface to the xml parser
54
55 #include"xml_t.h"
56 #include"field_list.h"
57
58 #include "gsconfig.h"
59
60 extern int xmlParserparse(void);
61 extern FILE *xmlParserin;
62 extern int xmlParserdebug;
63
64 std::vector<std::string> xml_attr_vec;
65 std::vector<std::string> xml_val_vec;
66 std::string xml_a, xml_v;
67 xml_t *xml_leaves = NULL;
68
69 //      Interface to the field list verifier
70 field_list *field_verifier = NULL;
71
72 #define TMPSTRLEN 1000
73
74 #ifndef PATH_DELIM
75   #define PATH_DELIM '/'
76 #endif
77
78 char tmp_schema_str[10000];
79
80 // maximum delay between two heartbeats produced
81 // by a UDOP. Used when it's not explicitly
82 // provided in the udop definition
83 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
84
85 //              Default lfta hash table size, must be power of 2.
86 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
87
88 //              Interface to FTA definition lexer and parser ...
89
90 extern int FtaParserparse(void);
91 extern FILE *FtaParserin;
92 extern int FtaParserdebug;
93
94 fta_parse_t *fta_parse_result;
95 var_defs_t *fta_parse_defines;
96
97
98
99 //              Interface to external function lexer and parser ...
100
101 extern int Ext_fcnsParserparse(void);
102 extern FILE *Ext_fcnsParserin;
103 extern int Ext_fcnsParserdebug;
104
105 ext_fcn_list *Ext_fcns;
106
107
108 //              Interface to partition definition parser
109 extern int PartnParserparse();
110 partn_def_list_t *partn_parse_result = NULL;
111
112
113
114 using namespace std;
115 //extern int errno;
116
117
118 //              forward declaration of local utility function
119 void generate_makefile(vector<string> &input_file_names, int nfiles,
120                                            vector<string> &hfta_names, opview_set &opviews,
121                                                 vector<string> &machine_names,
122                                                 string schema_file_name,
123                                                 vector<string> &interface_names,
124                                                 ifq_t *ifdb, string &config_dir_path,
125                                                 bool use_pads,
126                                                 string extra_libs,
127                                                 map<string, vector<int> > &rts_hload
128                                         );
129
130 //static int split_string(char *instr,char sep, char **words,int max_words);
131 #define MAXFLDS 100
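//              Note: split_string() (declared in one of the included utility headers) is used
//              throughout the config-file readers below; it splits a line in place on a separator
//              character and returns the number of fields found, capped here at MAXFLDS.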
132
133   FILE *schema_summary_output = NULL;           // schema summary (schema_summary.txt)
134
135 //                      Dump schema summary
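//                      For each query this emits three lines: the query name, the field
//                      names separated by '|', and the field types separated by '|'.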
136 void dump_summary(stream_query *str){
137         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
138
139         table_def *sch = str->get_output_tabledef();
140
141         vector<field_entry *> flds = sch->get_fields();
142         int f;
143         for(f=0;f<flds.size();++f){
144                 if(f>0) fprintf(schema_summary_output,"|");
145                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
146         }
147         fprintf(schema_summary_output,"\n");
148         for(f=0;f<flds.size();++f){
149                 if(f>0) fprintf(schema_summary_output,"|");
150                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
151         }
152         fprintf(schema_summary_output,"\n");
153 }
154
155 //              Globals
156 string hostname;                // name of current host.
157 int hostname_len;
158 bool generate_stats = false;
159 string root_path = "../..";
160
161
162 int main(int argc, char **argv){
163   char tmpstr[TMPSTRLEN];
164   string err_str;
165   int q,s,h,f;
166
167   set<int>::iterator si;
168
169   vector<string> registration_query_names;                      // for lfta.c registration
170   map<string, vector<int> > mach_query_names;   // list queries of machine
171   vector<int> snap_lengths;                             // for lfta.c registration
172   vector<string> interface_names;                       // for lfta.c registration
173   vector<string> machine_names;                 // machine of interface
174   vector<bool> lfta_reuse_options;                      // for lfta.c registration
175   vector<int> lfta_liveness_timeouts;           // for qtree.xml generation
176   vector<string> hfta_names;                    // hfta source code names, for
177                                                                                 // creating make file.
178   vector<string> qnames;                                // ensure unique names
179   map<string, int> lfta_names;                  // keep track of unique lftas.
180
181
182 //                              set these to 1 to debug the parser
183   FtaParserdebug = 0;
184   Ext_fcnsParserdebug = 0;
185
186   FILE *lfta_out;                               // lfta.c output.
187   FILE *fta_in;                                 // input file
188   FILE *table_schemas_in;               // source tables definition file
189   FILE *query_name_output;              // query names
190   FILE *qtree_output;                   // interconnections of query nodes
191
192   // -------------------------------
193   // Handling of Input Arguments
194   // -------------------------------
195     char optstr[] = "BDpLC:l:HNQMfPSh:n:cR:";   // -f takes no argument (the handler below ignores optarg)
196         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-c] [-R root_path] [schema_file] input_file [input file ...]\n"
197                 "\t[-B] : debug only (don't create output files)\n"
198                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
199                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
200                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
201                 "\t[-C] : use <config directory> for definition files\n"
202                 "\t[-l] : use <library directory> for library queries\n"
203                 "\t[-N] : output query names in query_names.txt\n"
204                 "\t[-H] : create HFTA only (no schema_file)\n"
205                 "\t[-Q] : use query name for hfta suffix\n"
206                 "\t[-M] : generate make file and runit, stopit scripts\n"
207                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
208                 "\t[-f] : Output schema summary to schema_summary.txt\n"
209                 "\t[-P] : link with PADS\n"
210                 "\t[-h] : override host name.\n"
211                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
212                 "\t[-R] : path to root of GS-lite\n"
213 ;
214
215 //              parameters gathered from command line processing
216         string external_fcns_path;
217 //      string internal_fcn_path;
218         string config_dir_path;
219         string library_path = "./";
220         vector<string> input_file_names;
221         string schema_file_name;
222         bool debug_only = false;
223         bool hfta_only = false;
224         bool output_query_names = false;
225         bool output_schema_summary=false;
226         bool numeric_hfta_flname = true;
227         bool create_makefile = false;
228         bool distributed_mode = false;
229         bool partitioned_mode = false;
230         bool use_live_hosts_file = false;
231         bool use_pads = false;
232         bool clean_make = false;
233         int n_virtual_interfaces = 1;
234
235    int chopt;                           // getopt() returns int; storing it in a char breaks the != -1 test where char is unsigned
236    while((chopt = getopt(argc,argv,optstr)) != -1){
237                 switch(chopt){
238                 case 'B':
239                         debug_only = true;
240                         break;
241                 case 'D':
242                         distributed_mode = true;
243                         break;
244                 case 'p':
245                         partitioned_mode = true;
246                         break;
247                 case 'L':
248                         use_live_hosts_file = true;
249                         break;
250                 case 'C':
251                                 if(optarg != NULL)
252                                  config_dir_path = string(optarg) + string("/");
253                         break;
254                 case 'l':
255                                 if(optarg != NULL)
256                                  library_path = string(optarg) + string("/");
257                         break;
258                 case 'N':
259                         output_query_names = true;
260                         break;
261                 case 'Q':
262                         numeric_hfta_flname = false;
263                         break;
264                 case 'H':
265                         if(schema_file_name == ""){
266                                 hfta_only = true;
267                         }
268                         break;
269                 case 'f':
270                         output_schema_summary=true;
271                         break;
272                 case 'M':
273                         create_makefile=true;
274                         break;
275                 case 'S':
276                         generate_stats=true;
277                         break;
278                 case 'P':
279                         use_pads = true;
280                         break;
281                 case 'c':
282                         clean_make = true;
283                         break;
284                 case 'h':
285                         if(optarg != NULL)
286                                 hostname = optarg;
287                         break;
288                 case 'R':
289                         if(optarg != NULL)
290                                 root_path = optarg;
291                         break;
292                 case 'n':
293                         if(optarg != NULL){
294                                 n_virtual_interfaces = atoi(optarg);
295                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
296                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
297                                         n_virtual_interfaces = 1;
298                                 }
299                         }
300                         break;
301                 case '?':
302                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
303                         fprintf(stderr,"%s\n", usage_str);
304                         exit(1);
305                 default:
306                         fprintf(stderr, "Argument was %c\n", optopt);
307                         fprintf(stderr,"Invalid arguments\n");
308                         fprintf(stderr,"%s\n", usage_str);
309                         exit(1);
310                 }
311         }
312         argc -= optind;
313         argv += optind;
314         for (int i = 0; i < argc; ++i) {
315                 if((schema_file_name == "") && !hfta_only){
316                         schema_file_name = argv[i];
317                 }else{
318                         input_file_names.push_back(argv[i]);
319                 }
320         }
321
322         if(input_file_names.size() == 0){
323                 fprintf(stderr,"%s\n", usage_str);
324                 exit(1);
325         }
326
327         if(clean_make){
328                 string clean_cmd = "rm Makefile hfta_*.cc";
329                 int clean_ret = system(clean_cmd.c_str());
330                 if(clean_ret){
331                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
332                 }
333         }
334
335
336         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
337
338 //                      Open globally used file names.
339
340         // prepend config directory to schema file
341         schema_file_name = config_dir_path + schema_file_name;
342         external_fcns_path = config_dir_path + string("external_fcns.def");
343     string ifx_fname = config_dir_path + string("ifres.xml");
344
345 //              Find interface query file(s).
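//              The interface query file is <config_dir>/<hostname>.ifq, or
//              <config_dir>/cluster.ifq when running in distributed mode (-D).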
346         if(hostname == ""){
347                 gethostname(tmpstr,TMPSTRLEN);
348                 hostname = tmpstr;
349         }
350         hostname_len = hostname.length();       // tmpstr is unset when -h supplies the host name, so measure hostname itself
351     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
352         vector<string> ifq_fls;
353
354                 ifq_fls.push_back(ifq_fname);
355
356
357 //                      Get the field list, if it exists
358         string flist_fl = config_dir_path + "field_list.xml";
359         FILE *flf_in = NULL;
360         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
361                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
362                 xml_leaves = new xml_t();
363                 xmlParser_setfileinput(flf_in);
364                 if(xmlParserparse()){
365                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
366                 }else{
367                         field_verifier = new field_list(xml_leaves);
368                 }
369         }
370
371         if(!hfta_only){
372           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
373                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
374                 exit(1);
375           }
376         }
377
378 /*
379         if(!(debug_only || hfta_only)){
380           if((lfta_out = fopen("lfta.c","w")) == NULL){
381                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
382                 exit(1);
383           }
384         }
385 */
386
387 //              Get the output specification file.
388 //              format is
389 //                      query, operator, operator_param, directory, bucketwidth, partitioning_fields, n_partitions
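//              One comma-separated entry per line; blank lines and lines starting with '#'
//              are ignored (see the parsing loop below).  A hypothetical entry, with a
//              made-up operator name and parameter, would look like:
//                      example_query,file,out.txt,./output_dir,300,,1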
390         string ospec_fl = "output_spec.cfg";
391         FILE *osp_in = NULL;
392         vector<ospec_str *> output_specs;
393         multimap<string, int> qname_to_ospec;
394         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
395                 char *flds[MAXFLDS];
396                 int o_lineno = 0;
397                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
398                         o_lineno++;
399                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
400                         if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
401                                 if(nflds == 7){
402 //              make operator type lowercase
403                                         char *tmpc;
404                                         for(tmpc=flds[1];*tmpc!='\0';++tmpc)
405                                                 *tmpc = tolower(*tmpc);
406         
407                                         ospec_str *tmp_ospec = new ospec_str();
408                                         tmp_ospec->query = flds[0];
409                                         tmp_ospec->operator_type = flds[1];
410                                         tmp_ospec->operator_param = flds[2];
411                                         tmp_ospec->output_directory = flds[3];
412                                         tmp_ospec->bucketwidth = atoi(flds[4]);
413                                         tmp_ospec->partitioning_flds = flds[5];
414                                         tmp_ospec->n_partitions = atoi(flds[6]);
415                                         qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
416                                         output_specs.push_back(tmp_ospec);
417                                 }else{
418                                         fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
419                                 }
420                         }
421                 }
422                 fclose(osp_in);
423         }else{
424                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
425                 exit(1);
426         }
427
428 //              hfta parallelism
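//              Each line of hfta_parallelism.cfg is "<hfta name>,<parallelism>"; the
//              parallelism must lie between 1 and n_virtual_interfaces and must divide
//              it evenly (both conditions are checked below).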
429         string pspec_fl = "hfta_parallelism.cfg";
430         FILE *psp_in = NULL;
431         map<string, int> hfta_parallelism;
432         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
433                 char *flds[MAXFLDS];
434                 int o_lineno = 0;
435                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
436                         bool good_entry = true;
437                         o_lineno++;
438                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
439                         if(nflds == 2){
440                                 string hname = flds[0];
441                                 int par = atoi(flds[1]);
442                                 if(par <= 0 || par > n_virtual_interfaces){
443                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
444                                         good_entry = false;
445                                 }
446                                 if(good_entry && n_virtual_interfaces % par != 0){
447                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
448                                         good_entry = false;
449                                 }
450                                 if(good_entry)
451                                         hfta_parallelism[hname] = par;
452                         }
453                 }
454         }else{
455                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
456         }
457
458
459 //              LFTA hash table sizes
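//              Each line of lfta_htsize.cfg is "<lfta name>,<hash table size>"; a positive
//              size is recorded here, presumably to override DEFAULT_LFTA_HASH_TABLE_SIZE
//              for that lfta during code generation.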
460         string htspec_fl = "lfta_htsize.cfg";
461         FILE *htsp_in = NULL;
462         map<string, int> lfta_htsize;
463         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
464                 char *flds[MAXFLDS];
465                 int o_lineno = 0;
466                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
467                         bool good_entry = true;
468                         o_lineno++;
469                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
470                         if(nflds == 2){
471                                 string lfta_name = flds[0];
472                                 int htsz = atoi(flds[1]);
473                                 if(htsz>0){
474                                         lfta_htsize[lfta_name] = htsz;
475                                 }else{
476                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
477                                 }
478                         }
479                 }
480         }else{
481                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
482         }
483
484 //              LFTA virtual interface hash split
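//              Each line of rts_load.cfg is "<interface name>,<w1>,<w2>,..." with one positive
//              weight per virtual interface; the weights are accumulated into a cumulative
//              distribution (rts_hload) used to split the hash space across the interfaces.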
485         string rtlspec_fl = "rts_load.cfg";
486         FILE *rtl_in = NULL;
487         map<string, vector<int> > rts_hload;
488         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
489                 char *flds[MAXFLDS];
490                 int r_lineno = 0;
491                 string iface_name;
492                 vector<int> hload;
493                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
494                         bool good_entry = true;
495                         r_lineno++;
496                         iface_name = "";
497                         hload.clear();
498                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
499                         if(nflds >1){
500                                 iface_name = flds[0];
501                                 int cumm_h = 0;
502                                 int j;
503                                 for(j=1;j<nflds;++j){
504                                         int h = atoi(flds[j]);
505                                         if(h<=0)
506                                                 good_entry = false;
507                                         cumm_h += h;
508                                         hload.push_back(cumm_h);
509                                 }
510                         }else{
511                                 good_entry = false;
512                         }
513                         if(good_entry){
514                                 rts_hload[iface_name] = hload;
515                         }else{
516                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
517                         }
518                 }
519         }
520
521
522
523         if(output_query_names){
524           if((query_name_output = fopen("query_names.txt","w")) == NULL){
525                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
526                 exit(1);
527           }
528         }
529
530         if(output_schema_summary){
531           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
532                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
533                 exit(1);
534           }
535         }
536
537         if((qtree_output = fopen("qtree.xml","w")) == NULL){
538                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
539                 exit(1);
540         }
541         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
542         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
543         fprintf(qtree_output,"<QueryNodes>\n");
544
545
546 //                      Get an initial Schema
547         table_list *Schema;
548         if(!hfta_only){
549 //                      Parse the table schema definitions.
550           fta_parse_result = new fta_parse_t();
551           FtaParser_setfileinput(table_schemas_in);
552           if(FtaParserparse()){
553                 fprintf(stderr,"Table schema parse failed.\n");
554                 exit(1);
555           }
556           if(fta_parse_result->parse_type != TABLE_PARSE){
557                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
558                 exit(1);
559           }
560           Schema = fta_parse_result->tables;
561
562 //                      Ensure that all schema_ids, if set, are distinct.
563 //  Obsolete? There is code elsewhere to ensure that schema IDs are
564 //  distinct on a per-interface basis.
565 /*
566           set<int> found_ids;
567           set<int> dup_ids;
568           for(int t=0;t<Schema->size();++t){
569                 int sch_id = Schema->get_table(t)->get_schema_id();
570                 if(sch_id> 0){
571                         if(found_ids.find(sch_id) != found_ids.end()){
572                                 dup_ids.insert(sch_id);
573                         }else{
574                                 found_ids.insert(sch_id);
575                         }
576                 }
577                 if(dup_ids.size()>0){
578                         fprintf(stderr, "Error, the schema has duplicate schema_ids:");
579                         for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
580                                 fprintf(stderr," %d",(*dit));
581                         fprintf(stderr,"\n");
582                         exit(1);
583                 }
584           }
585 */
586
587
588 //                      Process schema field inheritance
589           int retval;
590           retval = Schema->unroll_tables(err_str);
591           if(retval){
592                 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
593                 exit(1);
594           }
595         }else{
596 //                      hfta only => we will try to fetch schemas from the registry.
597 //                      therefore, start off with an empty schema.
598           Schema = new table_list();
599         }
600
601
602 //                      Open and parse the external functions file.
603         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
604         if(Ext_fcnsParserin == NULL){
605                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
606                 Ext_fcns = new ext_fcn_list();
607         }else{
608                 if(Ext_fcnsParserparse()){
609                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
610                         Ext_fcns = new ext_fcn_list();
611                 }
612         }
613         if(Ext_fcns->validate_fcns(err_str)){
614                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
615                 exit(1);
616         }
617
618 //              Open and parse the interface resources file.
619 //      ifq_t *ifaces_db = new ifq_t();
620 //   string ierr;
621 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
622 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
623 //                              ifx_fname.c_str(), ierr.c_str());
624 //              exit(1);
625 //      }
626 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
627 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
628 //                              ifq_fname.c_str(), ierr.c_str());
629 //              exit(1);
630 //      }
631
632
633 //                      The LFTA code string.
634 //                      Put the standard preamble here.
635 //                      NOTE: the hash macros, fcns should go into the run time
636   map<string, string> lfta_val;
637   map<string, string> lfta_prefilter_val;
638
639   string lfta_header =
640 "#include <limits.h>\n"
641 "#include \"rts.h\"\n"
642 "#include \"fta.h\"\n"
643 "#include \"lapp.h\"\n"
644 "#include \"rts_udaf.h\"\n"
645 "#include<stdio.h>\n"
646 "#include <float.h>\n"
647 "#include \"rdtsc.h\"\n"
648 "#include \"watchlist.h\"\n\n"
649
650 ;
651 // Get any locally defined parsing headers
652     glob_t glob_result;
653     memset(&glob_result, 0, sizeof(glob_result));
654
655     // do the glob operation TODO should be from GSROOT
656     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
657         if(return_value == 0){
658                 lfta_header += "\n";
659         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
660                         char *flds[1000];
661                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
662                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
663         }
664                 lfta_header += "\n";
665         }else{
666                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
667         }
668
669 /*
670 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
671 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
672 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
673 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
674 */
675
676         lfta_header += 
677 "\n"
678 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
679 "\n"
680 "#define SLOT_FILLED 0x04\n"
681 "#define SLOT_GEN_BITS 0x03\n"
682 "#define SLOT_HASH_BITS 0xfffffff8\n"
683 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
684 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
685 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
686 "\n\n"
687
688 "#define lfta_BOOL_to_hash(x) (x)\n"
689 "#define lfta_USHORT_to_hash(x) (x)\n"
690 "#define lfta_UINT_to_hash(x) (x)\n"
691 "#define lfta_IP_to_hash(x) (x)\n"
692 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
693 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
694 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
695 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
696 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
697 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
698 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
699 "       for(i=0;i<x.length;++i){\n"
700 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
701 "               if((i%4) == 3){\n"
702 "                       ret ^= tmp_sum;\n"
703 "                       tmp_sum = 0;\n"
704 "               }\n"
705 "       }\n"
706 "       if((i%4)!=0) ret ^=tmp_sum;\n"
707 "       return(ret);\n"
708 "}\n\n\n";
709
710
711
712 //////////////////////////////////////////////////////////////////
713 /////                   Get all of the query parse trees
714
715
716   int i,p;
717   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
718
719 //---------------------------
720 //              Global info needed for post processing.
721
722 //                      Set of operator views ref'd in the query set.
723         opview_set opviews;
724 //                      lftas on a per-machine basis.
725         map<string, vector<stream_query *> > lfta_mach_lists;
726         int nfiles = input_file_names.size();
727         vector<stream_query *> hfta_list;               // list of hftas.
728         map<string, stream_query *> sq_map;             // map from query name to stream query.
729
730
731 //////////////////////////////////////////
732
733 //              Open and parse the interface resources file.
734         ifq_t *ifaces_db = new ifq_t();
735     string ierr;
736         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
737                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
738                                 ifx_fname.c_str(), ierr.c_str());
739                 exit(1);
740         }
741         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
742                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
743                                 ifq_fls[0].c_str(), ierr.c_str());
744                 exit(1);
745         }
746
747   map<string, string> qname_to_flname;  // for detecting duplicate query names
748
749
750
751 //                      Parse the files to create a vector of parse trees.
752 //                      Load qnodes with information to perform a topo sort
753 //                      based on query dependencies.
754   vector<query_node *> qnodes;                          // for topo sort.
755   map<string,int> name_node_map;                        // map query name to qnodes entry
756   for(i=0;i<input_file_names.size();i++){
757
758           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
759                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
760                   continue;
761           }
762 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
763
764 //                      Parse the FTA query
765           fta_parse_result = new fta_parse_t();
766           FtaParser_setfileinput(fta_in);
767           if(FtaParserparse()){
768                 fprintf(stderr,"FTA parse failed.\n");
769                 exit(1);
770           }
771           if(fta_parse_result->parse_type != QUERY_PARSE){
772                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
773                 exit(1);
774           }
775
776 //                      returns a list of parse trees
777           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
778           for(p=0;p<qlist.size();++p){
779             table_exp_t *fta_parse_tree = qlist[p];
780 //              query_parse_trees.push_back(fta_parse_tree);
781
782 //                      compute the default name -- extract from query name
783                 strcpy(tmpstr,input_file_names[i].c_str());
784                 char *qname = strrchr(tmpstr,PATH_DELIM);
785                 if(qname == NULL)
786                         qname = tmpstr;
787                 else
788                         qname++;
789                 char *qname_end = strchr(qname,'.');
790                 if(qname_end != NULL) *qname_end = '\0';
791                 string qname_str = qname;
792                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
793
794 //                      Determine visibility.  Should I be attaching all of the output methods?
795                 if(qname_to_ospec.count(imputed_qname)>0)
796                         fta_parse_tree->set_visible(true);
797                 else
798                         fta_parse_tree->set_visible(false);
799
800
801 //                              Create a manipulable representation of the parse tree.
802 //                              The qnode inherits the visibility assigned to the parse tree.
803             int pos = qnodes.size();
804                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
805                 name_node_map[ qnodes[pos]->name ] = pos;
806 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
807 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
808 //              qfiles.push_back(i);
809
810 //                      Check for duplicate query names
811 //                                      NOTE : in hfta-only generation, I should
812 //                                      also check with the names of the registered queries.
813                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
814                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
815                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
816                         exit(1);
817                 }
818                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
819                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
820                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
821                         exit(1);
822                 }
823                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
824
825
826         }
827   }
828
829 //              Add the library queries
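//              A referenced table name containing a '/' (e.g. FROM lib/foo) that matches no
//              parsed query is treated as a library query and loaded from
//              <library_path>/<reference>.gsql; queries it depends on within that file are
//              pulled in transitively via the worklist below.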
830
831   int pos;
832   for(pos=0;pos<qnodes.size();++pos){
833         int fi;
834         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
835                 string src_tbl = qnodes[pos]->refd_tbls[fi];
836                 if(qname_to_flname.count(src_tbl) == 0){
837                         int last_sep = src_tbl.find_last_of('/');
838                         if(last_sep != string::npos){
839 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
840                                 string target_qname = src_tbl.substr(last_sep+1);
841                                 string qpathname = library_path + src_tbl + ".gsql";
842                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
843                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
844                                         exit(1);
845                                         fprintf(stderr,"After exit\n");
846                                 }
847 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
848 //                      Parse the FTA query
849                                 fta_parse_result = new fta_parse_t();
850                                 FtaParser_setfileinput(fta_in);
851                                 if(FtaParserparse()){
852                                         fprintf(stderr,"FTA parse failed.\n");
853                                         exit(1);
854                                 }
855                                 if(fta_parse_result->parse_type != QUERY_PARSE){
856                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
857                                         exit(1);
858                                 }
859
860                                 map<string, int> local_query_map;
861                                 vector<string> local_query_names;
862                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
863                                 for(p=0;p<qlist.size();++p){
864                                 table_exp_t *fta_parse_tree = qlist[p];
865                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
866                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
867                                         if(imputed_qname == target_qname)
868                                                 imputed_qname = src_tbl;
869                                         if(local_query_map.count(imputed_qname)>0){
870                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
871                                                 exit(1);
872                                         }
873                                         local_query_map[ imputed_qname ] = p;
874                                         local_query_names.push_back(imputed_qname);
875                                 }
876
877                                 if(local_query_map.count(src_tbl)==0){
878                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
879                                         exit(1);
880                                 }
881
882                                 vector<int> worklist;
883                                 set<int> added_queries;
884                                 vector<query_node *> new_qnodes;
885                                 worklist.push_back(local_query_map[target_qname]);
886                                 added_queries.insert(local_query_map[target_qname]);
887                                 int qq;
888                                 int qpos = qnodes.size();
889                                 for(qq=0;qq<worklist.size();++qq){
890                                         int q_id = worklist[qq];
891                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
892                                         new_qnodes.push_back( new_qnode);
893                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
894                                         int ff;
895                                         for(ff = 0;ff<refd_tbls.size();++ff){
896                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
897
898                                                         if(name_node_map.count(refd_tbls[ff])>0){
899                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
900                                                                 exit(1);
901                                                         }else{
902                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
903                                                         }
904                                                 }
905                                         }
906                                 }
907
908                                 for(qq=0;qq<new_qnodes.size();++qq){
909                                         int qpos = qnodes.size();
910                                         qnodes.push_back(new_qnodes[qq]);
911                                         name_node_map[qnodes[qpos]->name ] = qpos;
912                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
913                                 }
914                         }
915                 }
916         }
917   }
918
919
920
921
922
923
924
925
926 //---------------------------------------
927
928
929 //              Add the UDOPS.
930
931   string udop_missing_sources;
932   for(i=0;i<qnodes.size();++i){
933         int fi;
934         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
935                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
936                 if(sid >= 0){
937                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
938                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
939                                 int pos = qnodes.size();
940                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
941                                         name_node_map[ qnodes[pos]->name ] = pos;
942                                         qnodes[pos]->is_externally_visible = false;   // not visible yet; UDOPs referenced by visible queries are marked visible later
943         //                                      Need to mark the source queries as visible.
944                                         int si;
945                                         string missing_sources = "";
946                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
947                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
948                                                 if(name_node_map.count(src_tbl)==0){
949                                                         missing_sources += src_tbl + " ";
950                                                 }
951                                         }
952                                         if(missing_sources != ""){
953                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
954                                         }
955                                 }
956                         }
957                 }
958         }
959   }
960   if(udop_missing_sources != ""){
961         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
962         exit(1);
963   }
964
965
966
967 ////////////////////////////////////////////////////////////////////
968 ///                             Check parse trees to verify that some
969 ///                             global properties are met :
970 ///                             if q1 reads from q2, then
971 ///                               q2 is processed before q1
972 ///                               q1 can supply q2's parameters
973 ///                             Verify there is no cycle in the reads-from graph.
974
975 //                      Compute an order in which to process the
976 //                      queries.
977
978 //                      Start by building the reads-from lists.
979 //
980
981   for(i=0;i<qnodes.size();++i){
982         int qi, fi;
983         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
984         for(fi = 0;fi<refd_tbls.size();++fi){
985                 if(name_node_map.count(refd_tbls[fi])>0){
986 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
987                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
988                 }
989         }
990   }
991
992
993 //              If one query reads the result of another,
994 //              check for parameter compatibility.  Currently it must
995 //              be an exact match.  I will move to requiring
996 //              containment after re-ordering, but will require
997 //              some analysis for code generation which is not
998 //              yet in place.
999 //printf("There are %d query nodes.\n",qnodes.size());
1000
1001
1002   for(i=0;i<qnodes.size();++i){
1003         vector<var_pair_t *> target_params  = qnodes[i]->params;
1004         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1005                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
1006                 if(target_params.size() != source_params.size()){
1007                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1008                         exit(1);
1009                 }
1010                 int p;
1011                 for(p=0;p<target_params.size();++p){
1012                         if(! (target_params[p]->name == source_params[p]->name &&
1013                               target_params[p]->val == source_params[p]->val ) ){
1014                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1015                                 exit(1);
1016                         }
1017                 }
1018         }
1019   }
1020
1021
1022 //              Find the roots.
1023 //              Start by counting inedges.
1024   for(i=0;i<qnodes.size();++i){
1025         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1026                 qnodes[(*si)]->n_consumers++;
1027         }
1028   }
1029
1030 //              The roots are the nodes with indegree zero.
1031   set<int> roots;
1032   for(i=0;i<qnodes.size();++i){
1033         if(qnodes[i]->n_consumers == 0){
1034                 if(qnodes[i]->is_externally_visible == false){
1035                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1036                 }
1037                 roots.insert(i);
1038         }
1039   }
1040
1041 //              Remove the parts of the subtree that produce no output.
1042   set<int> valid_roots;
1043   set<int> discarded_nodes;
1044   set<int> candidates;
1045   while(roots.size() >0){
1046         for(si=roots.begin();si!=roots.end();++si){
1047                 if(qnodes[(*si)]->is_externally_visible){
1048                         valid_roots.insert((*si));
1049                 }else{
1050                         discarded_nodes.insert((*si));
1051                         set<int>::iterator sir;
1052                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1053                                 qnodes[(*sir)]->n_consumers--;
1054                                 if(qnodes[(*sir)]->n_consumers == 0)
1055                                         candidates.insert( (*sir));
1056                         }
1057                 }
1058         }
1059         roots = candidates;
1060         candidates.clear();
1061   }
1062   roots = valid_roots;
1063   if(discarded_nodes.size()>0){
1064         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1065         int di = 0;
1066         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1067                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1068                 di++;
1069                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1070         }
1071         fprintf(stderr,"\n");
1072   }
1073
1074 //              Compute the sources_to set, ignoring discarded nodes.
1075   for(i=0;i<qnodes.size();++i){
1076         if(discarded_nodes.count(i)==0)
1077                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1078                         qnodes[(*si)]->sources_to.insert(i);
1079         }
1080   }
1081
1082
1083 //              Find the nodes that are shared by multiple visible subtrees.
1084 //              The roots become inferred visible nodes.
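//              Making such shared nodes visible turns them into hfta roots, so their output
//              is computed once and shared rather than recomputed inside each consuming
//              subtree (hfta output sharing happens only at roots, never interior nodes).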
1085
1086 //              Find the visible nodes.
1087         vector<int> visible_nodes;
1088         for(i=0;i<qnodes.size();i++){
1089                 if(qnodes[i]->is_externally_visible){
1090                         visible_nodes.push_back(i);
1091                 }
1092         }
1093
1094 //              Find UDOPs referenced by visible nodes.
1095   list<int> workq;
1096   for(i=0;i<visible_nodes.size();++i){
1097         workq.push_back(visible_nodes[i]);
1098   }
1099   while(!workq.empty()){
1100         int node = workq.front();
1101         workq.pop_front();
1102         set<int>::iterator children;
1103         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1104                 qnodes[node]->is_externally_visible = true;
1105                 visible_nodes.push_back(node);
1106                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1107                         if(qnodes[(*children)]->is_externally_visible == false){
1108                                 qnodes[(*children)]->is_externally_visible = true;
1109                                 visible_nodes.push_back((*children));
1110                         }
1111                 }
1112         }
1113         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1114                 workq.push_back((*children));
1115         }
1116   }
1117
1118         bool done = false;
1119         while(!done){
1120 //      reset the nodes
1121                 for(i=0;i<qnodes.size();i++){
1122                         qnodes[i]->subtree_roots.clear();
1123                 }
1124
1125 //              Walk the tree defined by a visible node, not descending into
1126 //              subtrees rooted by a visible node.  Mark the node visited with
1127 //              the visible node ID.
1128                 for(i=0;i<visible_nodes.size();++i){
1129                         set<int> vroots;
1130                         vroots.insert(visible_nodes[i]);
1131                         while(vroots.size()>0){
1132                                 for(si=vroots.begin();si!=vroots.end();++si){
1133                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1134
1135                                         set<int>::iterator sir;
1136                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1137                                                 if(! qnodes[(*sir)]->is_externally_visible){
1138                                                         candidates.insert( (*sir));
1139                                                 }
1140                                         }
1141                                 }
1142                                 vroots = candidates;
1143                                 candidates.clear();
1144                         }
1145                 }
1146 //              Find the nodes in multiple visible node subtrees, but with no parent
1147 //              that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
1148                 done = true;    // until proven otherwise
1149                 for(i=0;i<qnodes.size();i++){
1150                         if(qnodes[i]->subtree_roots.size()>1){
1151                                 bool is_new_root = true;
1152                                 set<int>::iterator sir;
1153                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1154                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1155                                                 is_new_root = false;
1156                                 }
1157                                 if(is_new_root){
1158                                         qnodes[i]->is_externally_visible = true;
1159                                         qnodes[i]->inferred_visible_node = true;
1160                                         visible_nodes.push_back(i);
1161                                         done = false;
1162                                 }
1163                         }
1164                 }
1165         }
1166
1167
1168
1169
1170
1171 //              get visible nodes in topo ordering.
1172 //  for(i=0;i<qnodes.size();i++){
1173 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1174 //  }
1175   vector<int> process_order;
1176   while(roots.size() >0){
1177         for(si=roots.begin();si!=roots.end();++si){
1178                 if(discarded_nodes.count((*si))==0){
1179                         process_order.push_back( (*si) );
1180                 }
1181                 set<int>::iterator sir;
1182                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1183                         qnodes[(*sir)]->n_consumers--;
1184                         if(qnodes[(*sir)]->n_consumers == 0)
1185                                 candidates.insert( (*sir));
1186                 }
1187         }
1188         roots = candidates;
1189         candidates.clear();
1190   }
1191
1192
1193 //printf("process_order.size() =%d\n",process_order.size());
1194
1195 //              Search for cyclic dependencies
1196   string found_dep;
1197   for(i=0;i<qnodes.size();++i){
1198         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1199                 if(found_dep.size() != 0) found_dep += ", ";
1200                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1201         }
1202   }
1203   if(found_dep.size()>0){
1204         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1205         exit(1);
1206   }
1207
1208 //              Get a list of query sets, in the order to be processed.
1209 //              Start at visible root and do bfs.
1210 //              The query set includes queries referenced indirectly,
1211 //              as sources for user-defined operators.  These are needed
1212 //              to ensure that they are added to the schema, but are not part
1213 //              of the query tree.
1214
1215 //              stream_node_sets contains queries reachable only through the
1216 //              FROM clause, so I can tell which queries to add to the stream
1217 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1218
1219 //                      NOTE: this code works because in order for data to be
1220 //                      read by multiple hftas, the node must be externally visible.
1221 //                      But visible nodes define roots of process sets.
1222 //                      Internally visible nodes can feed data only
1223 //                      to other nodes in the same query file.
1224 //                      Therefore, any access can be restricted to a file;
1225 //                      hfta output sharing is done only on roots,
1226 //                      never on interior nodes.
1227
1228
1229
1230
1231 //              Compute the base collection of hftas.
1232   vector<hfta_node *> hfta_sets;
1233   map<string, int> hfta_name_map;
1234 //  vector< vector<int> > process_sets;
1235 //  vector< set<int> > stream_node_sets;
1236   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.
1237                                                                                                                 // i.e. process leaves 1st.
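//              Each externally visible node becomes the root of one hfta; a BFS over its
//              reads_from edges pulls every non-visible node it depends on into the same
//              hfta (proc_list), and the list is then reversed so the hfta's query nodes
//              are stored sources-first.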
1238   for(i=0;i<process_order.size();++i){
1239         if(qnodes[process_order[i]]->is_externally_visible == true){
1240 //printf("Visible.\n");
1241                 int root = process_order[i];
1242                 hfta_node *hnode = new hfta_node();
1243                 hnode->name = qnodes[root]-> name;
1244                 hnode->source_name = qnodes[root]-> name;
1245                 hnode->is_udop = qnodes[root]->is_udop;
1246                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1247
1248                 vector<int> proc_list;  proc_list.push_back(root);
1249 //                      Ensure that nodes are added only once.
1250                 set<int> proc_set;      proc_set.insert(root);
1251                 roots.clear();                  roots.insert(root);
1252                 candidates.clear();
1253                 while(roots.size()>0){
1254                         for(si=roots.begin();si!=roots.end();++si){
1255 //printf("Processing root %d\n",(*si));
1256                                 set<int>::iterator sir;
1257                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1258 //printf("reads fom %d\n",(*sir));
1259                                         if(qnodes[(*sir)]->is_externally_visible==false){
1260                                                 candidates.insert( (*sir) );
1261                                                 if(proc_set.count( (*sir) )==0){
1262                                                         proc_set.insert( (*sir) );
1263                                                         proc_list.push_back( (*sir) );
1264                                                 }
1265                                         }
1266                                 }
1267                         }
1268                         roots = candidates;
1269                         candidates.clear();
1270                 }
1271
1272                 reverse(proc_list.begin(), proc_list.end());
1273                 hnode->query_node_indices = proc_list;
1274                 hfta_name_map[hnode->name] = hfta_sets.size();
1275                 hfta_sets.push_back(hnode);
1276         }
1277   }
1278
1279 //              Compute the reads_from / sources_to graphs for the hftas.
1280
1281   for(i=0;i<hfta_sets.size();++i){
1282         hfta_node *hnode = hfta_sets[i];
1283         for(q=0;q<hnode->query_node_indices.size();q++){
1284                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1285                 for(s=0;s<qnode->refd_tbls.size();++s){
1286                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1287                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1288                                 hnode->reads_from.insert(other_hfta);
1289                                 hfta_sets[other_hfta]->sources_to.insert(i);
1290                         }
1291                 }
1292         }
1293   }
1294
1295 //              Compute a topological sort of the hfta_sets.
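//              Again a Kahn-style sort, but over whole hftas: an hfta enters the work
//              queue once every hfta that reads from it has been emitted, so
//              hfta_topsort lists consumers before the hftas they read from.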
1296
1297   vector<int> hfta_topsort;
1298   workq.clear();
1299   int hnode_srcs[hfta_sets.size()];
1300   for(i=0;i<hfta_sets.size();++i){
1301         hnode_srcs[i] = 0;
1302         if(hfta_sets[i]->sources_to.size() == 0)
1303                 workq.push_back(i);
1304   }
1305
1306   while(! workq.empty()){
1307         int     node = workq.front();
1308         workq.pop_front();
1309         hfta_topsort.push_back(node);
1310         set<int>::iterator stsi;
1311         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1312                 int parent = (*stsi);
1313                 hnode_srcs[parent]++;
1314                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1315                         workq.push_back(parent);
1316                 }
1317         }
1318   }
1319
1320 //              Decorate hfta nodes with the level of parallelism given as input.
1321
1322   map<string, int>::iterator msii;
1323   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1324         string hfta_name = (*msii).first;
1325         int par = (*msii).second;
1326         if(hfta_name_map.count(hfta_name) > 0){
1327                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1328         }else{
1329                 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but it's not an hfta.\n",hfta_name.c_str());
1330         }
1331   }
1332
1333 //              Propagate levels of parallelism: children should have a level of parallelism
1334 //              as large as any of their parents.  Adjust children upwards to compensate.
1335 //              Start at parents and adjust children, auto-propagation will occur.
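//              (hfta_sets was built leaves-first, so walking it backwards visits each
//              consumer before the hftas it reads from; a single pass therefore
//              propagates the maximum parallelism all the way down a chain.)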
1336
1337   for(i=hfta_sets.size()-1;i>=0;i--){
1338         set<int>::iterator stsi;
1339         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1340                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1341                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1342                 }
1343         }
1344   }
1345
1346 //              Before all the name mangling, check if there are any output_spec.cfg
1347 //              or hfta_parallelism.cfg entries that do not have a matching query.
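//              Both .cfg files are keyed by query name; an entry that doesn't resolve to
//              an hfta here is most likely a misspelled or since-removed query.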
1348
1349         string dangling_ospecs = "";
1350         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1351                 string oq = (*msii).first;
1352                 if(hfta_name_map.count(oq) == 0){
1353                         dangling_ospecs += " "+(*msii).first;
1354                 }
1355         }
1356         if(dangling_ospecs!=""){
1357                 fprintf(stderr,"ERROR, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1358                 exit(1);
1359         }
1360
1361         string dangling_par = "";
1362         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1363                 string oq = (*msii).first;
1364                 if(hfta_name_map.count(oq) == 0){
1365                         dangling_par += " "+(*msii).first;
1366                 }
1367         }
1368         if(dangling_par!=""){
1369                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1370         }
1371
1372
1373
1374 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1375 //              FROM clauses: retarget any name which is an internal node, and
1376 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes
1377 //              when the source hfta has more parallelism than the target node.
1378 //              Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
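//              Example of the mangling (illustrative): an hfta named "foo" with
//              n_parallel == 3 is replaced by foo__copy0, foo__copy1 and foo__copy2,
//              and every query node inside it gets the same __copyX suffix.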
1379
1380
1381   int n_original_hfta_sets = hfta_sets.size();
1382   for(i=0;i<n_original_hfta_sets;++i){
1383         if(hfta_sets[i]->n_parallel > 1){
1384                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1385                 set<string> local_nodes;                // names of query nodes in the hfta.
1386                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1387                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1388                 }
1389
1390                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1391                         string mangler = "__copy"+int_to_string(p);
1392                         hfta_node *par_hfta  = new hfta_node();
1393                         par_hfta->name = hfta_sets[i]->name + mangler;
1394                         par_hfta->source_name = hfta_sets[i]->name;
1395                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1396                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1397                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1398                         par_hfta->parallel_idx = p;
1399
1400                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1401
1402 //      Is it a UDOP?
1403                         if(hfta_sets[i]->is_udop){
1404                                 int root = hfta_sets[i]->query_node_indices[0];
1405
1406                                 string unequal_par_sources;
1407                                 set<int>::iterator rfsii;
1408                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1409                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1410                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1411                                         }
1412                                 }
1413                                 if(unequal_par_sources != ""){
1414                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1415                                         exit(1);
1416                                 }
1417
1418                                 int rti;
1419                                 vector<string> new_sources;
1420                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1421                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1422                                 }
1423
1424                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1425                                 new_qn->name += mangler;
1426                                 new_qn->mangler = mangler;
1427                                 new_qn->refd_tbls = new_sources;
1428                                 par_hfta->query_node_indices.push_back(qnodes.size());
1429                                 par_qnode_map[new_qn->name] = qnodes.size();
1430                                 name_node_map[ new_qn->name ] = qnodes.size();
1431                                 qnodes.push_back(new_qn);
1432                         }else{
1433 //              regular query node
1434                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1435                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1436                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1437 //                                      rehome the from clause on mangled names.
1438 //                                      create merge nodes as needed for external sources.
1439                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1440                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1441                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1442                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1443 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
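//                      Illustrative example: if this copy has n_parallel == 2 and the source
//                      hfta has n_parallel == 6, then stride == 3 and copy p merges the
//                      source's copies 3p .. 3p+2 through a generated __merge node.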
1444                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1445                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1446                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1447                                                 }else{
1448                                                         vector<string> src_tbls;
1449                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1450                                                         if(stride == 0){
1451                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1452                                                                 exit(1);
1453                                                         }
1454                                                         for(s=0;s<stride;++s){
1455                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1456                                                                 src_tbls.push_back(ext_src_name);
1457                                                         }
1458                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1459                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1460                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1461 //                                      Make a qnode to represent the new merge node
1462                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1463                                                         qn_pt->refd_tbls = src_tbls;
1464                                                         qn_pt->is_udop  = false;
1465                                                         qn_pt->is_externally_visible = false;
1466                                                         qn_pt->inferred_visible_node  = false;
1467                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1468                                                         par_qnode_map[merge_node_name] = qnodes.size();
1469                                                         name_node_map[ merge_node_name ] = qnodes.size();
1470                                                         qnodes.push_back(qn_pt);
1471                                                 }
1472                                         }
1473                                 }
1474                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1475                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1476                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1477                                 }
1478                                 new_qn->params = qnodes[hqn_idx]->params;
1479                                 new_qn->is_udop = false;
1480                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1481                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1482                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1483                                 par_qnode_map[new_qn->name] = qnodes.size();
1484                                 name_node_map[ new_qn->name ] = qnodes.size();
1485                                 qnodes.push_back(new_qn);
1486                           }
1487                         }
1488                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1489                         hfta_sets.push_back(par_hfta);
1490                 }
1491         }else{
1492 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1493 //              hfta sources.
1494                 if(!hfta_sets[i]->is_udop){
1495                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1496                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1497                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1498                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1499 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1500                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1501                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1502                                                 vector<string> src_tbls;
1503                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1504                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1505                                                         src_tbls.push_back(ext_src_name);
1506                                                 }
1507                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1508                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1509                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1510 //                                      Make a qnode to represent the new merge node
1511                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1512                                                 qn_pt->refd_tbls = src_tbls;
1513                                                 qn_pt->is_udop  = false;
1514                                                 qn_pt->is_externally_visible = false;
1515                                                 qn_pt->inferred_visible_node  = false;
1516                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1517                                                 name_node_map[ merge_node_name ] = qnodes.size();
1518                                                 qnodes.push_back(qn_pt);
1519                                         }
1520                                 }
1521                         }
1522                 }
1523           }
1524         }
1525   }
1526
1527 //                      Rebuild the reads_from / sources_to lists in the qnodes
1528   for(q=0;q<qnodes.size();++q){
1529         qnodes[q]->reads_from.clear();
1530         qnodes[q]->sources_to.clear();
1531   }
1532   for(q=0;q<qnodes.size();++q){
1533         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1534                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1535                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1536                         qnodes[q]->reads_from.insert(rf);
1537                         qnodes[rf]->sources_to.insert(q);
1538                 }
1539         }
1540   }
1541
1542 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1543   for(q=0;q<hfta_sets.size();++q){
1544         hfta_sets[q]->reads_from.clear();
1545         hfta_sets[q]->sources_to.clear();
1546   }
1547   for(q=0;q<hfta_sets.size();++q){
1548         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1549                 int node = hfta_sets[q]->query_node_indices[s];
1550                 set<int>::iterator rfsii;
1551                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1552                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1553                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1554                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1555                         }
1556                 }
1557         }
1558   }
1559
1560 /*
1561 for(q=0;q<qnodes.size();++q){
1562  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1563  set<int>::iterator rsii;
1564  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1565   printf(" %d",(*rsii));
1566   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1567  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1568   printf(" %d",(*rsii));
1569  printf("\n");
1570 }
1571
1572 for(q=0;q<hfta_sets.size();++q){
1573  if(hfta_sets[q]->do_generation==false)
1574         continue;
1575  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1576  set<int>::iterator rsii;
1577  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1578   printf(" %d",(*rsii));
1579   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1580  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1581   printf(" %d",(*rsii));
1582  printf("\n");
1583 }
1584 */
1585
1586
1587
1588 //              Re-topo sort the hftas
1589   hfta_topsort.clear();
1590   workq.clear();
1591   int hnode_srcs_2[hfta_sets.size()];
1592   for(i=0;i<hfta_sets.size();++i){
1593         hnode_srcs_2[i] = 0;
1594         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1595                 workq.push_back(i);
1596         }
1597   }
1598
1599   while(workq.empty() == false){
1600         int     node = workq.front();
1601         workq.pop_front();
1602         hfta_topsort.push_back(node);
1603         set<int>::iterator stsii;
1604         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1605                 int child = (*stsii);
1606                 hnode_srcs_2[child]++;
1607                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1608                         workq.push_back(child);
1609                 }
1610         }
1611   }
1612
1613 //              Ensure that all of the query_node_indices in hfta_sets are topologically
1614 //              sorted; don't rely on the assumption that every transform preserves some kind of order.
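//              The per-hfta sort below only counts dependencies internal to the hfta:
//              reads_from edges to nodes outside query_node_indices are treated as
//              already satisfied, so external sources never block the work queue.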
1615   for(i=0;i<hfta_sets.size();++i){
1616         if(hfta_sets[i]->do_generation){
1617                 map<int,int> n_accounted;
1618                 vector<int> new_order;
1619                 workq.clear();
1620                 vector<int>::iterator vii;
1621                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1622                         n_accounted[(*vii)]= 0;
1623                 }
1624                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1625                         set<int>::iterator rfsii;
1626                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1627                                 if(n_accounted.count((*rfsii)) == 0){
1628                                         n_accounted[(*vii)]++;
1629                                 }
1630                         }
1631                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1632                                 workq.push_back((*vii));
1633                         }
1634                 }
1635
1636                 while(workq.empty() == false){
1637                         int node = workq.front();
1638                         workq.pop_front();
1639                         new_order.push_back(node);
1640                         set<int>::iterator stsii;
1641                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1642                                 if(n_accounted.count((*stsii))){
1643                                         n_accounted[(*stsii)]++;
1644                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1645                                                 workq.push_back((*stsii));
1646                                         }
1647                                 }
1648                         }
1649                 }
1650                 hfta_sets[i]->query_node_indices = new_order;
1651         }
1652   }
1653
1654
1655
1656
1657
1658 ///                     Global checking is done, start the analysis and translation
1659 ///                     of the query parse tree in the order specified by process_order
1660
1661
1662 //                      Get a list of the LFTAs for global lfta optimization
1663 //                              TODO: separate building operators from splitting lftas,
1664 //                                      that will make optimizations such as predicate pushing easier.
1665         vector<stream_query *> lfta_list;
1666         stream_query *rootq;
1667     int qi,qj;
1668
1669         map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
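        // (schema_of_schemaid[interface][schema_id] records which schema first claimed a
        // schema id on that interface, so a collision can be reported when a second,
        // different schema shows up with the same id.)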
1670
1671         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1672
1673         int hfta_id = hfta_topsort[qi];
1674     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1675
1676
1677
1678 //              Two possibilities: either it's a UDOP, or it's a collection of queries.
1679 //      if(qnodes[curr_list.back()]->is_udop)
1680         if(hfta_sets[hfta_id]->is_udop){
1681                 int node_id = curr_list.back();
1682                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1683                 opview_entry *opv = new opview_entry();
1684
1685 //                      Many of the UDOP properties aren't currently used.
1686                 opv->parent_qname = "no_parent";
1687                 opv->root_name = qnodes[node_id]->name;
1688                 opv->view_name = qnodes[node_id]->file;
1689                 opv->pos = qi;
1690                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1691                 opv->udop_alias = tmpstr;
1692                 opv->mangler = qnodes[node_id]->mangler;
1693
1694                 if(opv->mangler != ""){
1695                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1696                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1697                 }
1698
1699 //                      This piece of code makes each hfta which refers to the same udop
1700 //                      reference a distinct running udop.  Do this at query optimization time?
1701 //              fmtbl->set_udop_alias(opv->udop_alias);
1702
1703                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1704                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1705
1706                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1707                 int s,f,q;
1708                 for(s=0;s<subq.size();++s){
1709 //                              Validate that the fields match.
1710                         subquery_spec *sqs = subq[s];
1711                         string subq_name = sqs->name + opv->mangler;
1712                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1713                         if(flds.size() == 0){
1714                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1715                                 return(1);
1716                         }
1717                         if(flds.size() < sqs->types.size()){
1718                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1719                                 return(1);
1720                         }
1721                         bool failed = false;
1722                         for(f=0;f<sqs->types.size();++f){
1723                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1724                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1725                                 if(! dte.subsumes_type(&dtf) ){
1726                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1727                                         failed = true;
1728                                 }
1729 /*
1730                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1731                                         string pstr = dte.get_temporal_string();
1732                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1733                                         failed = true;
1734                                 }
1735 */
1736                         }
1737                         if(failed)
1738                                 return(1);
1739 ///                             Validation done, find the subquery, make a copy of the
1740 ///                             parse tree, and add it to the return list.
1741                         for(q=0;q<qnodes.size();++q)
1742                                 if(qnodes[q]->name == subq_name)
1743                                         break;
1744                         if(q==qnodes.size()){
1745                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1746                                 return(1);
1747                         }
1748
1749                 }
1750
1751 //                      Cross-link the FROM-clause entries of all queries this UDOP sources to.
1752                 set<int>::iterator sii;
1753                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1754 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1755                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1756                         int ii;
1757                         for(ii=0;ii<tblvars.size();++ii){
1758                                 if(tblvars[ii]->schema_name == opv->root_name){
1759                                         tblvars[ii]->set_opview_idx(opviews.size());
1760                                 }
1761
1762                         }
1763                 }
1764
1765                 opviews.append(opv);
1766         }else{
1767
1768 //                      Analyze the parse trees in this query,
1769 //                      put them in rootq
1770 //      vector<int> curr_list = process_sets[qi];
1771
1772
1773 ////////////////////////////////////////
1774
1775           rootq = NULL;
1776 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1777           for(qj=0;qj<curr_list.size();++qj){
1778                 i = curr_list[qj];
1779         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1780
1781 //                      Select the current query parse tree
1782                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1783
1784 //                      if hfta only, try to fetch any missing schemas
1785 //                      from the registry (using the print_schema program).
1786 //                      Here I use a hack to avoid analyzing the query -- all referenced
1787 //                      tables must be in the from clause.
1788 //                      If there is a problem loading any table, just issue a warning.
1789 //
1790                 tablevar_list_t *fm = fta_parse_tree->get_from();
1791                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1792 //                      iterate over all referenced tables
1793                 int t;
1794                 for(t=0;t<refd_tbls.size();++t){
1795                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1796
1797                   if(tbl_ref < 0){      // if this table is not in the Schema
1798
1799                         if(hfta_only){
1800                                 string cmd="print_schema "+refd_tbls[t];
1801                                 FILE *schema_in = popen(cmd.c_str(), "r");
1802                                 if(schema_in == NULL){
1803                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1804                                 }else{
1805                                   string schema_instr;
1806                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1807                                         schema_instr += tmpstr;
1808                                   }
1809                           fta_parse_result = new fta_parse_t();
1810                                   strcpy(tmp_schema_str,schema_instr.c_str());
1811                                   FtaParser_setstringinput(tmp_schema_str);
1812                           if(FtaParserparse()){
1813                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1814                           }else{
1815                                         if( fta_parse_result->tables != NULL){
1816                                                 int tl;
1817                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1818                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1819                                                 }
1820                                         }else{
1821                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1822                                         }
1823                                 }
1824                         }
1825                   }else{
1826                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1827                                 exit(1);
1828                   }
1829
1830                 }
1831           }
1832
1833
1834 //                              Analyze the query.
1835           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1836           if(qs == NULL){
1837                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1838                 exit(1);
1839           }
1840
1841           stream_query new_sq(qs, Schema);
1842           if(new_sq.error_code){
1843                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1844                         exit(1);
1845           }
1846
1847 //                      Add it to the Schema
1848           table_def *output_td = new_sq.get_output_tabledef();
1849           Schema->add_table(output_td);
1850
1851 //                      Create a query plan from the analyzed parse tree.
1852 //                      If it's a query referenced via FROM, add it to the stream query.
1853           if(rootq){
1854                 rootq->add_query(new_sq);
1855           }else{
1856                 rootq = new stream_query(new_sq);
1857 //                      have the stream query object inherit properties from the analyzed
1858 //                      hfta_node object.
1859                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1860                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1861           }
1862
1863
1864     }
1865
1866 //              This stream query has all its parts
1867 //              Build and optimize it.
1868 //printf("translate_fta: generating plan.\n");
1869         if(rootq->generate_plan(Schema)){
1870                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1871                 continue;
1872         }
1873
1874 //      We've found the query plan head, so now add the output operators
1875         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1876                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1877                 multimap<string, int>::iterator mmsi;
1878                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1879                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1880                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1881                 }
1882         }
1883
1884
1885
1886 //                              Perform query splitting if necessary.
1887         bool hfta_returned;
1888     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1889
1890         int l;
1891 //for(l=0;l<split_queries.size();++l){
1892 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1893 //}
1894
1895
1896
1897
1898     if(split_queries.size() > 0){       // should be at least one component.
1899
1900 //                              Compute the number of LFTAs.
1901           int n_lfta = split_queries.size();
1902           if(hfta_returned) n_lfta--;
1903 //                              Check if a schemaId constraint needs to be inserted.
1904
1905 //                              Process the LFTA components.
1906           for(l=0;l<n_lfta;++l){
1907            if(lfta_names.count(split_queries[l]->query_name) == 0){
1908 //                              Grab the lfta for global optimization.
1909                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1910                 string liface = "_local_";
1911 //              string lmach = "";
1912                 string lmach = hostname;
1913                 if(tvec.size()>0){
1914                         liface = tvec[0]->get_interface();      // iface queries have been resolved
1915                         if(tvec[0]->get_machine() != ""){
1916                                 lmach = tvec[0]->get_machine();
1917                         }else{
1918                                 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n",  split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
1919                         }
1920                 } // else{
1921                         interface_names.push_back(liface);
1922                         machine_names.push_back(lmach);
1923 //              }
1924
1925                 vector<predicate_t *> schemaid_preds;
1926                 for(int irv=0;irv<tvec.size();++irv){
1927
1928                         string schema_name = tvec[irv]->get_schema_name();
1929                         string rvar_name = tvec[irv]->get_var_name();
1930                         int schema_ref = tvec[irv]->get_schema_ref();
1931                         if (lmach == "")
1932                                 lmach = hostname;
1933 //                      interface_names.push_back(liface);
1934 //                      machine_names.push_back(lmach);
1935
1936 //printf("Machine is %s\n",lmach.c_str());
1937
1938 //                              Check if a schemaId constraint needs to be inserted.
1939                         if(schema_ref<0){ // can result from some kinds of splits
1940                                 schema_ref = Schema->get_table_ref(schema_name);
1941                         }
1942                         int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
1943                         int errnum = 0;
1944                         string if_error;
1945                         iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1946                         if(iface==NULL){
1947                                 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1948                                 exit(1);
1949                         }       
1950
1951
1952                         if(tvec[irv]->get_interface() != "_local_"){
1953                          if(iface->has_multiple_schemas()){
1954                                 if(schema_id<0){        // invalid schema_id
1955                                         fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1956                                         exit(1);
1957                                 }
1958                                 vector<string> iface_schemas = iface->get_property("Schemas");
1959                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1960                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1961                                         exit(1);
1962                                 }
1963 // Ensure that in liface, schema_id is used for only one schema
1964                                 if(schema_of_schemaid.count(liface)==0){
1965                                         map<int, string> empty_map;
1966                                         schema_of_schemaid[liface] = empty_map;
1967                                 }
1968                                 if(schema_of_schemaid[liface].count(schema_id)==0){
1969                                         schema_of_schemaid[liface][schema_id] = schema_name;
1970                                 }else{
1971                                         if(schema_of_schemaid[liface][schema_id] != schema_name){
1972                                                 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1973                                                 exit(1);
1974                                         }
1975                                 }
1976                          }else{ // single-schema interface
1977                                 schema_id = -1; // don't generate schema_id predicate
1978                                 vector<string> iface_schemas = iface->get_property("Schemas");
1979                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1980                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1981                                         exit(1);
1982                                 }
1983                                 if(iface_schemas.size()>1){
1984                                         fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1985                                         exit(1);
1986                                 }
1987                          }                      
1988                         }else{
1989                                 schema_id = -1;
1990                         }
1991
1992 // If we need to check the schema_id, insert a predicate into the lfta.
1993 //       TODO not just schema_id, the full all_schema_ids set.
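// The block below builds, in effect, the predicate "<rvar>.schemaId = <schema_id>"
// as a single one-comparison cnf element and appends it to the lfta's WHERE clause;
// for filter/watchlist joins the same element is also recorded in the per-table
// predicate lists (pred_t0 / pred_t1).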
1994                         if(schema_id>=0){
1995                                 colref_t *schid_cr = new colref_t("schemaId");
1996                                 schid_cr->schema_ref = schema_ref;
1997                                 schid_cr->table_name = rvar_name;
1998                                 schid_cr->tablevar_ref = 0;
1999                                 schid_cr->default_table = false;
2000                                 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
2001                                 data_type *schid_dt = new data_type("uint");
2002                                 schid_se->dt = schid_dt;
2003
2004                                 string schid_str = int_to_string(schema_id);
2005                                 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2006                                 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2007                                 lit_se->dt = schid_dt;
2008
2009                                 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2010                                 vector<cnf_elem *> clist;
2011                                 make_cnf_from_pr(schid_pr, clist);
2012                                 analyze_cnf(clist[0]);
2013                                 clist[0]->cost = 1;     // cheap one comparison
2014 // cnf built, now insert it.
2015                                 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2016
2017 // Specialized processing 
2018 //              filter join, get two schemaid preds
2019                                 string node_type = split_queries[l]->query_plan[0]->node_type();
2020                                 if(node_type == "filter_join"){
2021                                         filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2022                                         if(irv==0){
2023                                                 fj->pred_t0.push_back(clist[0]);
2024                                         }else{
2025                                                 fj->pred_t1.push_back(clist[0]);
2026                                         }
2027                                         schemaid_preds.push_back(schid_pr);
2028                                 }
2029 //              watchlist join, get the first schemaid pred
2030                                 if(node_type == "watch_join"){
2031                                         watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2032                                         if(irv==0){
2033                                                 fj->pred_t0.push_back(clist[0]);
2034                                                 schemaid_preds.push_back(schid_pr);
2035                                         }
2036                                 }
2037                         }
2038                 }
2039 // Specialized processing, currently filter join.
2040                 if(schemaid_preds.size()>1){
2041                         string node_type = split_queries[l]->query_plan[0]->node_type();
2042                         if(node_type == "filter_join"){
2043                                 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2044                                 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2045                                 vector<cnf_elem *> clist;
2046                                 make_cnf_from_pr(filter_pr, clist);
2047                                 analyze_cnf(clist[0]);
2048                                 clist[0]->cost = 1;     // cheap one comparison
2049                                 fj->shared_pred.push_back(clist[0]);
2050                         }
2051                 }
2052                         
2053
2054
2055
2056
2057
2058
2059 //                      Set the ht size from the recommendation, if there is one in the rec file
2060                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2061                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2062                 }
2063
2064
2065                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2066                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
2067                 lfta_list.push_back(split_queries[l]);
2068                 lfta_mach_lists[lmach].push_back(split_queries[l]);
2069
2070 //                      The following is a hack,
2071 //                      as I should be generating LFTA code through
2072 //                      the stream_query object.
2073
2074                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2075
2076 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2077
2078 /*
2079 //                              Create query description to embed in lfta.c
2080                 string lfta_schema_str = split_queries[l]->make_schema();
2081                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2082
2083 //                              get NIC capabilities.
2084                 int erri;
2085                 nic_property *nicprop = NULL;
2086                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2087                 if(iface_codegen_type.size()){
2088                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2089                         if(!nicprop){
2090                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2091                                         exit(1);
2092                         }
2093                 }
2094
2095                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2096 */
2097
2098                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2099
2100 // TODO NOTE : I'd like it to be the case that registration_query_names
2101 //      are the queries to be registered for subscription.
2102 //      but there is some complex bookkeeping here.
2103                 registration_query_names.push_back(split_queries[l]->query_name);
2104                 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2105 //                      NOTE: I will assume a 1-1 correspondence between
2106 //                      mach_query_names[lmach] and lfta_mach_lists[lmach],
2107 //                      where mach_query_names[lmach][i] contains the index into
2108 //                      registration_query_names, which names the lfta, and
2109 //                      lfta_mach_lists[lmach][i] is the stream_query * of the
2110 //                      corresponding lfta.
2111 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2112
2113
2114
2115                 // check if lfta is reusable
2116                 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
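                // (A reusable lfta can presumably be shared by multiple subscribers instead of
                // being instantiated once per registration; a non-parameterized lfta produces the
                // same stream for everyone, which is presumably why an empty parameter table also
                // marks it reusable.)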
2117
2118                 bool lfta_reusable = false;
2119                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2120                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2121                         lfta_reusable = true;
2122                 }
2123                 lfta_reuse_options.push_back(lfta_reusable);
2124
2125                 // The LFTA inherits the liveness timeout specification from the containing query.
2126                 // This is too conservative, as lftas are expected to spend less time per tuple
2127                 // than the full query.
2128
2129                 // extract liveness timeout from query definition
2130                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2131                 if (!liveness_timeout) {
2132 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2133 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2134                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2135                 }
2136                 lfta_liveness_timeouts.push_back(liveness_timeout);
2137
2138 //                      Add it to the schema
2139                 table_def *td = split_queries[l]->get_output_tabledef();
2140                 Schema->append_table(td);
2141 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2142
2143           }
2144          }
2145
2146 //                              If the output is lfta-only, dump out the query name.
2147       if(split_queries.size() == 1 && !hfta_returned){
2148         if(output_query_names ){
2149            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2150                 }
2151 /*
2152 else{
2153            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2154                 }
2155 */
2156
2157 /*
2158 //                              output schema summary
2159                 if(output_schema_summary){
2160                         dump_summary(split_queries[0]);
2161                 }
2162 */
2163       }
2164
2165
2166           if(hfta_returned){            // query also has an HFTA component
2167                 int hfta_nbr = split_queries.size()-1;
2168
2169                         hfta_list.push_back(split_queries[hfta_nbr]);
2170
2171 //                                      report on generated query names
2172         if(output_query_names){
2173                         string hfta_name =split_queries[hfta_nbr]->query_name;
2174                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2175                         for(l=0;l<hfta_nbr;++l){
2176                                 string lfta_name =split_queries[l]->query_name;
2177                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2178                         }
2179                 }
2180 //              else{
2181 //              fprintf(stderr,"query names are ");
2182 //                      for(l=0;l<hfta_nbr;++l){
2183 //                              if(l>0) fprintf(stderr,",");
2184 //                              string fta_name =split_queries[l]->query_name;
2185 //                      fprintf(stderr," %s",fta_name.c_str());
2186 //                      }
2187 //                      fprintf(stderr,"\n");
2188 //              }
2189           }
2190
2191   }else{
2192           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2193           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2194           exit(1);
2195   }
2196  }
2197 }
2198
2199
2200 //-----------------------------------------------------------------
2201 //              Compute and propagate the SEs which compute the PROTOCOL fields.
2202 //-----------------------------------------------------------------
2203
2204 for(i=0;i<lfta_list.size();i++){
2205         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2206         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2207 }
2208 for(i=0;i<hfta_list.size();i++){
2209         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2210         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2211 }
2212
2213
2214
2215 //------------------------------------------------------------------------
2216 //              Perform  individual FTA optimizations
2217 //-----------------------------------------------------------------------
2218
2219 if (partitioned_mode) {
2220
2221         // open partition definition file
2222         string part_fname = config_dir_path + "partition.txt";
2223
2224         FILE* partfd = fopen(part_fname.c_str(), "r");
2225         if (!partfd) {
2226                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2227                 exit(1);
2228         }
2229         PartnParser_setfileinput(partfd);
2230         if (PartnParserparse()) {
2231                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2232                 exit(1);
2233         }
2234         fclose(partfd);
2235 }
2236
2237
2238 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2239
2240 int num_hfta = hfta_list.size();
2241 for(i=0; i < hfta_list.size(); ++i){
2242         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2243 }
2244
2245 //                      Add all new hftas to schema
2246 for(i=num_hfta; i < hfta_list.size(); ++i){
2247                 table_def *td = hfta_list[i]->get_output_tabledef();
2248                 Schema->append_table(td);
2249 }
2250
2251 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2252
2253
2254
2255 //------------------------------------------------------------------------
2256 //              Do global (cross-fta) optimization
2257 //-----------------------------------------------------------------------
2258
2259
2260
2261
2262
2263
2264 set<string> extra_external_libs;
2265
2266 for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
2267
2268                 if(! debug_only){
2269 //                                      build hfta file name, create output
2270            if(numeric_hfta_flname){
2271             sprintf(tmpstr,"hfta_%d",hfta_count);
2272                         hfta_names.push_back(tmpstr);
2273             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2274          }else{
2275             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2276                         hfta_names.push_back(tmpstr);
2277             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2278           }
2279                   FILE *hfta_fl = fopen(tmpstr,"w");
2280                   if(hfta_fl == NULL){
2281                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2282                         exit(1);
2283                   }
2284                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2285
2286 //                      If there is a field verifier, warn about
2287 //                      lack of compatibility
2288 //                              NOTE : this code assumes that visible non-lfta queries
2289 //                              are those at the root of a stream query.
2290                   string hfta_comment;
2291                   string hfta_title;
2292                   string hfta_namespace;
2293                   if(hfta_list[i]->defines.count("comment")>0)
2294                         hfta_comment = hfta_list[i]->defines["comment"];
2295                   if(hfta_list[i]->defines.count("Comment")>0)
2296                         hfta_comment = hfta_list[i]->defines["Comment"];
2297                   if(hfta_list[i]->defines.count("COMMENT")>0)
2298                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2299                   if(hfta_list[i]->defines.count("title")>0)
2300                         hfta_title = hfta_list[i]->defines["title"];
2301                   if(hfta_list[i]->defines.count("Title")>0)
2302                         hfta_title = hfta_list[i]->defines["Title"];
2303                   if(hfta_list[i]->defines.count("TITLE")>0)
2304                         hfta_title = hfta_list[i]->defines["TITLE"];
2305                   if(hfta_list[i]->defines.count("namespace")>0)
2306                         hfta_namespace = hfta_list[i]->defines["namespace"];
2307                   if(hfta_list[i]->defines.count("Namespace")>0)
2308                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2309                   if(hfta_list[i]->defines.count("NAMESPACE")>0)
2310                         hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2311
2312                   if(field_verifier != NULL){
2313                         string warning_str;
2314                         if(hfta_comment == "")
2315                                 warning_str += "\tcomment not found.\n";
2316
2317 // Obsolete stuff that Carsten wanted
2318 //                      if(hfta_title == "")
2319 //                              warning_str += "\ttitle not found.\n";
2320 //                      if(hfta_namespace == "")
2321 //                              warning_str += "\tnamespace not found.\n";
2322
2323 //      There is a get_tbl_keys method implemented for qp_nodes,
2324 //      integrate it into stream_query, then call it to find keys,
2325 //      and annotate fields with their key-ness.
2326 //      If there is a "keys" property in the defines block, override anything returned
2327 //      from the automated analysis.
2328
2329                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2330                         int fi;
2331                         for(fi=0;fi<flds.size();fi++){
2332                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2333                         }
2334                         if(warning_str != "")
2335                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2336                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2337                   }
2338
2339 // Get the fields in this query
2340                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2341
2342 // do key processing
2343                   string hfta_keys_s = "";
2344                   if(hfta_list[i]->defines.count("keys")>0)
2345                         hfta_keys_s = hfta_list[i]->defines["keys"];
2346                   if(hfta_list[i]->defines.count("Keys")>0)
2347                         hfta_keys_s = hfta_list[i]->defines["Keys"];
2348                   if(hfta_list[i]->defines.count("KEYS")>0)
2349                         hfta_keys_s = hfta_list[i]->defines["KEYS"];
2350                   string xtra_keys_s = "";
2351                   if(hfta_list[i]->defines.count("extra_keys")>0)
2352                         xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2353                   if(hfta_list[i]->defines.count("Extra_Keys")>0)
2354                         xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2355                   if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2356                         xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2357 // get the keys
2358                   vector<string> hfta_keys;
2359                   vector<string> partial_keys;
2360                   vector<string> xtra_keys;
2361                   if(hfta_keys_s==""){
2362                                 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2363                                 if(xtra_keys_s.size()>0){
2364                                         xtra_keys = split_string(xtra_keys_s, ',');
2365                                 }
2366                                 for(int xi=0;xi<xtra_keys.size();++xi){
2367                                         if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2368                                                 hfta_keys.push_back(xtra_keys[xi]);
2369                                         }
2370                                 }
2371                   }else{
2372                                 hfta_keys = split_string(hfta_keys_s, ',');
2373                   }
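//			Illustration (hypothetical defines entries): a block containing
//				keys 'srcIP,dstIP'; extra_keys 'time';
//			yields hfta_keys = {"srcIP","dstIP"}.  When no keys entry exists,
//			the keys are inferred with get_tbl_keys() and any extra_keys values
//			are appended if not already present.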
2374 // validate that all of the keys exist in the output.
2375 //      (exit on error, as it's a bad specification)
2376                   vector<string> missing_keys;
2377                   for(int ki=0;ki<hfta_keys.size(); ++ki){
2378                         int fi;
2379                         for(fi=0;fi<flds.size();++fi){
2380                                 if(hfta_keys[ki] == flds[fi]->get_name())
2381                                         break;
2382                         }
2383                         if(fi==flds.size())
2384                                 missing_keys.push_back(hfta_keys[ki]);
2385                   }
2386                   if(missing_keys.size()>0){
2387                         fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2388                         for(int hi=0; hi<missing_keys.size(); ++hi){
2389                                 fprintf(stderr," %s", missing_keys[hi].c_str());
2390                         }
2391                         fprintf(stderr,"\n");
2392                         exit(1);
2393                   }
2394
2395                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2396                   if(hfta_comment != "")
2397                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2398                   if(hfta_title != "")
2399                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2400                   if(hfta_namespace != "")
2401                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2402                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2403                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2404
2405 //                              write info about fields to qtree.xml
2406                   int fi;
2407                   for(fi=0;fi<flds.size();fi++){
2408                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2409                         if(flds[fi]->get_modifier_list()->size()){
2410                                 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2411                         }
2412                         fprintf(qtree_output," />\n");
2413                   }
2414 // info about keys
2415                   for(int hi=0;hi<hfta_keys.size();++hi){
2416                         fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2417                   }
2418                   for(int hi=0;hi<partial_keys.size();++hi){
2419                         fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2420                   }
2421                   for(int hi=0;hi<xtra_keys.size();++hi){
2422                         fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2423                   }
2424
2425
2426                   // extract liveness timeout from query definition
2427                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2428                   if (!liveness_timeout) {
2429 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2430 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2431                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2432                   }
2433                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2434
2435                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2436                   int itv;
2437                   for(itv=0;itv<tmp_tv.size();++itv){
2438                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2439                   }
2440                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2441                   if(ifrs != ""){
2442                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2443                   }
2444                   fprintf(qtree_output,"\t</HFTA>\n");
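//			For reference, the qtree.xml fragment written above looks roughly like
//			the following (names and values are hypothetical):
//
//				<HFTA name='flow_summary'>
//					<Description value='per-flow counters' />
//					<FileName value='hfta_0.cc' />
//					<Rate value='100' />
//					<Field name='srcIP' pos='0' type='IP' />
//					<Key value='srcIP'/>
//					<LivenessTimeout value='5' />
//					<ReadsFrom value='pkt_events' />
//				</HFTA>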
2445
2446                   fclose(hfta_fl);
2447                 }else{
2448 //                                      debug only -- do code generation to catch generation-time errors.
2449                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2450                 }
2451
2452                 hfta_count++;   // for hfta file names with numeric suffixes
2453
2454                 hfta_list[i]->get_external_libs(extra_external_libs);
2455
2456           }
2457
2458 string ext_lib_string;
2459 set<string>::iterator ssi_el;
2460 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2461         ext_lib_string += (*ssi_el)+" ";
2462
2463
2464
2465 //                      Report on the set of operator views
2466   for(i=0;i<opviews.size();++i){
2467         opview_entry *opve = opviews.get_entry(i);
2468         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2469         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2470         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2471         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2472         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2473
2474         if (!opve->liveness_timeout) {
2475 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2476 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2477                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2478         }
2479         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2480     int j;
2481         for(j=0;j<opve->subq_names.size();j++)
2482                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2483         fprintf(qtree_output,"\t</UDOP>\n");
2484   }
2485
2486
2487 //-----------------------------------------------------------------
2488
2489 //                      Create interface-specific meta code files.
2490 //                              first, open and parse the interface resources file.
2491         ifaces_db = new ifq_t();
2492     ierr = "";
2493         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2494                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2495                                 ifx_fname.c_str(), ierr.c_str());
2496                 exit(1);
2497         }
2498
2499         map<string, vector<stream_query *> >::iterator svsi;
2500         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2501                 string lmach = (*svsi).first;
2502
2503         //              For this machine, create a set of lftas per interface.
2504                 vector<stream_query *> mach_lftas = (*svsi).second;
2505                 map<string, vector<stream_query *> > lfta_iface_lists;
2506                 int li;
2507                 for(li=0;li<mach_lftas.size();++li){
2508                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2509                         string lfta_iface = "_local_";
2510                         if(tvec.size()>0){
2511                                 lfta_iface = tvec[0]->get_interface();  // assign the outer lfta_iface; the previous declaration shadowed it
2512                         }
2513                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2514                 }
2515
2516                 map<string, vector<stream_query *> >::iterator lsvsi;
2517                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2518                         int erri;
2519                         string liface = (*lsvsi).first;
2520                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2521                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2522                         if(iface_codegen_type.size()){
2523                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2524                                 if(!nicprop){
2525                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2526                                         exit(1);
2527                                 }
2528                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2529                                 string mcf_flnm;
2530                                 if(lmach != "")
2531                                   mcf_flnm = lmach + "_"+liface+".mcf";
2532                                 else
2533                                   mcf_flnm = hostname + "_"+liface+".mcf";
2534                                 FILE *mcf_fl ;
2535                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2536                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2537                                         exit(1);
2538                                 }
2539                                 fprintf(mcf_fl,"%s",mcs.c_str());
2540                                 fclose(mcf_fl);
2541 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2542 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2543                         }
2544                 }
2545
2546
2547         }
2548
2549
2550
2551 //-----------------------------------------------------------------
2552
2553
2554 //                      Find common filter predicates in the LFTAs.
2555 //                      In addition, generate structs to store the temporal attributes unpacked by the prefilter
2556         
2557         map<string, vector<stream_query *> >::iterator ssqi;
2558         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2559
2560                 string lmach = (*ssqi).first;
2561                 bool packed_return = false;
2562                 int li, erri;
2563
2564
2565 //      The LFTAs of this machine.
2566                 vector<stream_query *> mach_lftas = (*ssqi).second;
2567 //      break up on a per-interface basis.
2568                 map<string, vector<stream_query *> > lfta_iface_lists;
2569                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2570                                                         // for fta_init
2571                 for(li=0;li<mach_lftas.size();++li){
2572                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2573                         string lfta_iface = "_local_";
2574                         if(tvec.size()>0){
2575                                 lfta_iface = tvec[0]->get_interface();
2576                         }
2577                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2578                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2579                 }
2580
2581
2582 //      Are the return values "packed"?
2583 //      This should be done on a per-interface basis.
2584 //      But this is defunct code for gs-lite
2585                 for(li=0;li<mach_lftas.size();++li){
2586                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2587                   string liface = "_local_";
2588                   if(tvec.size()>0){
2589                          liface = tvec[0]->get_interface();
2590                   }
2591                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2592                   if(iface_codegen_type.size()){
2593                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2594                         packed_return = true;
2595                         }
2596                   }
2597                 }
2598
2599
2600 // Separate lftas by interface, collect results on a per-interface basis.
2601
2602                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2603                 map<string, vector<cnf_set *> > prefilter_preds;
2604                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2605                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2606                         string liface = (*mvsi).first;
2607                         vector<cnf_set *> empty_list;
2608                         prefilter_preds[liface] = empty_list;
2609                         if(! packed_return){
2610                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2611                         }
2612
2613 //                              get NIC capabilities.  (Is this needed?)
2614                         nic_property *nicprop = NULL;
2615                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2616                         if(iface_codegen_type.size()){
2617                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2618                                 if(!nicprop){
2619                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2620                                         exit(1);
2621                                 }
2622                         }
2623                 }
2624
2625
2626 //              Now that we know the prefilter preds, generate the lfta code.
2627 //      Do this for all lftas in this machine.
2628                 for(li=0;li<mach_lftas.size();++li){
2629                         set<unsigned int> subsumed_preds;
2630                         set<unsigned int>::iterator sii;
2631 #ifdef PREFILTER_OK
2632                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2633                                 int pid = (*sii);
2634                                 if((pid>>16) == li){
2635                                         subsumed_preds.insert(pid & 0xffff);
2636                                 }
2637                         }
2638 #endif
2639                         string lfta_schema_str = mach_lftas[li]->make_schema();
2640                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2641                         nic_property *nicprop = NULL; // no NIC properties?
2642                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2643                 }
2644
2645
2646 //                      generate structs to store the temporal attributes
2647 //                      unpacked by prefilter
2648                 col_id_set temp_cids;
2649                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2650                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2651
2652 //                      Compute the lfta bit signatures and the lfta colrefs
2653 //      do this on a per-interface basis
2654 #ifdef PREFILTER_OK
2655                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2656 #endif
2657                 map<string, vector<long long int> > lfta_sigs; // used again later
2658                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2659                         string liface = (*mvsi).first;
2660                         vector<long long int> empty_list;
2661                         lfta_sigs[liface] = empty_list;
2662
2663                         vector<col_id_set> lfta_cols;
2664                         vector<int> lfta_snap_length;
2665                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2666                                 unsigned long long int mask=0, bpos=1;
2667                                 int f_pos;
2668                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2669                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2670                                                 mask |= bpos;
2671                                         bpos = bpos << 1;
2672                                 }
2673                                 lfta_sigs[liface].push_back(mask);
2674                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2675                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2676                         }
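//			Worked example of the signature computed above: with three prefilter
//			predicates on this interface, if predicates 0 and 2 were lifted from
//			lfta li (their lfta_id sets contain li), then mask = 1 | (1<<2) = 5,
//			i.e. bit f_pos of the signature marks prefilter predicate f_pos as
//			subsumed for that lfta.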
2677
2678 //for(li=0;li<mach_lftas.size();++li){
2679 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2680 //col_id_set::iterator tcisi;
2681 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2682 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2683 //}
2684 //}
2685
2686
2687 //                      generate the prefilter
2688 //      Do this on a per-interface basis, except for the #define
2689 #ifdef PREFILTER_OK
2690 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2691                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2692 #else
2693                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2694
2695 #endif
2696                 }
2697
2698 //                      Generate interface parameter lookup function
2699           lfta_val[lmach] += "// lookup interface properties by name\n";
2700           lfta_val[lmach] += "// multiple values of a given property are returned in the same string, separated by commas\n";
2701           lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2702           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2703
2704 //        collect a list of interface names used by queries running on this host
2705           set<std::string> iface_names;
2706           for(i=0;i<mach_query_names[lmach].size();i++){
2707                 int mi = mach_query_names[lmach][i];
2708                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2709
2710                 if(interface_names[mi]=="")
2711                         iface_names.insert("DEFAULTDEV");
2712                 else
2713                         iface_names.insert(interface_names[mi]);
2714           }
2715
2716 //        generate interface property lookup code for every interface
2717           set<std::string>::iterator sir;
2718           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2719                 if (sir == iface_names.begin())
2720                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2721                 else
2722                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2723
2724                 // iterate through interface properties
2725                 vector<string> iface_properties;
2726                 if(*sir!="_local_"){    // dummy watchlist interface, don't process.
2727                         iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2728                 }
2729                 if (erri) {
2730                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2731                         exit(1);
2732                 }
2733                 if (iface_properties.empty())
2734                         lfta_val[lmach] += "\t\treturn NULL;\n";
2735                 else {
2736                         for (int i = 0; i < iface_properties.size(); ++i) {
2737                                 if (i == 0)
2738                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2739                                 else
2740                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2741
2742                                 // combine all values for the interface property using comma separator
2743                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2744                                 lfta_val[lmach] += "\t\t\treturn \"";
2745                                 for (int j = 0; j < vals.size(); ++j) {
2746                                         lfta_val[lmach] +=  vals[j];
2747                                         if (j != vals.size()-1)
2748                                                 lfta_val[lmach] += ",";
2749                                 }
2750                                 lfta_val[lmach] += "\";\n";
2751                         }
2752                         lfta_val[lmach] += "\t\t} else\n";
2753                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2754                 }
2755           }
2756           lfta_val[lmach] += "\t} else\n";
2757           lfta_val[lmach] += "\t\treturn NULL;\n";
2758           lfta_val[lmach] += "}\n\n";
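//		For illustration, the generated lookup function has roughly this shape
//		(interface and property names are hypothetical):
//
//			gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {
//				if (!strcmp(iface_name, "eth0")) {
//					if (!strcmp(property_name, "InterfaceType")) {
//						return "GIGASCOPE";
//					} else
//						return NULL;
//				} else
//					return NULL;
//			}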
2759
2760
2761 //                      Generate a full list of FTAs for clearinghouse reference
2762           lfta_val[lmach] += "// list of FTAs the clearinghouse expects to register themselves\n";
2763           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2764
2765           bool first = true;
2766           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2767                 string liface = (*mvsi).first;
2768                 if(liface != "_local_"){        // these don't register themselves
2769                         vector<stream_query *> lfta_list = (*mvsi).second;
2770                         for(i=0;i<lfta_list.size();i++){
2771                                 int mi = lfta_iface_qname_ix[liface][i];
2772                                 if(first) first = false;
2773                                 else      lfta_val[lmach] += ", ";
2774                                 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2775                         }
2776                 }
2777           }
2778 //        for (i = 0; i < registration_query_names.size(); ++i) {
2779 //                 if (i)
2780 //                        lfta_val[lmach] += ", ";
2781 //                 lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2782 //        }
2783
2784           for (i = 0; i < hfta_list.size(); ++i) {
2785                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2786           }
2787           lfta_val[lmach] += ", NULL};\n\n";
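//		Example of the emitted list (hypothetical query names); lftas on the
//		_local_ watchlist interface are skipped because they do not register:
//
//			const gs_sp_t fta_names[] = {"pkt_cnt", "flow_cnt", "flow_summary", NULL};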
2788
2789
2790 //                      Add the initialization function to lfta.c
2791 //      Change to accept the interface name, and 
2792 //      set the prefilter function accordingly.
2793 //      see the example in demo/err2
2794           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2795           lfta_val[lmach] += "//        note: the last parameter in fta_register is the prefilter signature\n";
2796
2797 //        for(i=0;i<mach_query_names[lmach].size();i++)
2798 //              int mi = mach_query_names[lmach][i];
2799 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2800
2801           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2802                 string liface = (*mvsi).first;
2803                 vector<stream_query *> lfta_list = (*mvsi).second;
2804                 for(i=0;i<lfta_list.size();i++){
2805                         stream_query *lfta_sq = lfta_list[i];
2806                         int mi = lfta_iface_qname_ix[liface][i];
2807
2808                         if(liface == "_local_"){
2809 //  Don't register an init function, do the init code inline
2810                                 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2811                                 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2812                                 continue;
2813                         }
2814                 
2815                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2816
2817                         string this_iface = "DEFAULTDEV";
2818                         if(interface_names[mi]!="")
2819                                 this_iface = '"'+interface_names[mi]+'"';
2820                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2821                 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2822 //              if(interface_names[mi]=="")
2823 //                              lfta_val[lmach]+="DEFAULTDEV";
2824 //              else
2825 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';
2826                         lfta_val[lmach] += this_iface;
2827
2828
2829                 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2830                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2831                         +"\n#endif\n";
2832                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2833                         lfta_val[lmach] += tmpstr;
2834
2835 //                      unsigned long long int mask=0, bpos=1;
2836 //                      int f_pos;
2837 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2838 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2839 //                                      mask |= bpos;
2840 //                              bpos = bpos << 1;
2841 //                      }
2842
2843 #ifdef PREFILTER_OK
2844 //                      sprintf(tmpstr,",%lluull",mask);
2845                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2846                         lfta_val[lmach]+=tmpstr;
2847 #else
2848                         lfta_val[lmach] += ",0ull";
2849 #endif
2850
2851                         lfta_val[lmach] += ");\n";
2852
2853
2854
2855 //    End of lfta prefilter stuff
2856 // --------------------------------------------------
2857
2858 //                      If there is a field verifier, warn about
2859 //                      lack of compatibility
2860                   string lfta_comment;
2861                   string lfta_title;
2862                   string lfta_namespace;
2863                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2864                   if(ldefs.count("comment")>0)
2865                         lfta_comment = lfta_sq->defines["comment"];
2866                   if(ldefs.count("Comment")>0)
2867                         lfta_comment = lfta_sq->defines["Comment"];
2868                   if(ldefs.count("COMMENT")>0)
2869                         lfta_comment = lfta_sq->defines["COMMENT"];
2870                   if(ldefs.count("title")>0)
2871                         lfta_title = lfta_sq->defines["title"];
2872                   if(ldefs.count("Title")>0)
2873                         lfta_title = lfta_sq->defines["Title"];
2874                   if(ldefs.count("TITLE")>0)
2875                         lfta_title = lfta_sq->defines["TITLE"];
2876                   if(ldefs.count("NAMESPACE")>0)
2877                         lfta_namespace = lfta_sq->defines["NAMESPACE"];
2878                   if(ldefs.count("Namespace")>0)
2879                         lfta_namespace = lfta_sq->defines["Namespace"];
2880                   if(ldefs.count("namespace")>0)
2881                         lfta_namespace = lfta_sq->defines["namespace"];
2882
2883                   string lfta_ht_size;
2884                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2885                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2886                   if(ldefs.count("aggregate_slots")>0){
2887                         lfta_ht_size = ldefs["aggregate_slots"];
2888                   }
2889
2890 //                      NOTE : I'm assuming that visible lftas do not start with _fta.
2891 //                              -- will fail for non-visible simple selection queries.
2892                 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2893                         string warning_str;
2894                         if(lfta_comment == "")
2895                                 warning_str += "\tcomment not found.\n";
2896 // Obsolete stuff that carsten wanted
2897 //                      if(lfta_title == "")
2898 //                              warning_str += "\ttitle not found.\n";
2899 //                      if(lfta_namespace == "")
2900 //                              warning_str += "\tnamespace not found.\n";
2901
2902                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2903                         int fi;
2904                         for(fi=0;fi<flds.size();fi++){
2905                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2906                         }
2907                         if(warning_str != "")
2908                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2909                                         registration_query_names[mi].c_str(),warning_str.c_str());
2910                 }
2911
2912
2913 //                      Create qtree output
2914                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2915         if(lfta_comment != "")
2916               fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2917         if(lfta_title != "")
2918               fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2919         if(lfta_namespace != "")
2920               fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2921         if(lfta_ht_size != "")
2922               fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2923                 if(lmach != "")
2924                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2925                 else
2926                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2927                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
2928                 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2929                 for(int t=0;t<itbls.size();++t){
2930                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2931                 }
2932 //              fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2933                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2934                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2935 //                              write info about fields to qtree.xml
2936                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2937                   int fi;
2938                   for(fi=0;fi<flds.size();fi++){
2939                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2940                         if(flds[fi]->get_modifier_list()->size()){
2941                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2942                         }
2943                         fprintf(qtree_output," />\n");
2944                   }
2945                 fprintf(qtree_output,"\t</LFTA>\n");
2946
2947
2948             }
2949           }
2950
2951           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2952                         string liface = (*mvsi).first;
2953                         lfta_val[lmach] += 
2954 "       if (!strcmp(device, \""+liface+"\")) \n"
2955 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2956 ;
2957                 }
2958                 lfta_val[lmach] += 
2959 "       if(lfta_prefilter == NULL){\n"
2960 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2961 "               exit(1);\n"
2962 "       }\n"
2963 ;
2964
2965
2966
2967           lfta_val[lmach] += "}\n\n";
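//		The generated fta_init has roughly this shape (names are hypothetical;
//		alloc_fta_pkt_cnt and schema_pkt_cnt stand in for the identifiers built
//		by generate_alloc_name() and generate_schema_string_name()):
//
//			void fta_init(gs_sp_t device){
//				if(!strcmp(device,"eth0"))
//					fta_register("pkt_cnt", 1, "eth0", alloc_fta_pkt_cnt
//			#ifndef LFTA_IN_NIC
//				,schema_pkt_cnt
//			#endif
//			,1518,0ull);
//				if (!strcmp(device, "eth0"))
//					lfta_prefilter = &lfta_prefilter_eth0;
//				if(lfta_prefilter == NULL){
//					fprintf(stderr, "Error, interface %s not found in fta_init.\n",device);
//					exit(1);
//				}
//			}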
2968
2969       if(!(debug_only || hfta_only) ){
2970                 string lfta_flnm;
2971                 if(lmach != "")
2972                   lfta_flnm = lmach + "_lfta.c";
2973                 else
2974                   lfta_flnm = hostname + "_lfta.c";
2975                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2976                         fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2977                         exit(1);
2978                 }
2979               fprintf(lfta_out,"%s",lfta_header.c_str());
2980               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2981               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2982                 fclose(lfta_out);
2983           }
2984         }
2985
2986 //              Report which external operators must execute
2987         if(opviews.size()>0)
2988                 fprintf(stderr,"The queries use the following external operators:\n");
2989         for(i=0;i<opviews.size();++i){
2990                 opview_entry *opv = opviews.get_entry(i);
2991                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2992         }
2993
2994         if(create_makefile)
2995                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2996                 machine_names, schema_file_name,
2997                 interface_names,
2998                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2999
3000
3001         fprintf(qtree_output,"</QueryNodes>\n");
3002
3003         return(0);
3004 }
3005
3006 ////////////////////////////////////////////////////////////
3007
3008 void generate_makefile(vector<string> &input_file_names, int nfiles,
3009                                            vector<string> &hfta_names, opview_set &opviews,
3010                                                 vector<string> &machine_names,
3011                                                 string schema_file_name,
3012                                                 vector<string> &interface_names,
3013                                                 ifq_t *ifdb, string &config_dir_path,
3014                                                 bool use_pads,
3015                                                 string extra_libs,
3016                                                 map<string, vector<int> > &rts_hload
3017                                          ){
3018         int i,j;
3019
3020         if(config_dir_path != ""){
3021                 config_dir_path = "-C "+config_dir_path;
3022         }
3023
3024         struct stat sb;
3025         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3026         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3027
3028 //      if(libz_exists && !libast_exists){
3029 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");
3030 //              exit(1);
3031 //      }
3032
3033 //                      Get set of operator executable files to run
3034         set<string> op_fls;
3035         set<string>::iterator ssi;
3036         for(i=0;i<opviews.size();++i){
3037                 opview_entry *opv = opviews.get_entry(i);
3038                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3039         }
3040
3041         FILE *outfl = fopen("Makefile", "w");
3042         if(outfl==NULL){
3043                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
3044                 exit(0);
3045         }
3046
3047         fputs(
3048 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
3049 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3050 ).c_str(), outfl
3051 );
3052         if(generate_stats)
3053                 fprintf(outfl,"  -DLFTA_STATS");
3054
3055 //              Gather the set of interfaces
3056 //              Also, gather "base interface names" for use in computing
3057 //              the hash splitting to virtual interfaces.
3058 //              TODO : must update to handle machines
3059         set<string> ifaces;
3060         set<string> base_vifaces;       // base interfaces of virtual interfaces
3061         map<string, string> ifmachines;
3062         map<string, string> ifattrs;
3063         for(i=0;i<interface_names.size();++i){
3064                 ifaces.insert(interface_names[i]);
3065                 ifmachines[interface_names[i]] = machine_names[i];
3066
3067                 size_t Xpos = interface_names[i].find_last_of("X");
3068                 if(Xpos!=string::npos){
3069                         string iface = interface_names[i].substr(0,Xpos);
3070                         base_vifaces.insert(iface);
3071                 }
3072                 // get interface attributes and add them to the list
3073         }
3074
3075 //              Do we need to include protobuf libraries?
3076 //              TODO Move to the interface library: get the libraries to include 
3077 //              for an interface type
3078
3079         bool use_proto = false;
3080         bool use_bsa = false;
3081         bool use_kafka = false;
3082         int erri;
3083         string err_str;
3084         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3085                 string ifnm = (*ssi);
3086                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3087                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3088                         if(ift[ift_i]=="PROTO"){
3089 #ifdef PROTO_ENABLED                            
3090                                 use_proto = true;
3091 #else
3092                                 fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
3093                                 exit(0);
3094 #endif
3095                         }
3096                 }
3097                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3098                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3099                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3100 #ifdef BSA_ENABLED                              
3101                                 use_bsa = true;
3102 #else
3103                                 fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
3104                                 exit(0);
3105 #endif                                  
3106                         }
3107                 }
3108                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3109                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3110                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3111 #ifdef KAFKA_ENABLED                            
3112                                 use_kafka = true;
3113 #else
3114                                 fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
3115                                 exit(0);
3116 #endif  
3117                         }
3118                 }
3119         }
3120
3121         fprintf(outfl,
3122 "\n"
3123 "\n"
3124 "all: rts");
3125         for(i=0;i<hfta_names.size();++i)
3126                 fprintf(outfl," %s",hfta_names[i].c_str());
3127         fputs(
3128 ("\n"
3129 "\n"
3130 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
3131 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3132         if(use_pads)
3133                 fprintf(outfl,"-L. ");
3134         fputs(
3135 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3136         if(use_pads)
3137                 fprintf(outfl,"-lgscppads -lpads ");
3138         fprintf(outfl,
3139 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3140         if(use_pads)
3141                 fprintf(outfl, " -lpz -lz -lbz ");
3142         if(libz_exists && libast_exists)
3143                 fprintf(outfl," -last ");
3144         if(use_pads)
3145                 fprintf(outfl, " -ldll -ldl ");
3146
3147 #ifdef PROTO_ENABLED    
3148         fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3149 #endif
3150 #ifdef BSA_ENABLED      
3151         fprintf(outfl, " -lbsa_stream ");
3152 #endif
3153 #ifdef KAFKA_ENABLED    
3154         fprintf(outfl, " -lrdkafka ");
3155 #endif
3156         fprintf(outfl," -lgscpaux");
3157 #ifdef GCOV
3158         fprintf(outfl," -fprofile-arcs");
3159 #endif
3160         fprintf(outfl,
3161 "\n"
3162 "\n"
3163 "lfta.o: %s_lfta.c\n"
3164 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3165 "\n"
3166 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3167         for(i=0;i<nfiles;++i)
3168                 fprintf(outfl," %s",input_file_names[i].c_str());
3169         if(hostname == ""){
3170                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3171         }else{
3172                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3173         }
3174         for(i=0;i<nfiles;++i)
3175                 fprintf(outfl," %s",input_file_names[i].c_str());
3176         fprintf(outfl,"\n");
3177
3178         for(i=0;i<hfta_names.size();++i)
3179                 fprintf(outfl,
3180 ("%s: %s.o\n"
3181 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3182 "\n"
3183 "%s.o: %s.cc\n"
3184 "\t$(CPP) -o %s.o -c %s.cc\n"
3185 "\n"
3186 "\n").c_str(),
3187     hfta_names[i].c_str(), hfta_names[i].c_str(),
3188         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3189         hfta_names[i].c_str(), hfta_names[i].c_str(),
3190         hfta_names[i].c_str(), hfta_names[i].c_str()
3191                 );
3192
3193         fprintf(outfl,
3194 ("\n"
3195 "packet_schema.txt:\n"
3196 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3197 "\n"
3198 "external_fcns.def:\n"
3199 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3200 "\n"
3201 "clean:\n"
3202 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3203         for(i=0;i<hfta_names.size();++i)
3204                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3205         fprintf(outfl,"\n");
3206
3207         fclose(outfl);
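//		The generated Makefile looks roughly like the following (paths, host
//		name, and query files are hypothetical placeholders):
//
//			CPP= g++ -O3 -g -I <root>/include  -I <root>/include/hfta
//			CC= gcc -O3 -g -I . -I <root>/include -I <root>/include/lfta
//
//			all: rts hfta_0
//
//			rts: lfta.o <root>/lib/libgscphost.a ...
//				g++ -O3 -g -o rts lfta.o -L<root>/lib -lgscplftaaux -lgscprts -lgscphost ...
//
//			lfta.o: myhost_lfta.c
//				$(CC) -o lfta.o -c myhost_lfta.c
//
//			myhost_lfta.c: external_fcns.def packet_schema.txt query1.gsql
//				<root>/bin/translate_fta -h myhost packet_schema.txt query1.gsql
//
//			hfta_0: hfta_0.o
//				$(CPP) -o hfta_0 hfta_0.o -L<root>/lib -lgscpapp -lgscphftaaux ...
//
//			hfta_0.o: hfta_0.cc
//				$(CPP) -o hfta_0.o -c hfta_0.cc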
3208
3209
3210
3211 //              Gather the set of interfaces
3212 //              TODO : must update to handle machines
3213 //              TODO : lookup interface attributes and add them as a parameter to rts process
3214         outfl = fopen("runit", "w");
3215         if(outfl==NULL){
3216                 fprintf(stderr,"Can't open runit for write, exiting.\n");
3217                 exit(0);
3218         }
3219
3220
3221         fputs(
3222 ("#!/bin/sh\n"
3223 "./stopit\n"
3224 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3225 "sleep 5\n"
3226 "if [ ! -f gshub.log ]\n"
3227 "then\n"
3228 "\techo \"Failed to start bin/gshub.py\"\n"
3229 "\texit -1\n"
3230 "fi\n"
3231 "ADDR=`cat gshub.log`\n"
3232 "ps opgid= $! >> gs.pids\n"
3233 "./rts $ADDR default ").c_str(), outfl);
3234 //      int erri;
3235 //      string err_str;
3236         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3237                 string ifnm = (*ssi);
3238                 // suppress internal _local_ interface
3239                 if (ifnm == "_local_")
3240                         continue;
3241                 fprintf(outfl, "%s ",ifnm.c_str());
3242                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3243                 for(j=0;j<ifv.size();++j)
3244                         fprintf(outfl, "%s ",ifv[j].c_str());
3245         }
3246         fprintf(outfl, " &\n");
3247         fprintf(outfl, "echo $! >> gs.pids\n");
3248         for(i=0;i<hfta_names.size();++i)
3249                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3250
3251         for(j=0;j<opviews.opview_list.size();++j){
3252                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3253         }
3254
3255         fclose(outfl);
3256         system("chmod +x runit");
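//              Hedged sketch of a generated runit script, assuming one monitored
//              interface "eth0" with no Command values and one HFTA "hfta_1"
//              (interface and HFTA names are hypothetical, middle lines elided):
//
//                      #!/bin/sh
//                      ./stopit
//                      <root_path>/bin/gshub.py> /dev/null 2>&1 &
//                      sleep 5
//                      ...
//                      ADDR=`cat gshub.log`
//                      ps opgid= $! >> gs.pids
//                      ./rts $ADDR default eth0  &
//                      echo $! >> gs.pids
//                      ./hfta_1 $ADDR default &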
3257
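//              Generate the stopit script: it sends TERM and then KILL to every
//              process group recorded in gs.pids (kill with a negative id signals
//              the whole group) and finally removes gs.pids.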
3258         outfl = fopen("stopit", "w");
3259         if(outfl==NULL){
3260                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3261                 exit(1);
3262         }
3263
3264         fprintf(outfl,"#!/bin/sh\n"
3265 "rm -f gshub.log\n"
3266 "if [ ! -f gs.pids ]\n"
3267 "then\n"
3268 "exit\n"
3269 "fi\n"
3270 "for pgid in `cat gs.pids`\n"
3271 "do\n"
3272 "kill -TERM -$pgid\n"
3273 "done\n"
3274 "sleep 1\n"
3275 "for pgid in `cat gs.pids`\n"
3276 "do\n"
3277 "kill -9 -$pgid\n"
3278 "done\n"
3279 "rm gs.pids\n");
3280
3281         fclose(outfl);
3282         system("chmod +x stopit");
3283
3284 //-----------------------------------------------
3285
3286 /* For now disable support for virtual interfaces
3287         outfl = fopen("set_vinterface_hash.bat", "w");
3288         if(outfl==NULL){
3289                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3290                 exit(0);
3291         }
3292
3293 //              The format should be determined by an entry in the ifres.xml file,
3294 //              but for now hardcode the only example I have.
3295         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3296                 if(rts_hload.count((*ssi))){
3297                         string iface_name = (*ssi);
3298                         string iface_number = "";
3299                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3300                                 if(isdigit(iface_name[j])){
3301                                         iface_number = iface_name[j];
3302                                         if(j>0 && isdigit(iface_name[j-1]))
3303                                                 iface_number = iface_name[j-1] + iface_number;
3304                                 }
3305                         }
3306
3307                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3308                         vector<int> halloc = rts_hload[iface_name];
3309                         int prev_limit = 0;
3310                         for(j=0;j<halloc.size();++j){
3311                                 if(j>0)
3312                                         fprintf(outfl,":");
3313                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3314                                 prev_limit = halloc[j];
3315                         }
3316                         fprintf(outfl,"\n");
3317                 }
3318         }
3319         fclose(outfl);
3320         system("chmod +x set_vinterface_hash.bat");
3321 */
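//              If the disabled block above is ever revived, each virtual interface
//              would get a line of the form "dagconfig -d0 -S hat_range=0-8:8-16"
//              in set_vinterface_hash.bat; the device number and hash-range
//              boundaries shown here are hypothetical.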
3322 }
3323
3324 //              Code for implementing a local schema
3325 /*
3326                 table_list qpSchema;
3327
3328 //                              Load the schemas of any LFTAs.
3329                 int l;
3330                 for(l=0;l<hfta_nbr;++l){
3331                         stream_query *sq0 = split_queries[l];
3332                         table_def *td = sq0->get_output_tabledef();
3333                         qpSchema.append_table(td);
3334                 }
3335 //                              load the schemas of any other ref'd tables.
3336 //                              (e.g., hftas)
3337                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3338                 int ti;
3339                 for(ti=0;ti<input_tbl_names.size();++ti){
3340                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3341                         if(tbl_ref < 0){
3342                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3343                                 if(tbl_ref < 0){
3344                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3345                                         exit(1);
3346                                 }
3347                                 qpSchema.append_table(Schema->get_table(tbl_ref));
3348                         }
3349                 }
3350 */
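//              The disabled sketch above would build a private table_list containing
//              the output schemas of the already-split LFTAs plus any other
//              referenced tables copied from the global Schema, so that a single
//              HFTA could be analyzed against a purely local schema.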
3351
3352 //              Functions related to parsing.
3353
3354 /*
3355 static int split_string(char *instr,char sep, char **words,int max_words){
3356    char *loc;
3357    char *str;
3358    int nwords = 0;
3359
3360    str = instr;
3361    words[nwords++] = str;
3362    while( (loc = strchr(str,sep)) != NULL){
3363         *loc = '\0';
3364         str = loc+1;
3365         if(nwords >= max_words){
3366                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3367                 nwords = max_words-1;
3368         }
3369         words[nwords++] = str;
3370    }
3371
3372    return(nwords);
3373 }
3374
3375 */
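//              If re-enabled, split_string tokenizes its argument in place: called on
//              a writable buffer holding "a,b,c" with sep ',', it null-terminates each
//              field and returns 3, with words[0], words[1], and words[2] pointing at
//              "a", "b", and "c" respectively.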
3376