Update running groupby operator
[com/gs-lite.git] / src / ftacmp / translate_fta.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33 #include<glob.h>
34 #include<string.h>
35
36 #include<list>
37
38 //              for the scandir
39      #include <sys/types.h>
40      #include <dirent.h>
41
42
43 #include<errno.h>
44
45 //              to verify that some files exist.
46      #include <sys/types.h>
47      #include <sys/stat.h>
48
49 #include "parse_partn.h"
50
51 #include "print_plan.h"
52
53 //              Interface to the xml parser
54
55 #include"xml_t.h"
56 #include"field_list.h"
57
58 extern int xmlParserparse(void);
59 extern FILE *xmlParserin;
60 extern int xmlParserdebug;
61
62 std::vector<std::string> xml_attr_vec;
63 std::vector<std::string> xml_val_vec;
64 std::string xml_a, xml_v;
65 xml_t *xml_leaves = NULL;
66
67 //      Interface to the field list verifier
68 field_list *field_verifier = NULL;
69
70 #define TMPSTRLEN 1000
71
72 #ifndef PATH_DELIM
73   #define PATH_DELIM '/'
74 #endif
75
76 char tmp_schema_str[10000];
77
78 // maximum delay between two heartbeats produced
79 // by a UDOP. Used when it is not explicitly
80 // provided in the UDOP definition
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
82
83 //              Default lfta hash table size, must be power of 2.
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
85
86 //              Interface to FTA definition lexer and parser ...
87
88 extern int FtaParserparse(void);
89 extern FILE *FtaParserin;
90 extern int FtaParserdebug;
91
92 fta_parse_t *fta_parse_result;
93 var_defs_t *fta_parse_defines;
94
95
96
97 //              Interface to external function lexer and parser ...
98
99 extern int Ext_fcnsParserparse(void);
100 extern FILE *Ext_fcnsParserin;
101 extern int Ext_fcnsParserdebug;
102
103 ext_fcn_list *Ext_fcns;
104
105
106 //              Interface to partition definition parser
107 extern int PartnParserparse();
108 partn_def_list_t *partn_parse_result = NULL;
109
110
111
112 using namespace std;
113 //extern int errno;
114
115
116 //              forward declaration of local utility function
117 void generate_makefile(vector<string> &input_file_names, int nfiles,
118                                            vector<string> &hfta_names, opview_set &opviews,
119                                                 vector<string> &machine_names,
120                                                 string schema_file_name,
121                                                 vector<string> &interface_names,
122                                                 ifq_t *ifdb, string &config_dir_path,
123                                                 bool use_pads,
124                                                 string extra_libs,
125                                                 map<string, vector<int> > &rts_hload
126                                         );
127
128 //static int split_string(char *instr,char sep, char **words,int max_words);
129 #define MAXFLDS 100
130
131   FILE *schema_summary_output = NULL;           // schema summary output file
132
133 //                      Dump schema summary
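//                      Output format: one line with the query name, then a '|'-separated
//                      line of field names, then a '|'-separated line of field types.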
134 void dump_summary(stream_query *str){
135         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
136
137         table_def *sch = str->get_output_tabledef();
138
139         vector<field_entry *> flds = sch->get_fields();
140         int f;
141         for(f=0;f<flds.size();++f){
142                 if(f>0) fprintf(schema_summary_output,"|");
143                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
144         }
145         fprintf(schema_summary_output,"\n");
146         for(f=0;f<flds.size();++f){
147                 if(f>0) fprintf(schema_summary_output,"|");
148                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
149         }
150         fprintf(schema_summary_output,"\n");
151 }
152
153 //              Globals
154 string hostname;                // name of current host.
155 int hostname_len;
156 bool generate_stats = false;
157 string root_path = "../..";
158
159
160 int main(int argc, char **argv){
161   char tmpstr[TMPSTRLEN];
162   string err_str;
163   int q,s,h,f;
164
165   set<int>::iterator si;
166
167   vector<string> registration_query_names;                      // for lfta.c registration
168   map<string, vector<int> > mach_query_names;   // list queries of machine
169   vector<int> snap_lengths;                             // for lfta.c registration
170   vector<string> interface_names;                       // for lfta.c registration
171   vector<string> machine_names;                 // machine of interface
172   vector<bool> lfta_reuse_options;                      // for lfta.c registration
173   vector<int> lfta_liveness_timeouts;           // for qtree.xml generation
174   vector<string> hfta_names;                    // hfta source code names, for
175                                                                                 // creating make file.
176   vector<string> qnames;                                // ensure unique names
177   map<string, int> lfta_names;                  // keep track of unique lftas.
178
179
180 //                              set these to 1 to debug the parser
181   FtaParserdebug = 0;
182   Ext_fcnsParserdebug = 0;
183
184   FILE *lfta_out;                               // lfta.c output.
185   FILE *fta_in;                                 // input file
186   FILE *table_schemas_in;               // source tables definition file
187   FILE *query_name_output;              // query names
188   FILE *qtree_output;                   // interconnections of query nodes
189
190   // -------------------------------
191   // Handling of Input Arguments
192   // -------------------------------
193     char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195                 "\t[-B] : debug only (don't create output files)\n"
196                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199                 "\t[-C] : use <config directory> for definition files\n"
200                 "\t[-l] : use <library directory> for library queries\n"
201                 "\t[-N] : output query names in query_names.txt\n"
202                 "\t[-H] : create HFTA only (no schema_file)\n"
203                 "\t[-Q] : use query name for hfta suffix\n"
204                 "\t[-M] : generate make file and runit, stopit scripts\n"
205                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206                 "\t[-f] : Output schema summary to schema_summary.txt\n"
207                 "\t[-P] : link with PADS\n"
208                 "\t[-h] : override host name.\n"
209                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210                 "\t[-R] : path to root of GS-lite\n"
211 ;
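//              Illustrative invocation (assuming the compiled binary is named translate_fta;
//              the schema and query file names below are hypothetical):
//                      translate_fta -M -N -C ./cfg packet_schema.txt queries.gsql
//              i.e. emit a Makefile and query_names.txt, read definition files from ./cfg,
//              and translate the queries in queries.gsql against packet_schema.txt.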
212
213 //              parameters gathered from command line processing
214         string external_fcns_path;
215 //      string internal_fcn_path;
216         string config_dir_path;
217         string library_path = "./";
218         vector<string> input_file_names;
219         string schema_file_name;
220         bool debug_only = false;
221         bool hfta_only = false;
222         bool output_query_names = false;
223         bool output_schema_summary=false;
224         bool numeric_hfta_flname = true;
225         bool create_makefile = false;
226         bool distributed_mode = false;
227         bool partitioned_mode = false;
228         bool use_live_hosts_file = false;
229         bool use_pads = false;
230         bool clean_make = false;
231         int n_virtual_interfaces = 1;
232
233    int chopt;           // getopt() returns int; a plain char may never compare equal to -1
234    while((chopt = getopt(argc,argv,optstr)) != -1){
235                 switch(chopt){
236                 case 'B':
237                         debug_only = true;
238                         break;
239                 case 'D':
240                         distributed_mode = true;
241                         break;
242                 case 'p':
243                         partitioned_mode = true;
244                         break;
245                 case 'L':
246                         use_live_hosts_file = true;
247                         break;
248                 case 'C':
249                                 if(optarg != NULL)
250                                  config_dir_path = string(optarg) + string("/");
251                         break;
252                 case 'l':
253                                 if(optarg != NULL)
254                                  library_path = string(optarg) + string("/");
255                         break;
256                 case 'N':
257                         output_query_names = true;
258                         break;
259                 case 'Q':
260                         numeric_hfta_flname = false;
261                         break;
262                 case 'H':
263                         if(schema_file_name == ""){
264                                 hfta_only = true;
265                         }
266                         break;
267                 case 'f':
268                         output_schema_summary=true;
269                         break;
270                 case 'M':
271                         create_makefile=true;
272                         break;
273                 case 'S':
274                         generate_stats=true;
275                         break;
276                 case 'P':
277                         use_pads = true;
278                         break;
279                 case 'c':
280                         clean_make = true;
281                         break;
282                 case 'h':
283                         if(optarg != NULL)
284                                 hostname = optarg;
285                         break;
286                 case 'R':
287                         if(optarg != NULL)
288                                 root_path = optarg;
289                         break;
290                 case 'n':
291                         if(optarg != NULL){
292                                 n_virtual_interfaces = atoi(optarg);
293                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
295                                         n_virtual_interfaces = 1;
296                                 }
297                         }
298                         break;
299                 case '?':
300                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301                         fprintf(stderr,"%s\n", usage_str);
302                         exit(1);
303                 default:
304                         fprintf(stderr, "Argument was %c\n", optopt);
305                         fprintf(stderr,"Invalid arguments\n");
306                         fprintf(stderr,"%s\n", usage_str);
307                         exit(1);
308                 }
309         }
310         argc -= optind;
311         argv += optind;
312         for (int i = 0; i < argc; ++i) {
313                 if((schema_file_name == "") && !hfta_only){
314                         schema_file_name = argv[i];
315                 }else{
316                         input_file_names.push_back(argv[i]);
317                 }
318         }
319
320         if(input_file_names.size() == 0){
321                 fprintf(stderr,"%s\n", usage_str);
322                 exit(1);
323         }
324
325         if(clean_make){
326                 string clean_cmd = "rm Makefile hfta_*.cc";
327                 int clean_ret = system(clean_cmd.c_str());
328                 if(clean_ret){
329                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
330                 }
331         }
332
333
334         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
335
336 //                      Open globally used file names.
337
338         // prepend config directory to schema file
339         schema_file_name = config_dir_path + schema_file_name;
340         external_fcns_path = config_dir_path + string("external_fcns.def");
341     string ifx_fname = config_dir_path + string("ifres.xml");
342
343 //              Find interface query file(s).
344         if(hostname == ""){
345                 gethostname(tmpstr,TMPSTRLEN);
346                 hostname = tmpstr;
347         }
348         hostname_len = hostname.length();       // tmpstr is unset when the host name is overridden with -h
349     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
350         vector<string> ifq_fls;
351
352                 ifq_fls.push_back(ifq_fname);
353
354
355 //                      Get the field list, if it exists
356         string flist_fl = config_dir_path + "field_list.xml";
357         FILE *flf_in = NULL;
358         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360                 xml_leaves = new xml_t();
361                 xmlParser_setfileinput(flf_in);
362                 if(xmlParserparse()){
363                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
364                 }else{
365                         field_verifier = new field_list(xml_leaves);
366                 }
367         }
368
369         if(!hfta_only){
370           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
372                 exit(1);
373           }
374         }
375
376 /*
377         if(!(debug_only || hfta_only)){
378           if((lfta_out = fopen("lfta.c","w")) == NULL){
379                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
380                 exit(1);
381           }
382         }
383 */
384
385 //              Get the output specification file.
386 //              format is
387 //                      query, operator, operator_param, directory, bucketwidth, partitioning_flds, n_partitions
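//                      e.g. one line might read (purely illustrative values):
//                              query1,optype,opparam,./output_dir,60,partfld,1
//                      the operator type is lowercased and bucketwidth / n_partitions
//                      are parsed as integers below.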
388         string ospec_fl = "output_spec.cfg";
389         FILE *osp_in = NULL;
390         vector<ospec_str *> output_specs;
391         multimap<string, int> qname_to_ospec;
392         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
393                 char *flds[MAXFLDS];
394                 int o_lineno = 0;
395                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
396                         o_lineno++;
397                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
398                         if(nflds == 7){
399 //              make operator type lowercase
400                                 char *tmpc;
401                                 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402                                         *tmpc = tolower(*tmpc);
403
404                                 ospec_str *tmp_ospec = new ospec_str();
405                                 tmp_ospec->query = flds[0];
406                                 tmp_ospec->operator_type = flds[1];
407                                 tmp_ospec->operator_param = flds[2];
408                                 tmp_ospec->output_directory = flds[3];
409                                 tmp_ospec->bucketwidth = atoi(flds[4]);
410                                 tmp_ospec->partitioning_flds = flds[5];
411                                 tmp_ospec->n_partitions = atoi(flds[6]);
412                                 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413                                 output_specs.push_back(tmp_ospec);
414                         }else{
415                                 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
416                         }
417                 }
418                 fclose(osp_in);
419         }else{
420                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
421                 exit(1);
422         }
423
424 //              hfta parallelism
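//              Each line is "hfta_name,parallelism" (hypothetical example: "hfta_1,2");
//              the parallelism must lie between 1 and n_virtual_interfaces and divide it
//              evenly, otherwise the line is ignored with a warning.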
425         string pspec_fl = "hfta_parallelism.cfg";
426         FILE *psp_in = NULL;
427         map<string, int> hfta_parallelism;
428         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
429                 char *flds[MAXFLDS];
430                 int o_lineno = 0;
431                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432                         bool good_entry = true;
433                         o_lineno++;
434                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
435                         if(nflds == 2){
436                                 string hname = flds[0];
437                                 int par = atoi(flds[1]);
438                                 if(par <= 0 || par > n_virtual_interfaces){
439                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
440                                         good_entry = false;
441                                 }
442                                 if(good_entry && n_virtual_interfaces % par != 0){
443                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
444                                         good_entry = false;
445                                 }
446                                 if(good_entry)
447                                         hfta_parallelism[hname] = par;
448                         }
449                 }
450         }else{
451                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
452         }
453
454
455 //              LFTA hash table sizes
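//              Each line is "lfta_name,table_size" (hypothetical example: "my_lfta,8192");
//              sizes must be positive.  Without this file the default LFTA hash table
//              size (DEFAULT_LFTA_HASH_TABLE_SIZE) is used.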
456         string htspec_fl = "lfta_htsize.cfg";
457         FILE *htsp_in = NULL;
458         map<string, int> lfta_htsize;
459         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
460                 char *flds[MAXFLDS];
461                 int o_lineno = 0;
462                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463                         bool good_entry = true;
464                         o_lineno++;
465                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
466                         if(nflds == 2){
467                                 string lfta_name = flds[0];
468                                 int htsz = atoi(flds[1]);
469                                 if(htsz>0){
470                                         lfta_htsize[lfta_name] = htsz;
471                                 }else{
472                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
473                                 }
474                         }
475                 }
476         }else{
477                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
478         }
479
480 //              LFTA virtual interface hash split
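//              Each line is "interface_name,h1,h2,..." with one or more positive weights
//              (hypothetical example: "eth0,1,1,2"); the weights are accumulated below
//              into cumulative hash-load boundaries for that interface.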
481         string rtlspec_fl = "rts_load.cfg";
482         FILE *rtl_in = NULL;
483         map<string, vector<int> > rts_hload;
484         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
485                 char *flds[MAXFLDS];
486                 int r_lineno = 0;
487                 string iface_name;
488                 vector<int> hload;
489                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490                         bool good_entry = true;
491                         r_lineno++;
492                         iface_name = "";
493                         hload.clear();
494                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
495                         if(nflds >1){
496                                 iface_name = flds[0];
497                                 int cumm_h = 0;
498                                 int j;
499                                 for(j=1;j<nflds;++j){
500                                         int h = atoi(flds[j]);
501                                         if(h<=0)
502                                                 good_entry = false;
503                                         cumm_h += h;
504                                         hload.push_back(cumm_h);
505                                 }
506                         }else{
507                                 good_entry = false;
508                         }
509                         if(good_entry){
510                                 rts_hload[iface_name] = hload;
511                         }else{
512                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
513                         }
514                 }
515         }
516
517
518
519         if(output_query_names){
520           if((query_name_output = fopen("query_names.txt","w")) == NULL){
521                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
522                 exit(1);
523           }
524         }
525
526         if(output_schema_summary){
527           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
529                 exit(1);
530           }
531         }
532
533         if((qtree_output = fopen("qtree.xml","w")) == NULL){
534                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
535                 exit(1);
536         }
537         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539         fprintf(qtree_output,"<QueryNodes>\n");
540
541
542 //                      Get an initial Schema
543         table_list *Schema;
544         if(!hfta_only){
545 //                      Parse the table schema definitions.
546           fta_parse_result = new fta_parse_t();
547           FtaParser_setfileinput(table_schemas_in);
548           if(FtaParserparse()){
549                 fprintf(stderr,"Table schema parse failed.\n");
550                 exit(1);
551           }
552           if(fta_parse_result->parse_type != TABLE_PARSE){
553                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
554                 exit(1);
555           }
556           Schema = fta_parse_result->tables;
557
558 //                      Ensure that all schema_ids, if set, are distinct.
559 //  Obsolete? There is code elsewhere to ensure that schema IDs are
560 //  distinct on a per-interface basis.
561 /*
562           set<int> found_ids;
563           set<int> dup_ids;
564           for(int t=0;t<Schema->size();++t){
565                 int sch_id = Schema->get_table(t)->get_schema_id();
566                 if(sch_id> 0){
567                         if(found_ids.find(sch_id) != found_ids.end()){
568                                 dup_ids.insert(sch_id);
569                         }else{
570                                 found_ids.insert(sch_id);
571                         }
572                 }
573                 if(dup_ids.size()>0){
574                         fprintf(stderr, "Error, the schema has duplicate schema_ids:");
575                         for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
576                                 fprintf(stderr," %d",(*dit));
577                         fprintf(stderr,"\n");
578                         exit(1);
579                 }
580           }
581 */
582
583
584 //                      Process schema field inheritance
585           int retval;
586           retval = Schema->unroll_tables(err_str);
587           if(retval){
588                 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
589                 exit(1);
590           }
591         }else{
592 //                      hfta only => we will try to fetch schemas from the registry.
593 //                      therefore, start off with an empty schema.
594           Schema = new table_list();
595         }
596
597
598 //                      Open and parse the external functions file.
599         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
600         if(Ext_fcnsParserin == NULL){
601                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
602                 Ext_fcns = new ext_fcn_list();
603         }else{
604                 if(Ext_fcnsParserparse()){
605                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
606                         Ext_fcns = new ext_fcn_list();
607                 }
608         }
609         if(Ext_fcns->validate_fcns(err_str)){
610                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
611                 exit(1);
612         }
613
614 //              Open and parse the interface resources file.
615 //      ifq_t *ifaces_db = new ifq_t();
616 //   string ierr;
617 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
618 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
619 //                              ifx_fname.c_str(), ierr.c_str());
620 //              exit(1);
621 //      }
622 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
623 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
624 //                              ifq_fname.c_str(), ierr.c_str());
625 //              exit(1);
626 //      }
627
628
629 //                      The LFTA code string.
630 //                      Put the standard preamble here.
631 //                      NOTE: the hash macros, fcns should go into the run time
632   map<string, string> lfta_val;
633   map<string, string> lfta_prefilter_val;
634
635   string lfta_header =
636 "#include <limits.h>\n"
637 "#include \"rts.h\"\n"
638 "#include \"fta.h\"\n"
639 "#include \"lapp.h\"\n"
640 "#include \"rts_udaf.h\"\n"
641 "#include<stdio.h>\n"
642 "#include <float.h>\n"
643 "#include \"rdtsc.h\"\n"
644 "#include \"watchlist.h\"\n\n"
645
646 ;
647 // Get any locally defined parsing headers
648     glob_t glob_result;
649     memset(&glob_result, 0, sizeof(glob_result));
650
651     // do the glob operation TODO should be from GSROOT
652     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
653         if(return_value == 0){
654                 lfta_header += "\n";
655         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
656                         char *flds[1000];
657                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
658                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
659         }
660                 lfta_header += "\n";
661         }else{
662                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
663         }
664
665 /*
666 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
667 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
668 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
669 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
670 */
671
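//              Append the prefilter hook, hash-slot / Bloom-filter macros, and per-type
//              hash helpers (each lfta_<TYPE>_to_hash reduces a value of that column
//              type to a 32-bit hash) to the generated lfta preamble.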
672         lfta_header += 
673 "\n"
674 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
675 "\n"
676 "#define SLOT_FILLED 0x04\n"
677 "#define SLOT_GEN_BITS 0x03\n"
678 "#define SLOT_HASH_BITS 0xfffffff8\n"
679 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
680 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
681 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
682 "\n\n"
683
684 "#define lfta_BOOL_to_hash(x) (x)\n"
685 "#define lfta_USHORT_to_hash(x) (x)\n"
686 "#define lfta_UINT_to_hash(x) (x)\n"
687 "#define lfta_IP_to_hash(x) (x)\n"
688 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
689 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
690 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
691 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
692 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
693 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
694 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
695 "       for(i=0;i<x.length;++i){\n"
696 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
697 "               if((i%4) == 3){\n"
698 "                       ret ^= tmp_sum;\n"
699 "                       tmp_sum = 0;\n"
700 "               }\n"
701 "       }\n"
702 "       if((i%4)!=0) ret ^=tmp_sum;\n"
703 "       return(ret);\n"
704 "}\n\n\n";
705
706
707
708 //////////////////////////////////////////////////////////////////
709 /////                   Get all of the query parse trees
710
711
712   int i,p;
713   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
714
715 //---------------------------
716 //              Global info needed for post processing.
717
718 //                      Set of operator views ref'd in the query set.
719         opview_set opviews;
720 //                      lftas on a per-machine basis.
721         map<string, vector<stream_query *> > lfta_mach_lists;
722         int nfiles = input_file_names.size();
723         vector<stream_query *> hfta_list;               // list of hftas.
724         map<string, stream_query *> sq_map;             // map from query name to stream query.
725
726
727 //////////////////////////////////////////
728
729 //              Open and parse the interface resources file.
730         ifq_t *ifaces_db = new ifq_t();
731     string ierr;
732         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
733                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
734                                 ifx_fname.c_str(), ierr.c_str());
735                 exit(1);
736         }
737         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
738                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
739                                 ifq_fls[0].c_str(), ierr.c_str());
740                 exit(1);
741         }
742
743   map<string, string> qname_to_flname;  // for detecting duplicate query names
744
745
746
747 //                      Parse the files to create a vector of parse trees.
748 //                      Load qnodes with information to perform a topo sort
749 //                      based on query dependencies.
750   vector<query_node *> qnodes;                          // for topo sort.
751   map<string,int> name_node_map;                        // map query name to qnodes entry
752   for(i=0;i<input_file_names.size();i++){
753
754           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
755                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
756                   continue;
757           }
758 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
759
760 //                      Parse the FTA query
761           fta_parse_result = new fta_parse_t();
762           FtaParser_setfileinput(fta_in);
763           if(FtaParserparse()){
764                 fprintf(stderr,"FTA parse failed.\n");
765                 exit(1);
766           }
767           if(fta_parse_result->parse_type != QUERY_PARSE){
768                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
769                 exit(1);
770           }
771
772 //                      returns a list of parse trees
773           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
774           for(p=0;p<qlist.size();++p){
775             table_exp_t *fta_parse_tree = qlist[p];
776 //              query_parse_trees.push_back(fta_parse_tree);
777
778 //                      compute the default name -- extract from query name
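//                      (e.g. a query read from a hypothetical file ./queries/foo.gsql
//                      gets the default name "foo", which impute_query_name may override)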
779                 strcpy(tmpstr,input_file_names[i].c_str());
780                 char *qname = strrchr(tmpstr,PATH_DELIM);
781                 if(qname == NULL)
782                         qname = tmpstr;
783                 else
784                         qname++;
785                 char *qname_end = strchr(qname,'.');
786                 if(qname_end != NULL) *qname_end = '\0';
787                 string qname_str = qname;
788                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
789
790 //                      Determine visibility.  Should I be attaching all of the output methods?
791                 if(qname_to_ospec.count(imputed_qname)>0)
792                         fta_parse_tree->set_visible(true);
793                 else
794                         fta_parse_tree->set_visible(false);
795
796
797 //                              Create a manipulable representation of the parse tree.
798 //                              the qnode inherits the visibility assigned to the parse tree.
799             int pos = qnodes.size();
800                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
801                 name_node_map[ qnodes[pos]->name ] = pos;
802 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
803 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
804 //              qfiles.push_back(i);
805
806 //                      Check for duplicate query names
807 //                                      NOTE : in hfta-only generation, I should
808 //                                      also check with the names of the registered queries.
809                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
810                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
811                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
812                         exit(1);
813                 }
814                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
815                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
816                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
817                         exit(1);
818                 }
819                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
820
821
822         }
823   }
824
825 //              Add the library queries
826
827   int pos;
828   for(pos=0;pos<qnodes.size();++pos){
829         int fi;
830         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
831                 string src_tbl = qnodes[pos]->refd_tbls[fi];
832                 if(qname_to_flname.count(src_tbl) == 0){
833                         int last_sep = src_tbl.find_last_of('/');
834                         if(last_sep != string::npos){
835 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
836                                 string target_qname = src_tbl.substr(last_sep+1);
837                                 string qpathname = library_path + src_tbl + ".gsql";
838                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
839                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
840                                         exit(1);
841                                         fprintf(stderr,"After exit\n");
842                                 }
843 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
844 //                      Parse the FTA query
845                                 fta_parse_result = new fta_parse_t();
846                                 FtaParser_setfileinput(fta_in);
847                                 if(FtaParserparse()){
848                                         fprintf(stderr,"FTA parse failed.\n");
849                                         exit(1);
850                                 }
851                                 if(fta_parse_result->parse_type != QUERY_PARSE){
852                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",qpathname.c_str());
853                                         exit(1);
854                                 }
855
856                                 map<string, int> local_query_map;
857                                 vector<string> local_query_names;
858                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
859                                 for(p=0;p<qlist.size();++p){
860                                 table_exp_t *fta_parse_tree = qlist[p];
861                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
862                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
863                                         if(imputed_qname == target_qname)
864                                                 imputed_qname = src_tbl;
865                                         if(local_query_map.count(imputed_qname)>0){
866                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
867                                                 exit(1);
868                                         }
869                                         local_query_map[ imputed_qname ] = p;
870                                         local_query_names.push_back(imputed_qname);
871                                 }
872
873                                 if(local_query_map.count(src_tbl)==0){
874                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
875                                         exit(1);
876                                 }
877
878                                 vector<int> worklist;
879                                 set<int> added_queries;
880                                 vector<query_node *> new_qnodes;
881                                 worklist.push_back(local_query_map[target_qname]);
882                                 added_queries.insert(local_query_map[target_qname]);
883                                 int qq;
884                                 int qpos = qnodes.size();
885                                 for(qq=0;qq<worklist.size();++qq){
886                                         int q_id = worklist[qq];
887                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
888                                         new_qnodes.push_back( new_qnode);
889                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
890                                         int ff;
891                                         for(ff = 0;ff<refd_tbls.size();++ff){
892                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
893
894                                                         if(name_node_map.count(refd_tbls[ff])>0){
895                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
896                                                                 exit(1);
897                                                         }else{
898                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
899                                                         }
900                                                 }
901                                         }
902                                 }
903
904                                 for(qq=0;qq<new_qnodes.size();++qq){
905                                         int qpos = qnodes.size();
906                                         qnodes.push_back(new_qnodes[qq]);
907                                         name_node_map[qnodes[qpos]->name ] = qpos;
908                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
909                                 }
910                         }
911                 }
912         }
913   }
914
915
916
917
918
919
920
921
922 //---------------------------------------
923
924
925 //              Add the UDOPS.
926
927   string udop_missing_sources;
928   for(i=0;i<qnodes.size();++i){
929         int fi;
930         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
931                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
932                 if(sid >= 0){
933                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
934                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
935                                 int pos = qnodes.size();
936                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
937                                         name_node_map[ qnodes[pos]->name ] = pos;
938                                         qnodes[pos]->is_externally_visible = false;   // not externally visible (may be marked visible later)
939         //                                      Need to mark the source queries as visible.
940                                         int si;
941                                         string missing_sources = "";
942                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
943                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
944                                                 if(name_node_map.count(src_tbl)==0){
945                                                         missing_sources += src_tbl + " ";
946                                                 }
947                                         }
948                                         if(missing_sources != ""){
949                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
950                                         }
951                                 }
952                         }
953                 }
954         }
955   }
956   if(udop_missing_sources != ""){
957         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
958         exit(1);
959   }
960
961
962
963 ////////////////////////////////////////////////////////////////////
964 ///                             Check parse trees to verify that some
965 ///                             global properties are met :
966 ///                             if q1 reads from q2, then
967 ///                               q2 is processed before q1
968 ///                               q1 can supply q2's parameters
969 ///                             Verify there is no cycle in the reads-from graph.
970
971 //                      Compute an order in which to process the
972 //                      queries.
973
974 //                      Start by building the reads-from lists.
975 //
976
977   for(i=0;i<qnodes.size();++i){
978         int qi, fi;
979         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
980         for(fi = 0;fi<refd_tbls.size();++fi){
981                 if(name_node_map.count(refd_tbls[fi])>0){
982 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
983                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
984                 }
985         }
986   }
987
988
989 //              If one query reads the result of another,
990 //              check for parameter compatibility.  Currently it must
991 //              be an exact match.  I will move to requiring
992 //              containment after re-ordering, but will require
993 //              some analysis for code generation which is not
994 //              yet in place.
995 //printf("There are %d query nodes.\n",qnodes.size());
996
997
998   for(i=0;i<qnodes.size();++i){
999         vector<var_pair_t *> target_params  = qnodes[i]->params;
1000         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1001                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
1002                 if(target_params.size() != source_params.size()){
1003                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1004                         exit(1);
1005                 }
1006                 int p;
1007                 for(p=0;p<target_params.size();++p){
1008                         if(! (target_params[p]->name == source_params[p]->name &&
1009                               target_params[p]->val == source_params[p]->val ) ){
1010                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1011                                 exit(1);
1012                         }
1013                 }
1014         }
1015   }
1016
1017
1018 //              Find the roots.
1019 //              Start by counting inedges.
1020   for(i=0;i<qnodes.size();++i){
1021         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1022                 qnodes[(*si)]->n_consumers++;
1023         }
1024   }
1025
1026 //              The roots are the nodes with indegree zero.
1027   set<int> roots;
1028   for(i=0;i<qnodes.size();++i){
1029         if(qnodes[i]->n_consumers == 0){
1030                 if(qnodes[i]->is_externally_visible == false){
1031                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1032                 }
1033                 roots.insert(i);
1034         }
1035   }
1036
1037 //              Remove the parts of the subtree that produce no output.
1038   set<int> valid_roots;
1039   set<int> discarded_nodes;
1040   set<int> candidates;
1041   while(roots.size() >0){
1042         for(si=roots.begin();si!=roots.end();++si){
1043                 if(qnodes[(*si)]->is_externally_visible){
1044                         valid_roots.insert((*si));
1045                 }else{
1046                         discarded_nodes.insert((*si));
1047                         set<int>::iterator sir;
1048                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1049                                 qnodes[(*sir)]->n_consumers--;
1050                                 if(qnodes[(*sir)]->n_consumers == 0)
1051                                         candidates.insert( (*sir));
1052                         }
1053                 }
1054         }
1055         roots = candidates;
1056         candidates.clear();
1057   }
1058   roots = valid_roots;
1059   if(discarded_nodes.size()>0){
1060         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1061         int di = 0;
1062         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1063                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1064                 di++;
1065                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1066         }
1067         fprintf(stderr,"\n");
1068   }
1069
1070 //              Compute the sources_to set, ignoring discarded nodes.
1071   for(i=0;i<qnodes.size();++i){
1072         if(discarded_nodes.count(i)==0)
1073                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1074                         qnodes[(*si)]->sources_to.insert(i);
1075         }
1076   }
1077
1078
1079 //              Find the nodes that are shared by multiple visible subtrees.
1080 //              The roots become inferred visible nodes.
1081
1082 //              Find the visible nodes.
1083         vector<int> visible_nodes;
1084         for(i=0;i<qnodes.size();i++){
1085                 if(qnodes[i]->is_externally_visible){
1086                         visible_nodes.push_back(i);
1087                 }
1088         }
1089
1090 //              Find UDOPs referenced by visible nodes.
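//              (breadth-first walk over reads_from edges: every UDOP reached is marked
//              externally visible, along with the queries it reads from directly)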
1091   list<int> workq;
1092   for(i=0;i<visible_nodes.size();++i){
1093         workq.push_back(visible_nodes[i]);
1094   }
1095   while(!workq.empty()){
1096         int node = workq.front();
1097         workq.pop_front();
1098         set<int>::iterator children;
1099         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1100                 qnodes[node]->is_externally_visible = true;
1101                 visible_nodes.push_back(node);
1102                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1103                         if(qnodes[(*children)]->is_externally_visible == false){
1104                                 qnodes[(*children)]->is_externally_visible = true;
1105                                 visible_nodes.push_back((*children));
1106                         }
1107                 }
1108         }
1109         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1110                 workq.push_back((*children));
1111         }
1112   }
1113
1114         bool done = false;
1115         while(!done){
1116 //      reset the nodes
1117                 for(i=0;i<qnodes.size();i++){
1118                         qnodes[i]->subtree_roots.clear();
1119                 }
1120
1121 //              Walk the tree defined by a visible node, not descending into
1122 //              subtrees rooted by a visible node.  Mark the node visited with
1123 //              the visible node ID.
1124                 for(i=0;i<visible_nodes.size();++i){
1125                         set<int> vroots;
1126                         vroots.insert(visible_nodes[i]);
1127                         while(vroots.size()>0){
1128                                 for(si=vroots.begin();si!=vroots.end();++si){
1129                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1130
1131                                         set<int>::iterator sir;
1132                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1133                                                 if(! qnodes[(*sir)]->is_externally_visible){
1134                                                         candidates.insert( (*sir));
1135                                                 }
1136                                         }
1137                                 }
1138                                 vroots = candidates;
1139                                 candidates.clear();
1140                         }
1141                 }
1142 //              Find the nodes in multiple visible node subtrees, but with no parent
1143 //              that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
1144                 done = true;    // until proven otherwise
1145                 for(i=0;i<qnodes.size();i++){
1146                         if(qnodes[i]->subtree_roots.size()>1){
1147                                 bool is_new_root = true;
1148                                 set<int>::iterator sir;
1149                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1150                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1151                                                 is_new_root = false;
1152                                 }
1153                                 if(is_new_root){
1154                                         qnodes[i]->is_externally_visible = true;
1155                                         qnodes[i]->inferred_visible_node = true;
1156                                         visible_nodes.push_back(i);
1157                                         done = false;
1158                                 }
1159                         }
1160                 }
1161         }
1162
1163
1164
1165
1166
1167 //              get visible nodes in topo ordering.
1168 //  for(i=0;i<qnodes.size();i++){
1169 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1170 //  }
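//              Kahn-style topological sort: repeatedly emit the current roots
//              (n_consumers == 0) into process_order and decrement the consumer counts
//              of the nodes they read from; any node whose count never reaches zero
//              lies on a reads-from cycle and is reported below.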
1171   vector<int> process_order;
1172   while(roots.size() >0){
1173         for(si=roots.begin();si!=roots.end();++si){
1174                 if(discarded_nodes.count((*si))==0){
1175                         process_order.push_back( (*si) );
1176                 }
1177                 set<int>::iterator sir;
1178                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1179                         qnodes[(*sir)]->n_consumers--;
1180                         if(qnodes[(*sir)]->n_consumers == 0)
1181                                 candidates.insert( (*sir));
1182                 }
1183         }
1184         roots = candidates;
1185         candidates.clear();
1186   }
1187
1188
1189 //printf("process_order.size() =%d\n",process_order.size());
1190
1191 //              Search for cyclic dependencies
1192   string found_dep;
1193   for(i=0;i<qnodes.size();++i){
1194         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1195                 if(found_dep.size() != 0) found_dep += ", ";
1196                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1197         }
1198   }
1199   if(found_dep.size()>0){
1200         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1201         exit(1);
1202   }
1203
1204 //              Get a list of query sets, in the order to be processed.
1205 //              Start at visible root and do bfs.
1206 //              The query set includes queries referenced indirectly,
1207 //              as sources for user-defined operators.  These are needed
1208 //              to ensure that they are added to the schema, but are not part
1209 //              of the query tree.
1210
1211 //              stream_node_sets contains queries reachable only through the
1212 //              FROM clause, so I can tell which queries to add to the stream
1213 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1214
1215 //                      NOTE: this code works because in order for data to be
1216 //                      read by multiple hftas, the node must be externally visible.
1217 //                      But visible nodes define roots of process sets.
1218 //			Internally visible nodes can feed data only
1219 //			to other nodes in the same query file.
1220 //			Therefore, any access can be restricted to a file;
1221 //			hfta output sharing is done only on roots,
1222 //			never on interior nodes.
1223
1224
1225
1226
1227 //		Compute the base collection of hftas.
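//		(Each externally visible query node roots one hfta; the hfta also absorbs
//		every non-visible node reachable from that root through reads_from edges,
//		and query_node_indices ends up ordered leaves-first.)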
1228   vector<hfta_node *> hfta_sets;
1229   map<string, int> hfta_name_map;
1230 //  vector< vector<int> > process_sets;
1231 //  vector< set<int> > stream_node_sets;
1232   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.
1233                                                                                                                 // i.e. process leaves 1st.
1234   for(i=0;i<process_order.size();++i){
1235         if(qnodes[process_order[i]]->is_externally_visible == true){
1236 //printf("Visible.\n");
1237                 int root = process_order[i];
1238                 hfta_node *hnode = new hfta_node();
1239                 hnode->name = qnodes[root]-> name;
1240                 hnode->source_name = qnodes[root]-> name;
1241                 hnode->is_udop = qnodes[root]->is_udop;
1242                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1243
1244                 vector<int> proc_list;  proc_list.push_back(root);
1245 //                      Ensure that nodes are added only once.
1246                 set<int> proc_set;      proc_set.insert(root);
1247                 roots.clear();                  roots.insert(root);
1248                 candidates.clear();
1249                 while(roots.size()>0){
1250                         for(si=roots.begin();si!=roots.end();++si){
1251 //printf("Processing root %d\n",(*si));
1252                                 set<int>::iterator sir;
1253                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1254 //printf("reads fom %d\n",(*sir));
1255                                         if(qnodes[(*sir)]->is_externally_visible==false){
1256                                                 candidates.insert( (*sir) );
1257                                                 if(proc_set.count( (*sir) )==0){
1258                                                         proc_set.insert( (*sir) );
1259                                                         proc_list.push_back( (*sir) );
1260                                                 }
1261                                         }
1262                                 }
1263                         }
1264                         roots = candidates;
1265                         candidates.clear();
1266                 }
1267
1268                 reverse(proc_list.begin(), proc_list.end());
1269                 hnode->query_node_indices = proc_list;
1270                 hfta_name_map[hnode->name] = hfta_sets.size();
1271                 hfta_sets.push_back(hnode);
1272         }
1273   }
1274
1275 //              Compute the reads_from / sources_to graphs for the hftas.
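//		(An hfta reads from another hfta whenever one of its query nodes names that
//		hfta in refd_tbls, i.e. in its FROM clause.)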
1276
1277   for(i=0;i<hfta_sets.size();++i){
1278         hfta_node *hnode = hfta_sets[i];
1279         for(q=0;q<hnode->query_node_indices.size();q++){
1280                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1281                 for(s=0;s<qnode->refd_tbls.size();++s){
1282                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1283                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1284                                 hnode->reads_from.insert(other_hfta);
1285                                 hfta_sets[other_hfta]->sources_to.insert(i);
1286                         }
1287                 }
1288         }
1289   }
1290
1291 //              Compute a topological sort of the hfta_sets.
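//		(Kahn-style: seed the work queue with hftas that have no consumers and
//		release an hfta's sources once all of their consumers have been emitted,
//		so hfta_topsort lists consumers before the hftas they read from.)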
1292
1293   vector<int> hfta_topsort;
1294   workq.clear();
1295   int hnode_srcs[hfta_sets.size()];
1296   for(i=0;i<hfta_sets.size();++i){
1297         hnode_srcs[i] = 0;
1298         if(hfta_sets[i]->sources_to.size() == 0)
1299                 workq.push_back(i);
1300   }
1301
1302   while(! workq.empty()){
1303         int     node = workq.front();
1304         workq.pop_front();
1305         hfta_topsort.push_back(node);
1306         set<int>::iterator stsi;
1307         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1308                 int parent = (*stsi);
1309                 hnode_srcs[parent]++;
1310                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1311                         workq.push_back(parent);
1312                 }
1313         }
1314   }
1315
1316 //              Decorate hfta nodes with the level of parallelism given as input.
1317
1318   map<string, int>::iterator msii;
1319   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1320         string hfta_name = (*msii).first;
1321         int par = (*msii).second;
1322         if(hfta_name_map.count(hfta_name) > 0){
1323                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1324         }else{
1325 		fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but it is not an hfta.\n",hfta_name.c_str());
1326         }
1327   }
1328
1329 //		Propagate levels of parallelism: each child should have a level of parallelism
1330 //		at least as large as that of any of its parents.  Adjust children upwards to compensate.
1331 //		Start at parents and adjust children; the change propagates automatically.
1332
1333   for(i=hfta_sets.size()-1;i>=0;i--){
1334         set<int>::iterator stsi;
1335         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1336                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1337                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1338                 }
1339         }
1340   }
1341
1342 //		Before all the name mangling, check if there are any output_spec.cfg
1343 //		or hfta_parallelism.cfg entries that do not have a matching query.
1344
1345         string dangling_ospecs = "";
1346         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1347                 string oq = (*msii).first;
1348                 if(hfta_name_map.count(oq) == 0){
1349                         dangling_ospecs += " "+(*msii).first;
1350                 }
1351         }
1352         if(dangling_ospecs!=""){
1353                 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1354         }
1355
1356         string dangling_par = "";
1357         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1358                 string oq = (*msii).first;
1359                 if(hfta_name_map.count(oq) == 0){
1360                         dangling_par += " "+(*msii).first;
1361                 }
1362         }
1363         if(dangling_par!=""){
1364                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1365         }
1366
1367
1368
1369 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1370 //              FROM clauses: retarget any name which is an internal node, and
1371 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes
1372 //              when the source hfta has more parallelism than the target node.
1373 //		Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1374
1375
1376   int n_original_hfta_sets = hfta_sets.size();
1377   for(i=0;i<n_original_hfta_sets;++i){
1378         if(hfta_sets[i]->n_parallel > 1){
1379                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1380                 set<string> local_nodes;                // names of query nodes in the hfta.
1381                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1382                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1383                 }
1384
1385                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1386                         string mangler = "__copy"+int_to_string(p);
1387                         hfta_node *par_hfta  = new hfta_node();
1388                         par_hfta->name = hfta_sets[i]->name + mangler;
1389                         par_hfta->source_name = hfta_sets[i]->name;
1390                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1391                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1392                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1393                         par_hfta->parallel_idx = p;
1394
1395                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1396
1397 //      Is it a UDOP?
1398                         if(hfta_sets[i]->is_udop){
1399                                 int root = hfta_sets[i]->query_node_indices[0];
1400
1401                                 string unequal_par_sources;
1402                                 set<int>::iterator rfsii;
1403                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1404                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1405                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1406                                         }
1407                                 }
1408                                 if(unequal_par_sources != ""){
1409                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1410                                         exit(1);
1411                                 }
1412
1413                                 int rti;
1414                                 vector<string> new_sources;
1415                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1416                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1417                                 }
1418
1419                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1420                                 new_qn->name += mangler;
1421                                 new_qn->mangler = mangler;
1422                                 new_qn->refd_tbls = new_sources;
1423                                 par_hfta->query_node_indices.push_back(qnodes.size());
1424                                 par_qnode_map[new_qn->name] = qnodes.size();
1425                                 name_node_map[ new_qn->name ] = qnodes.size();
1426                                 qnodes.push_back(new_qn);
1427                         }else{
1428 //              regular query node
1429                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1430                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1431                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1432 //                                      rehome the from clause on mangled names.
1433 //                                      create merge nodes as needed for external sources.
1434                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1435                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1436                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1437                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1438 //			References an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more source copies, so create a merge node.
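//			(When a merge is needed, stride = source parallelism / this hfta's
//			parallelism, and copy p of this hfta merges source copies
//			p*stride through p*stride + stride - 1.)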
1439                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1440                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1441                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1442                                                 }else{
1443                                                         vector<string> src_tbls;
1444                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1445                                                         if(stride == 0){
1446                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1447                                                                 exit(1);
1448                                                         }
1449                                                         for(s=0;s<stride;++s){
1450                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1451                                                                 src_tbls.push_back(ext_src_name);
1452                                                         }
1453                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1454                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1455                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1456 //                                      Make a qnode to represent the new merge node
1457                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1458                                                         qn_pt->refd_tbls = src_tbls;
1459                                                         qn_pt->is_udop  = false;
1460                                                         qn_pt->is_externally_visible = false;
1461                                                         qn_pt->inferred_visible_node  = false;
1462                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1463                                                         par_qnode_map[merge_node_name] = qnodes.size();
1464                                                         name_node_map[ merge_node_name ] = qnodes.size();
1465                                                         qnodes.push_back(qn_pt);
1466                                                 }
1467                                         }
1468                                 }
1469                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1470                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1471                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1472                                 }
1473                                 new_qn->params = qnodes[hqn_idx]->params;
1474                                 new_qn->is_udop = false;
1475                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1476                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1477                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1478                                 par_qnode_map[new_qn->name] = qnodes.size();
1479                                 name_node_map[ new_qn->name ] = qnodes.size();
1480                                 qnodes.push_back(new_qn);
1481                           }
1482                         }
1483                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1484                         hfta_sets.push_back(par_hfta);
1485                 }
1486         }else{
1487 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1488 //              hfta sources.
1489                 if(!hfta_sets[i]->is_udop){
1490                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1491                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1492                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1493                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1494 //			References an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more source copies, so create a merge node.
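//			(Here the non-parallelized hfta merges all n_parallel copies of the
//			parallelized source through one deferred-merge node per FROM entry.)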
1495                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1496                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1497                                                 vector<string> src_tbls;
1498                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1499                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1500                                                         src_tbls.push_back(ext_src_name);
1501                                                 }
1502                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1503                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1504                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1505 //                                      Make a qnode to represent the new merge node
1506                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1507                                                 qn_pt->refd_tbls = src_tbls;
1508                                                 qn_pt->is_udop  = false;
1509                                                 qn_pt->is_externally_visible = false;
1510                                                 qn_pt->inferred_visible_node  = false;
1511                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1512                                                 name_node_map[ merge_node_name ] = qnodes.size();
1513                                                 qnodes.push_back(qn_pt);
1514                                         }
1515                                 }
1516                         }
1517                 }
1518           }
1519         }
1520   }
1521
1522 //                      Rebuild the reads_from / sources_to lists in the qnodes
1523   for(q=0;q<qnodes.size();++q){
1524         qnodes[q]->reads_from.clear();
1525         qnodes[q]->sources_to.clear();
1526   }
1527   for(q=0;q<qnodes.size();++q){
1528         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1529                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1530                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1531                         qnodes[q]->reads_from.insert(rf);
1532                         qnodes[rf]->sources_to.insert(q);
1533                 }
1534         }
1535   }
1536
1537 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1538   for(q=0;q<hfta_sets.size();++q){
1539         hfta_sets[q]->reads_from.clear();
1540         hfta_sets[q]->sources_to.clear();
1541   }
1542   for(q=0;q<hfta_sets.size();++q){
1543         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1544                 int node = hfta_sets[q]->query_node_indices[s];
1545                 set<int>::iterator rfsii;
1546                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1547                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1548                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1549                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1550                         }
1551                 }
1552         }
1553   }
1554
1555 /*
1556 for(q=0;q<qnodes.size();++q){
1557  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1558  set<int>::iterator rsii;
1559  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1560   printf(" %d",(*rsii));
1561   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1562  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1563   printf(" %d",(*rsii));
1564  printf("\n");
1565 }
1566
1567 for(q=0;q<hfta_sets.size();++q){
1568  if(hfta_sets[q]->do_generation==false)
1569         continue;
1570  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1571  set<int>::iterator rsii;
1572  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1573   printf(" %d",(*rsii));
1574   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1575  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1576   printf(" %d",(*rsii));
1577  printf("\n");
1578 }
1579 */
1580
1581
1582
1583 //              Re-topo sort the hftas
1584   hfta_topsort.clear();
1585   workq.clear();
1586   int hnode_srcs_2[hfta_sets.size()];
1587   for(i=0;i<hfta_sets.size();++i){
1588         hnode_srcs_2[i] = 0;
1589         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1590                 workq.push_back(i);
1591         }
1592   }
1593
1594   while(workq.empty() == false){
1595         int     node = workq.front();
1596         workq.pop_front();
1597         hfta_topsort.push_back(node);
1598         set<int>::iterator stsii;
1599         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1600                 int child = (*stsii);
1601                 hnode_srcs_2[child]++;
1602                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1603                         workq.push_back(child);
1604                 }
1605         }
1606   }
1607
1608 //              Ensure that all of the query_node_indices in hfta_sets are topologically
1609 //              sorted, don't rely on assumptions that all transforms maintain some kind of order.
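//		(Per hfta: a node's initial count is the number of its sources lying outside
//		the hfta, and it is emitted once its remaining in-hfta sources have been
//		emitted, yielding a sources-first ordering within the hfta.)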
1610   for(i=0;i<hfta_sets.size();++i){
1611         if(hfta_sets[i]->do_generation){
1612                 map<int,int> n_accounted;
1613                 vector<int> new_order;
1614                 workq.clear();
1615                 vector<int>::iterator vii;
1616                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1617                         n_accounted[(*vii)]= 0;
1618                 }
1619                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1620                         set<int>::iterator rfsii;
1621                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1622                                 if(n_accounted.count((*rfsii)) == 0){
1623                                         n_accounted[(*vii)]++;
1624                                 }
1625                         }
1626                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1627                                 workq.push_back((*vii));
1628                         }
1629                 }
1630
1631                 while(workq.empty() == false){
1632                         int node = workq.front();
1633                         workq.pop_front();
1634                         new_order.push_back(node);
1635                         set<int>::iterator stsii;
1636                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1637                                 if(n_accounted.count((*stsii))){
1638                                         n_accounted[(*stsii)]++;
1639                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1640                                                 workq.push_back((*stsii));
1641                                         }
1642                                 }
1643                         }
1644                 }
1645                 hfta_sets[i]->query_node_indices = new_order;
1646         }
1647   }
1648
1649
1650
1651
1652
1653 ///			Global checking is done; start the analysis and translation
1654 ///			of the query parse trees in the order specified by process_order.
1655
1656
1657 //			Get a list of the LFTAs for global lfta optimization.
1658 //				TODO: separate building operators from splitting lftas;
1659 //					that will make optimizations such as predicate pushing easier.
1660         vector<stream_query *> lfta_list;
1661         stream_query *rootq;
1662     int qi,qj;
1663
1664         map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1665
1666         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1667
1668         int hfta_id = hfta_topsort[qi];
1669     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1670
1671
1672
1673 //		Two possibilities: either it's a UDOP, or it's a collection of queries.
1674 //      if(qnodes[curr_list.back()]->is_udop)
1675         if(hfta_sets[hfta_id]->is_udop){
1676                 int node_id = curr_list.back();
1677                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1678                 opview_entry *opv = new opview_entry();
1679
1680 //                      Many of the UDOP properties aren't currently used.
1681                 opv->parent_qname = "no_parent";
1682                 opv->root_name = qnodes[node_id]->name;
1683                 opv->view_name = qnodes[node_id]->file;
1684                 opv->pos = qi;
1685                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1686                 opv->udop_alias = tmpstr;
1687                 opv->mangler = qnodes[node_id]->mangler;
1688
1689                 if(opv->mangler != ""){
1690                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1691                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1692                 }
1693
1694 //			This piece of code makes each hfta which refers to the same udop
1695 //			reference a distinct running udop.  Do this at query optimization time?
1696 //              fmtbl->set_udop_alias(opv->udop_alias);
1697
1698                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1699                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1700
1701                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1702                 int s,f,q;
1703                 for(s=0;s<subq.size();++s){
1704 //                              Validate that the fields match.
1705                         subquery_spec *sqs = subq[s];
1706                         string subq_name = sqs->name + opv->mangler;
1707                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1708                         if(flds.size() == 0){
1709                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1710                                 return(1);
1711                         }
1712                         if(flds.size() < sqs->types.size()){
1713                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1714                                 return(1);
1715                         }
1716                         bool failed = false;
1717                         for(f=0;f<sqs->types.size();++f){
1718                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1719                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1720                                 if(! dte.subsumes_type(&dtf) ){
1721                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1722                                         failed = true;
1723                                 }
1724 /*
1725                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1726                                         string pstr = dte.get_temporal_string();
1727                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1728                                         failed = true;
1729                                 }
1730 */
1731                         }
1732                         if(failed)
1733                                 return(1);
1734 ///                             Validation done, find the subquery, make a copy of the
1735 ///                             parse tree, and add it to the return list.
1736                         for(q=0;q<qnodes.size();++q)
1737                                 if(qnodes[q]->name == subq_name)
1738                                         break;
1739                         if(q==qnodes.size()){
1740                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1741                                 return(1);
1742                         }
1743
1744                 }
1745
1746 //                      Cross-link to from entry(s) in all sourced-to tables.
1747                 set<int>::iterator sii;
1748                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1749 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1750                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1751                         int ii;
1752                         for(ii=0;ii<tblvars.size();++ii){
1753                                 if(tblvars[ii]->schema_name == opv->root_name){
1754                                         tblvars[ii]->set_opview_idx(opviews.size());
1755                                 }
1756
1757                         }
1758                 }
1759
1760                 opviews.append(opv);
1761         }else{
1762
1763 //                      Analyze the parse trees in this query,
1764 //                      put them in rootq
1765 //      vector<int> curr_list = process_sets[qi];
1766
1767
1768 ////////////////////////////////////////
1769
1770           rootq = NULL;
1771 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1772           for(qj=0;qj<curr_list.size();++qj){
1773                 i = curr_list[qj];
1774         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1775
1776 //                      Select the current query parse tree
1777                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1778
1779 //                      if hfta only, try to fetch any missing schemas
1780 //                      from the registry (using the print_schema program).
1781 //			Here I use a hack to avoid analyzing the query -- all referenced
1782 //			tables must be in the from clause.
1783 //			If there is a problem loading any table, just issue a warning.
1784 //
1785                 tablevar_list_t *fm = fta_parse_tree->get_from();
1786                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1787 //                      iterate over all referenced tables
1788                 int t;
1789                 for(t=0;t<refd_tbls.size();++t){
1790                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1791
1792                   if(tbl_ref < 0){      // if this table is not in the Schema
1793
1794                         if(hfta_only){
1795                                 string cmd="print_schema "+refd_tbls[t];
1796                                 FILE *schema_in = popen(cmd.c_str(), "r");
1797                                 if(schema_in == NULL){
1798                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1799                                 }else{
1800                                   string schema_instr;
1801                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1802                                         schema_instr += tmpstr;
1803                                   }
1804                           fta_parse_result = new fta_parse_t();
1805                                   strcpy(tmp_schema_str,schema_instr.c_str());
1806                                   FtaParser_setstringinput(tmp_schema_str);
1807                           if(FtaParserparse()){
1808                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1809                           }else{
1810                                         if( fta_parse_result->tables != NULL){
1811                                                 int tl;
1812                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1813                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1814                                                 }
1815                                         }else{
1816                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1817                                         }
1818                                 }
1819                         }
1820                   }else{
1821                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1822                                 exit(1);
1823                   }
1824
1825                 }
1826           }
1827
1828
1829 //                              Analyze the query.
1830           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1831           if(qs == NULL){
1832                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1833                 exit(1);
1834           }
1835
1836           stream_query new_sq(qs, Schema);
1837           if(new_sq.error_code){
1838                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1839                         exit(1);
1840           }
1841
1842 //                      Add it to the Schema
1843           table_def *output_td = new_sq.get_output_tabledef();
1844           Schema->add_table(output_td);
1845
1846 //                      Create a query plan from the analyzed parse tree.
1847 //			If it's a query referenced via FROM, add it to the stream query.
1848           if(rootq){
1849                 rootq->add_query(new_sq);
1850           }else{
1851                 rootq = new stream_query(new_sq);
1852 //			have the stream query object inherit properties from the analyzed
1853 //                      hfta_node object.
1854                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1855                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1856           }
1857
1858
1859     }
1860
1861 //              This stream query has all its parts
1862 //              Build and optimize it.
1863 //printf("translate_fta: generating plan.\n");
1864         if(rootq->generate_plan(Schema)){
1865                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1866                 continue;
1867         }
1868
1869 //	We've found the query plan head, so now add the output operators.
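//	(The lookup is keyed on source_name, the unmangled query name, so each parallel
//	__copyX instance picks up the same output specifications.)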
1870         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1871                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1872                 multimap<string, int>::iterator mmsi;
1873                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1874                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1875                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1876                 }
1877         }
1878
1879
1880
1881 //                              Perform query splitting if necessary.
1882         bool hfta_returned;
1883     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1884
1885         int l;
1886 //for(l=0;l<split_queries.size();++l){
1887 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1888 //}
1889
1890
1891
1892
1893     if(split_queries.size() > 0){       // should be at least one component.
1894
1895 //                              Compute the number of LFTAs.
1896           int n_lfta = split_queries.size();
1897           if(hfta_returned) n_lfta--;
1898 //                              Check if a schemaId constraint needs to be inserted.
1899
1900 //                              Process the LFTA components.
1901           for(l=0;l<n_lfta;++l){
1902            if(lfta_names.count(split_queries[l]->query_name) == 0){
1903 //                              Grab the lfta for global optimization.
1904                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1905                 string liface = "_local_";
1906 //              string lmach = "";
1907                 string lmach = hostname;
1908                 if(tvec.size()>0){
1909                         liface = tvec[0]->get_interface();      // iface queries have been resolved
1910                         if(tvec[0]->get_machine() != ""){
1911                                 lmach = tvec[0]->get_machine();
1912                         }else{
1913                                 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n",  split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
1914                         }
1915                 } // else{
1916                         interface_names.push_back(liface);
1917                         machine_names.push_back(lmach);
1918 //              }
1919
1920                 vector<predicate_t *> schemaid_preds;
1921                 for(int irv=0;irv<tvec.size();++irv){
1922
1923                         string schema_name = tvec[irv]->get_schema_name();
1924                         string rvar_name = tvec[irv]->get_var_name();
1925                         int schema_ref = tvec[irv]->get_schema_ref();
1926                         if (lmach == "")
1927                                 lmach = hostname;
1928 //                      interface_names.push_back(liface);
1929 //                      machine_names.push_back(lmach);
1930
1931 //printf("Machine is %s\n",lmach.c_str());
1932
1933 //                              Check if a schemaId constraint needs to be inserted.
1934                         if(schema_ref<0){ // can result from some kinds of splits
1935                                 schema_ref = Schema->get_table_ref(schema_name);
1936                         }
1937                         int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
1938                         int errnum = 0;
1939                         string if_error;
1940                         iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1941                         if(iface==NULL){
1942                                 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1943                                 exit(1);
1944                         }       
1945
1946
1947                         if(tvec[irv]->get_interface() != "_local_"){
1948                          if(iface->has_multiple_schemas()){
1949                                 if(schema_id<0){        // invalid schema_id
1950                                         fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1951                                         exit(1);
1952                                 }
1953                                 vector<string> iface_schemas = iface->get_property("Schemas");
1954                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1955                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1956                                         exit(1);
1957                                 }
1958 // Ensure that in liface, schema_id is used for only one schema
1959                                 if(schema_of_schemaid.count(liface)==0){
1960                                         map<int, string> empty_map;
1961                                         schema_of_schemaid[liface] = empty_map;
1962                                 }
1963                                 if(schema_of_schemaid[liface].count(schema_id)==0){
1964                                         schema_of_schemaid[liface][schema_id] = schema_name;
1965                                 }else{
1966                                         if(schema_of_schemaid[liface][schema_id] != schema_name){
1967                                                 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1968                                                 exit(1);
1969                                         }
1970                                 }
1971                          }else{ // single-schema interface
1972                                 schema_id = -1; // don't generate schema_id predicate
1973                                 vector<string> iface_schemas = iface->get_property("Schemas");
1974                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1975                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1976                                         exit(1);
1977                                 }
1978                                 if(iface_schemas.size()>1){
1979                                         fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1980                                         exit(1);
1981                                 }
1982                          }                      
1983                         }else{
1984                                 schema_id = -1;
1985                         }
1986
1987 // If we need to check the schema_id, insert a predicate into the lfta.
1988 //       TODO not just schema_id, the full all_schema_ids set.
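//			(The predicate is built as the expression tree schemaId = <schema id literal>,
//			converted to CNF, given a nominal cost of one comparison, and appended to
//			the lfta's WHERE clause below.)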
1989                         if(schema_id>=0){
1990                                 colref_t *schid_cr = new colref_t("schemaId");
1991                                 schid_cr->schema_ref = schema_ref;
1992                                 schid_cr->table_name = rvar_name;
1993                                 schid_cr->tablevar_ref = 0;
1994                                 schid_cr->default_table = false;
1995                                 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1996                                 data_type *schid_dt = new data_type("uint");
1997                                 schid_se->dt = schid_dt;
1998
1999                                 string schid_str = int_to_string(schema_id);
2000                                 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2001                                 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2002                                 lit_se->dt = schid_dt;
2003
2004                                 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2005                                 vector<cnf_elem *> clist;
2006                                 make_cnf_from_pr(schid_pr, clist);
2007                                 analyze_cnf(clist[0]);
2008                                 clist[0]->cost = 1;     // cheap one comparison
2009 // cnf built, now insert it.
2010                                 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2011
2012 // Specialized processing 
2013 //              filter join, get two schemaid preds
2014                                 string node_type = split_queries[l]->query_plan[0]->node_type();
2015                                 if(node_type == "filter_join"){
2016                                         filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2017                                         if(irv==0){
2018                                                 fj->pred_t0.push_back(clist[0]);
2019                                         }else{
2020                                                 fj->pred_t1.push_back(clist[0]);
2021                                         }
2022                                         schemaid_preds.push_back(schid_pr);
2023                                 }
2024 //              watchlist join, get the first schemaid pred
2025                                 if(node_type == "watch_join"){
2026                                         watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2027                                         if(irv==0){
2028                                                 fj->pred_t0.push_back(clist[0]);
2029                                                 schemaid_preds.push_back(schid_pr);
2030                                         }
2031                                 }
2032                         }
2033                 }
2034 // Specialized processing, currently filter join.
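//		(For a filter join that received schemaId predicates on both inputs, the two
//		predicates are OR'ed into a single shared predicate on the join.)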
2035                 if(schemaid_preds.size()>1){
2036                         string node_type = split_queries[l]->query_plan[0]->node_type();
2037                         if(node_type == "filter_join"){
2038                                 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2039                                 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2040                                 vector<cnf_elem *> clist;
2041                                 make_cnf_from_pr(filter_pr, clist);
2042                                 analyze_cnf(clist[0]);
2043                                 clist[0]->cost = 1;     // cheap one comparison
2044                                 fj->shared_pred.push_back(clist[0]);
2045                         }
2046                 }
2047                         
2048
2049
2050
2051
2052
2053
2054 //                      Set the ht size from the recommendation, if there is one in the rec file
2055                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2056                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2057                 }
2058
2059
2060                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2061                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
2062                 lfta_list.push_back(split_queries[l]);
2063                 lfta_mach_lists[lmach].push_back(split_queries[l]);
2064
2065 //			The following is a hack,
2066 //			as I should be generating LFTA code through
2067 //			the stream_query object.
2068
2069                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2070
2071 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2072
2073 /*
2074 //                              Create query description to embed in lfta.c
2075                 string lfta_schema_str = split_queries[l]->make_schema();
2076                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2077
2078 //                              get NIC capabilities.
2079                 int erri;
2080                 nic_property *nicprop = NULL;
2081                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2082                 if(iface_codegen_type.size()){
2083                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2084                         if(!nicprop){
2085                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2086                                         exit(1);
2087                         }
2088                 }
2089
2090                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2091 */
2092
2093                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2094
2095 // TODO NOTE: I'd like it to be the case that registration_query_names
2096 //	are the queries to be registered for subscription,
2097 //	but there is some complex bookkeeping here.
2098                 registration_query_names.push_back(split_queries[l]->query_name);
2099                 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2100 //			NOTE: I will assume a 1-1 correspondence between
2101 //			mach_query_names[lmach] and lfta_mach_lists[lmach],
2102 //			where mach_query_names[lmach][i] contains the index into
2103 //			registration_query_names, which names the lfta, and
2104 //			lfta_mach_lists[lmach][i] is the stream_query * of the
2105 //			corresponding lfta.
2106 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2107
2108
2109
2110 		// Check if the lfta is reusable.
2111 		// An FTA is reusable if the reusable option is set explicitly or if it doesn't have any parameters.
2112
2113                 bool lfta_reusable = false;
2114                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2115                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2116                         lfta_reusable = true;
2117                 }
2118                 lfta_reuse_options.push_back(lfta_reusable);
2119
2120 		// The LFTA will inherit the liveness timeout specification from the containing query.
2121 		// This is conservative, as lftas are expected to spend less time per tuple
2122 		// than the full query.
2123
2124                 // extract liveness timeout from query definition
2125                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2126                 if (!liveness_timeout) {
2127 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2128 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2129                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2130                 }
2131                 lfta_liveness_timeouts.push_back(liveness_timeout);
2132
2133 //                      Add it to the schema
2134                 table_def *td = split_queries[l]->get_output_tabledef();
2135                 Schema->append_table(td);
2136 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2137
2138           }
2139          }
2140
2141 //                              If the output is lfta-only, dump out the query name.
2142       if(split_queries.size() == 1 && !hfta_returned){
2143         if(output_query_names ){
2144            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2145                 }
2146 /*
2147 else{
2148            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2149                 }
2150 */
2151
2152 /*
2153 //                              output schema summary
2154                 if(output_schema_summary){
2155                         dump_summary(split_queries[0]);
2156                 }
2157 */
2158       }
2159
2160
2161           if(hfta_returned){            // query also has an HFTA component
2162                 int hfta_nbr = split_queries.size()-1;
2163
2164                         hfta_list.push_back(split_queries[hfta_nbr]);
2165
2166 //                                      report on generated query names
2167         if(output_query_names){
2168                         string hfta_name =split_queries[hfta_nbr]->query_name;
2169                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2170                         for(l=0;l<hfta_nbr;++l){
2171                                 string lfta_name =split_queries[l]->query_name;
2172                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2173                         }
2174                 }
2175 //              else{
2176 //              fprintf(stderr,"query names are ");
2177 //                      for(l=0;l<hfta_nbr;++l){
2178 //                              if(l>0) fprintf(stderr,",");
2179 //                              string fta_name =split_queries[l]->query_name;
2180 //                      fprintf(stderr," %s",fta_name.c_str());
2181 //                      }
2182 //                      fprintf(stderr,"\n");
2183 //              }
2184           }
2185
2186   }else{
2187           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2188           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2189           exit(1);
2190   }
2191  }
2192 }
2193
2194
2195 //-----------------------------------------------------------------
2196 //              Compute and propagate the SEs that PROTOCOL fields use to compute a field.
2197 //-----------------------------------------------------------------
2198
2199 for(i=0;i<lfta_list.size();i++){
2200         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2201         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2202 }
2203 for(i=0;i<hfta_list.size();i++){
2204         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2205         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2206 }
2207
2208
2209
2210 //------------------------------------------------------------------------
2211 //              Perform  individual FTA optimizations
2212 //-----------------------------------------------------------------------
2213
2214 if (partitioned_mode) {
2215
2216         // open partition definition file
2217         string part_fname = config_dir_path + "partition.txt";
2218
2219         FILE* partfd = fopen(part_fname.c_str(), "r");
2220         if (!partfd) {
2221                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2222                 exit(1);
2223         }
2224         PartnParser_setfileinput(partfd);
2225         if (PartnParserparse()) {
2226                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2227                 exit(1);
2228         }
2229         fclose(partfd);
2230 }
2231
2232
2233 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2234
2235 int num_hfta = hfta_list.size();
2236 for(i=0; i < hfta_list.size(); ++i){
2237         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2238 }
2239
2240 //                      Add all new hftas to schema
2241 for(i=num_hfta; i < hfta_list.size(); ++i){
2242                 table_def *td = hfta_list[i]->get_output_tabledef();
2243                 Schema->append_table(td);
2244 }
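//              hfta_list may have grown during optimize(); entries at index num_hfta and beyond
//              are newly created, so only their output tables need to be added to the schema here.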
2245
2246 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2247
2248
2249
2250 //------------------------------------------------------------------------
2251 //              Do global (cross-fta) optimization
2252 //-----------------------------------------------------------------------
2253
2254
2255
2256
2257
2258
2259 set<string> extra_external_libs;
2260
2261 for(i=0;i<hfta_list.size();++i){                // generate code and qtree info for each HFTA
2262
2263                 if(! debug_only){
2264 //                                      build hfta file name, create output
2265            if(numeric_hfta_flname){
2266             sprintf(tmpstr,"hfta_%d",hfta_count);
2267                         hfta_names.push_back(tmpstr);
2268             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2269          }else{
2270             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2271                         hfta_names.push_back(tmpstr);
2272             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2273           }
2274                   FILE *hfta_fl = fopen(tmpstr,"w");
2275                   if(hfta_fl == NULL){
2276                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2277                         exit(1);
2278                   }
2279                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2280
2281 //                      If there is a field verifier, warn about
2282 //                      lack of compatibility
2283 //                              NOTE : this code assumes that visible non-lfta queries
2284 //                              are those at the root of a stream query.
2285                   string hfta_comment;
2286                   string hfta_title;
2287                   string hfta_namespace;
2288                   if(hfta_list[i]->defines.count("comment")>0)
2289                         hfta_comment = hfta_list[i]->defines["comment"];
2290                   if(hfta_list[i]->defines.count("Comment")>0)
2291                         hfta_comment = hfta_list[i]->defines["Comment"];
2292                   if(hfta_list[i]->defines.count("COMMENT")>0)
2293                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2294                   if(hfta_list[i]->defines.count("title")>0)
2295                         hfta_title = hfta_list[i]->defines["title"];
2296                   if(hfta_list[i]->defines.count("Title")>0)
2297                         hfta_title = hfta_list[i]->defines["Title"];
2298                   if(hfta_list[i]->defines.count("TITLE")>0)
2299                         hfta_title = hfta_list[i]->defines["TITLE"];
2300                   if(hfta_list[i]->defines.count("namespace")>0)
2301                         hfta_namespace = hfta_list[i]->defines["namespace"];
2302                   if(hfta_list[i]->defines.count("Namespace")>0)
2303                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2304                   if(hfta_list[i]->defines.count("NAMESPACE")>0)
2305                         hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2306
2307                   if(field_verifier != NULL){
2308                         string warning_str;
2309                         if(hfta_comment == "")
2310                                 warning_str += "\tcomment not found.\n";
2311
2312 // Obsolete stuff that Carsten wanted
2313 //                      if(hfta_title == "")
2314 //                              warning_str += "\ttitle not found.\n";
2315 //                      if(hfta_namespace == "")
2316 //                              warning_str += "\tnamespace not found.\n";
2317
2318 // STOPPED HERE
2319 //      There is a get_tbl_keys method implemented for qp_nodes;
2320 //      integrate it into stream_query, then call it to find keys,
2321 //      and annotate fields with their key-ness.
2322 //      If there is a "keys" property in the defines block, override anything returned
2323 //      from the automated analysis.
2324
2325                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2326                         int fi;
2327                         for(fi=0;fi<flds.size();fi++){
2328                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2329                         }
2330                         if(warning_str != "")
2331                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2332                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2333                   }
2334
2335 // Get the fields in this query
2336                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2337
2338 // do key processing
2339                   string hfta_keys_s = "";
2340                   if(hfta_list[i]->defines.count("keys")>0)
2341                         hfta_keys_s = hfta_list[i]->defines["keys"];
2342                   if(hfta_list[i]->defines.count("Keys")>0)
2343                         hfta_keys_s = hfta_list[i]->defines["Keys"];
2344                   if(hfta_list[i]->defines.count("KEYS")>0)
2345                         hfta_keys_s = hfta_list[i]->defines["KEYS"];
2346                   string xtra_keys_s = "";
2347                   if(hfta_list[i]->defines.count("extra_keys")>0)
2348                         xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2349                   if(hfta_list[i]->defines.count("Extra_Keys")>0)
2350                         xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2351                   if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2352                         xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2353 // get the keys
2354                   vector<string> hfta_keys;
2355                   vector<string> partial_keys;
2356                   vector<string> xtra_keys;
2357                   if(hfta_keys_s==""){
2358                                 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2359                                 if(xtra_keys_s.size()>0){
2360                                         xtra_keys = split_string(xtra_keys_s, ',');
2361                                 }
2362                                 for(int xi=0;xi<xtra_keys.size();++xi){
2363                                         if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2364                                                 hfta_keys.push_back(xtra_keys[xi]);
2365                                         }
2366                                 }
2367                   }else{
2368                                 hfta_keys = split_string(hfta_keys_s, ',');
2369                   }
2370 // validate that all of the keys exist in the output.
2371 //      (exit on error, as it's a bad specification)
2372                   vector<string> missing_keys;
2373                   for(int ki=0;ki<hfta_keys.size(); ++ki){
2374                         int fi;
2375                         for(fi=0;fi<flds.size();++fi){
2376                                 if(hfta_keys[ki] == flds[fi]->get_name())
2377                                         break;
2378                         }
2379                         if(fi==flds.size())
2380                                 missing_keys.push_back(hfta_keys[ki]);
2381                   }
2382                   if(missing_keys.size()>0){
2383                         fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2384                         for(int hi=0; hi<missing_keys.size(); ++hi){
2385                                 fprintf(stderr," %s", missing_keys[hi].c_str());
2386                         }
2387                         fprintf(stderr,"\n");
2388                         exit(1);
2389                   }
2390
2391                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2392                   if(hfta_comment != "")
2393                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2394                   if(hfta_title != "")
2395                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2396                   if(hfta_namespace != "")
2397                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2398                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2399                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2400
2401 //                              write info about fields to qtree.xml
2402                   int fi;
2403                   for(fi=0;fi<flds.size();fi++){
2404                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2405                         if(flds[fi]->get_modifier_list()->size()){
2406                                 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2407                         }
2408                         fprintf(qtree_output," />\n");
2409                   }
2410 // info about keys
2411                   for(int hi=0;hi<hfta_keys.size();++hi){
2412                         fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2413                   }
2414                   for(int hi=0;hi<partial_keys.size();++hi){
2415                         fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2416                   }
2417                   for(int hi=0;hi<xtra_keys.size();++hi){
2418                         fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2419                   }
2420
2421
2422                   // extract liveness timeout from query definition
2423                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2424                   if (!liveness_timeout) {
2425 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2426 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2427                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2428                   }
2429                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2430
2431                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2432                   int itv;
2433                   for(itv=0;itv<tmp_tv.size();++itv){
2434                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2435                   }
2436                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2437                   if(ifrs != ""){
2438                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2439                   }
2440                   fprintf(qtree_output,"\t</HFTA>\n");
2441
2442                   fclose(hfta_fl);
2443                 }else{
2444 //                                      debug only -- do code generation to catch generation-time errors.
2445                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2446                 }
2447
2448                 hfta_count++;   // for hfta file names with numeric suffixes
2449
2450                 hfta_list[i]->get_external_libs(extra_external_libs);
2451
2452           }
2453
2454 string ext_lib_string;
2455 set<string>::iterator ssi_el;
2456 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2457         ext_lib_string += (*ssi_el)+" ";
2458
2459
2460
2461 //                      Report on the set of operator views
2462   for(i=0;i<opviews.size();++i){
2463         opview_entry *opve = opviews.get_entry(i);
2464         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2465         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2466         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2467         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2468         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2469
2470         if (!opve->liveness_timeout) {
2471 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2472 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2473                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2474         }
2475         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2476     int j;
2477         for(j=0;j<opve->subq_names.size();j++)
2478                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2479         fprintf(qtree_output,"\t</UDOP>\n");
2480   }
2481
2482
2483 //-----------------------------------------------------------------
2484
2485 //                      Create interface-specific meta code files.
2486 //                              first, open and parse the interface resources file.
2487         ifaces_db = new ifq_t();
2488     ierr = "";
2489         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2490                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2491                                 ifx_fname.c_str(), ierr.c_str());
2492                 exit(1);
2493         }
2494
2495         map<string, vector<stream_query *> >::iterator svsi;
2496         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2497                 string lmach = (*svsi).first;
2498
2499         //              For this machine, create a set of lftas per interface.
2500                 vector<stream_query *> mach_lftas = (*svsi).second;
2501                 map<string, vector<stream_query *> > lfta_iface_lists;
2502                 int li;
2503                 for(li=0;li<mach_lftas.size();++li){
2504                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2505                         string lfta_iface = "_local_";
2506                         if(tvec.size()>0){
2507                                 lfta_iface = tvec[0]->get_interface();
2508                         }
2509                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2510                 }
2511
2512                 map<string, vector<stream_query *> >::iterator lsvsi;
2513                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2514                         int erri;
2515                         string liface = (*lsvsi).first;
2516                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2517                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2518                         if(iface_codegen_type.size()){
2519                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2520                                 if(!nicprop){
2521                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2522                                         exit(1);
2523                                 }
2524                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2525                                 string mcf_flnm;
2526                                 if(lmach != "")
2527                                   mcf_flnm = lmach + "_"+liface+".mcf";
2528                                 else
2529                                   mcf_flnm = hostname + "_"+liface+".mcf";
2530                                 FILE *mcf_fl ;
2531                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2532                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2533                                         exit(1);
2534                                 }
2535                                 fprintf(mcf_fl,"%s",mcs.c_str());
2536                                 fclose(mcf_fl);
2537 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2538 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2539                         }
2540                 }
2541
2542
2543         }
2544
2545
2546
2547 //-----------------------------------------------------------------
2548
2549
2550 //                      Find common filter predicates in the LFTAs.
2551 //                      In addition, generate structs to store the temporal attributes unpacked by the prefilter.
2552         
2553         map<string, vector<stream_query *> >::iterator ssqi;
2554         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2555
2556                 string lmach = (*ssqi).first;
2557                 bool packed_return = false;
2558                 int li, erri;
2559
2560
2561 //      The LFTAs of this machine.
2562                 vector<stream_query *> mach_lftas = (*ssqi).second;
2563 //      break up on a per-interface basis.
2564                 map<string, vector<stream_query *> > lfta_iface_lists;
2565                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2566                                                         // for fta_init
2567                 for(li=0;li<mach_lftas.size();++li){
2568                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2569                         string lfta_iface = "_local_";
2570                         if(tvec.size()>0){
2571                                 lfta_iface = tvec[0]->get_interface();
2572                         }
2573                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2574                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2575                 }
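                // lfta_iface_qname_ix[liface] parallels lfta_iface_lists[liface]; each entry is the
                // lfta's index into the global arrays (registration_query_names, snap_lengths,
                // lfta_reuse_options, interface_names, ...) used by the per-interface loops below.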
2576
2577
2578 //      Are the return values "packed"?
2579 //      This should be done on a per-interface basis.
2580 //      But this is defunct code for gs-lite
2581                 for(li=0;li<mach_lftas.size();++li){
2582                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2583                   string liface = "_local_";
2584                   if(tvec.size()>0){
2585                          liface = tvec[0]->get_interface();
2586                   }
2587                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2588                   if(iface_codegen_type.size()){
2589                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2590                         packed_return = true;
2591                         }
2592                   }
2593                 }
2594
2595
2596 // Separate lftas by interface, collect results on a per-interface basis.
2597
2598                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2599                 map<string, vector<cnf_set *> > prefilter_preds;
2600                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2601                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2602                         string liface = (*mvsi).first;
2603                         vector<cnf_set *> empty_list;
2604                         prefilter_preds[liface] = empty_list;
2605                         if(! packed_return){
2606                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2607                         }
2608
2609 //                              get NIC capabilities.  (Is this needed?)
2610                         nic_property *nicprop = NULL;
2611                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2612                         if(iface_codegen_type.size()){
2613                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2614                                 if(!nicprop){
2615                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2616                                         exit(1);
2617                                 }
2618                         }
2619                 }
2620
2621
2622 //              Now that we know the prefilter preds, generate the lfta code.
2623 //      Do this for all lftas in this machine.
2624                 for(li=0;li<mach_lftas.size();++li){
2625                         set<unsigned int> subsumed_preds;
2626                         set<unsigned int>::iterator sii;
2627 #ifdef PREFILTER_OK
2628                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2629                                 int pid = (*sii);
2630                                 if((pid>>16) == li){
2631                                         subsumed_preds.insert(pid & 0xffff);
2632                                 }
2633                         }
2634 #endif
2635                         string lfta_schema_str = mach_lftas[li]->make_schema();
2636                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2637                         nic_property *nicprop = NULL; // no NIC properties?
2638                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2639                 }
2640
2641
2642 //                      generate structs to store the temporal attributes
2643 //                      unpacked by prefilter
2644                 col_id_set temp_cids;
2645                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2646                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2647
2648 //                      Compute the lfta bit signatures and the lfta colrefs
2649 //      do this on a per-interface basis
2650 #ifdef PREFILTER_OK
2651                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2652 #endif
2653                 map<string, vector<long long int> > lfta_sigs; // used again later
2654                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2655                         string liface = (*mvsi).first;
2656                         vector<long long int> empty_list;
2657                         lfta_sigs[liface] = empty_list;
2658
2659                         vector<col_id_set> lfta_cols;
2660                         vector<int> lfta_snap_length;
2661                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2662                                 unsigned long long int mask=0, bpos=1;
2663                                 int f_pos;
2664                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2665                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2666                                                 mask |= bpos;
2667                                         bpos = bpos << 1;
2668                                 }
2669                                 lfta_sigs[liface].push_back(mask);
2670                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2671                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2672                         }
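//                      Note: lfta_sigs[liface][li] is a bitmask over prefilter_preds[liface];
//                      bit f is set when lfta li shares prefilter predicate f.  The entries of
//                      pred_ids are decoded above as (lfta index << 16) | predicate id when
//                      building subsumed_preds for each lfta.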
2673
2674 //for(li=0;li<mach_lftas.size();++li){
2675 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2676 //col_id_set::iterator tcisi;
2677 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2678 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2679 //}
2680 //}
2681
2682
2683 //                      generate the prefilter
2684 //      Do this on a per-interface basis, except for the #define
2685 #ifdef PREFILTER_OK
2686 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2687                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2688 #else
2689                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2690
2691 #endif
2692                 }
2693
2694 //                      Generate interface parameter lookup function
2695           lfta_val[lmach] += "// lookup interface properties by name\n";
2696           lfta_val[lmach] += "// multiple values of a given property are returned in the same string, separated by commas\n";
2697           lfta_val[lmach] += "// returns NULL if the given property does not exist\n";
2698           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2699
2700 //        collect a list of interface names used by queries running on this host
2701           set<std::string> iface_names;
2702           for(i=0;i<mach_query_names[lmach].size();i++){
2703                 int mi = mach_query_names[lmach][i];
2704                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2705
2706                 if(interface_names[mi]=="")
2707                         iface_names.insert("DEFAULTDEV");
2708                 else
2709                         iface_names.insert(interface_names[mi]);
2710           }
2711
2712 //        generate interface property lookup code for every interface
2713           set<std::string>::iterator sir;
2714           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2715                 if (sir == iface_names.begin())
2716                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2717                 else
2718                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2719
2720                 // iterate through interface properties
2721                 vector<string> iface_properties;
2722                 if(*sir!="_local_"){    // dummy watchlist interface, don't process.
2723                         iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2724                 }
2725                 if (erri) {
2726                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2727                         exit(1);
2728                 }
2729                 if (iface_properties.empty())
2730                         lfta_val[lmach] += "\t\treturn NULL;\n";
2731                 else {
2732                         for (int i = 0; i < iface_properties.size(); ++i) {
2733                                 if (i == 0)
2734                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2735                                 else
2736                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2737
2738                                 // combine all values for the interface property using comma separator
2739                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2740                                 lfta_val[lmach] += "\t\t\treturn \"";
2741                                 for (int j = 0; j < vals.size(); ++j) {
2742                                         lfta_val[lmach] +=  vals[j];
2743                                         if (j != vals.size()-1)
2744                                                 lfta_val[lmach] += ",";
2745                                 }
2746                                 lfta_val[lmach] += "\";\n";
2747                         }
2748                         lfta_val[lmach] += "\t\t} else\n";
2749                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2750                 }
2751           }
2752           lfta_val[lmach] += "\t} else\n";
2753           lfta_val[lmach] += "\t\treturn NULL;\n";
2754           lfta_val[lmach] += "}\n\n";
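          // For illustration only: with a hypothetical interface "eth0" whose single
          // property "InterfaceType" has the value "PROTO", the code emitted above has the shape
          //
          //   gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {
          //       if (!strcmp(iface_name, "eth0")) {
          //           if (!strcmp(property_name, "InterfaceType")) {
          //               return "PROTO";
          //           } else
          //               return NULL;
          //       } else
          //           return NULL;
          //   }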
2755
2756
2757 //                      Generate a full list of FTAs for clearinghouse reference
2758           lfta_val[lmach] += "// list of FTAs the clearinghouse expects to register themselves\n";
2759           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2760
2761           bool first = true;
2762           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2763                 string liface = (*mvsi).first;
2764                 if(liface != "_local_"){        // these don't register themselves
2765                         vector<stream_query *> lfta_list = (*mvsi).second;
2766                         for(i=0;i<lfta_list.size();i++){
2767                                 int mi = lfta_iface_qname_ix[liface][i];
2768                                 if(first) first = false;
2769                                 else      lfta_val[lmach] += ", ";
2770                                 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2771                         }
2772                 }
2773           }
2774 //        for (i = 0; i < registration_query_names.size(); ++i) {
2775 //                 if (i)
2776 //                        lfta_val[lmach] += ", ";
2777 //                 lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2778 //        }
2779
2780           for (i = 0; i < hfta_list.size(); ++i) {
2781                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2782           }
2783           lfta_val[lmach] += ", NULL};\n\n";
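          // Illustrative shape of the emitted list (query names are hypothetical; _local_
          // lftas are excluded, hfta names are appended last):
          //   const gs_sp_t fta_names[] = {"packets", "flows", "hfta_1", NULL};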
2784
2785
2786 //                      Add the initialization function to lfta.c
2787 //      Change to accept the interface name, and 
2788 //      set the prefilter function accordingly.
2789 //      see the example in demo/err2
2790           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2791           lfta_val[lmach] += "//        note: the last parameter in fta_register is the prefilter signature\n";
2792
2793 //        for(i=0;i<mach_query_names[lmach].size();i++)
2794 //              int mi = mach_query_names[lmach][i];
2795 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2796
2797           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2798                 string liface = (*mvsi).first;
2799                 vector<stream_query *> lfta_list = (*mvsi).second;
2800                 for(i=0;i<lfta_list.size();i++){
2801                         stream_query *lfta_sq = lfta_list[i];
2802                         int mi = lfta_iface_qname_ix[liface][i];
2803
2804                         if(liface == "_local_"){
2805 //  Don't register an init function, do the init code inline
2806                                 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2807                                 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2808                                 continue;
2809                         }
2810                 
2811                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2812
2813                         string this_iface = "DEFAULTDEV";
2814                         if(interface_names[mi]!="")
2815                                 this_iface = '"'+interface_names[mi]+'"';
2816                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2817                 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2818 //              if(interface_names[mi]=="")
2819 //                              lfta_val[lmach]+="DEFAULTDEV";
2820 //              else
2821 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';
2822                         lfta_val[lmach] += this_iface;
2823
2824
2825                 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2826                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2827                         +"\n#endif\n";
2828                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2829                         lfta_val[lmach] += tmpstr;
2830
2831 //                      unsigned long long int mask=0, bpos=1;
2832 //                      int f_pos;
2833 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2834 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2835 //                                      mask |= bpos;
2836 //                              bpos = bpos << 1;
2837 //                      }
2838
2839 #ifdef PREFILTER_OK
2840 //                      sprintf(tmpstr,",%lluull",mask);
2841                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2842                         lfta_val[lmach]+=tmpstr;
2843 #else
2844                         lfta_val[lmach] += ",0ull";
2845 #endif
2846
2847                         lfta_val[lmach] += ");\n";
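                        // Illustrative shape of the registration emitted above (interface, query name,
                        // snap length, and signature are hypothetical; the identifiers produced by
                        // generate_alloc_name / generate_schema_string_name are shown as placeholders):
                        //   if(!strcmp(device,"eth0"))
                        //       fta_register("packets", 1, "eth0", <alloc_fcn>
                        //   #ifndef LFTA_IN_NIC
                        //       ,<schema_string>
                        //   #endif
                        //       ,1500,0ull);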
2848
2849
2850
2851 //    End of lfta prefilter stuff
2852 // --------------------------------------------------
2853
2854 //                      If there is a field verifier, warn about
2855 //                      lack of compatibility
2856                   string lfta_comment;
2857                   string lfta_title;
2858                   string lfta_namespace;
2859                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2860                   if(ldefs.count("comment")>0)
2861                         lfta_comment = lfta_sq->defines["comment"];
2862                   if(ldefs.count("Comment")>0)
2863                         lfta_comment = lfta_sq->defines["Comment"];
2864                   if(ldefs.count("COMMENT")>0)
2865                         lfta_comment = lfta_sq->defines["COMMENT"];
2866                   if(ldefs.count("title")>0)
2867                         lfta_title = lfta_sq->defines["title"];
2868                   if(ldefs.count("Title")>0)
2869                         lfta_title = lfta_sq->defines["Title"];
2870                   if(ldefs.count("TITLE")>0)
2871                         lfta_title = lfta_sq->defines["TITLE"];
2872                   if(ldefs.count("NAMESPACE")>0)
2873                         lfta_namespace = lfta_sq->defines["NAMESPACE"];
2874                   if(ldefs.count("Namespace")>0)
2875                         lfta_namespace = lfta_sq->defines["Namespace"];
2876                   if(ldefs.count("namespace")>0)
2877                         lfta_namespace = lfta_sq->defines["namespace"];
2878
2879                   string lfta_ht_size;
2880                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2881                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2882                   if(ldefs.count("aggregate_slots")>0){
2883                         lfta_ht_size = ldefs["aggregate_slots"];
2884                   }
2885
2886 //                      NOTE : I'm assuming that visible lfta names do not start with an underscore.
2887 //                              -- will fail for non-visible simple selection queries.
2888                 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2889                         string warning_str;
2890                         if(lfta_comment == "")
2891                                 warning_str += "\tcomment not found.\n";
2892 // Obsolete stuff that Carsten wanted
2893 //                      if(lfta_title == "")
2894 //                              warning_str += "\ttitle not found.\n";
2895 //                      if(lfta_namespace == "")
2896 //                              warning_str += "\tnamespace not found.\n";
2897
2898                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2899                         int fi;
2900                         for(fi=0;fi<flds.size();fi++){
2901                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2902                         }
2903                         if(warning_str != "")
2904                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2905                                         registration_query_names[mi].c_str(),warning_str.c_str());
2906                 }
2907
2908
2909 //                      Create qtree output
2910                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2911         if(lfta_comment != "")
2912               fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2913         if(lfta_title != "")
2914               fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2915         if(lfta_namespace != "")
2916               fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2917         if(lfta_ht_size != "")
2918               fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2919                 if(lmach != "")
2920                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2921                 else
2922                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2923                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
2924                 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2925                 for(int t=0;t<itbls.size();++t){
2926                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2927                 }
2928 //              fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2929                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2930                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2931 //                              write info about fields to qtree.xml
2932                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2933                   int fi;
2934                   for(fi=0;fi<flds.size();fi++){
2935                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2936                         if(flds[fi]->get_modifier_list()->size()){
2937                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2938                         }
2939                         fprintf(qtree_output," />\n");
2940                   }
2941                 fprintf(qtree_output,"\t</LFTA>\n");
2942
2943
2944             }
2945           }
2946
2947           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2948                         string liface = (*mvsi).first;
2949                         lfta_val[lmach] += 
2950 "       if (!strcmp(device, \""+liface+"\")) \n"
2951 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2952 ;
2953                 }
2954                 lfta_val[lmach] += 
2955 "       if(lfta_prefilter == NULL){\n"
2956 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2957 "               exit(1);\n"
2958 "       }\n"
2959 ;
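        // Illustrative shape of the prefilter dispatch emitted above (interface name hypothetical):
        //   if (!strcmp(device, "eth0"))
        //       lfta_prefilter = &lfta_prefilter_eth0;
        //   if(lfta_prefilter == NULL){
        //       fprintf(stderr, "Error, interface %s not found in fta_init.\n",device);
        //       exit(1);
        //   }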
2960
2961
2962
2963           lfta_val[lmach] += "}\n\n";
2964
2965       if(!(debug_only || hfta_only) ){
2966                 string lfta_flnm;
2967                 if(lmach != "")
2968                   lfta_flnm = lmach + "_lfta.c";
2969                 else
2970                   lfta_flnm = hostname + "_lfta.c";
2971                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2972                         fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2973                         exit(1);
2974                 }
2975               fprintf(lfta_out,"%s",lfta_header.c_str());
2976               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2977               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2978                 fclose(lfta_out);
2979           }
2980         }
2981
2982 //              Say what are the operators which must execute
2983         if(opviews.size()>0)
2984                 fprintf(stderr,"The queries use the following external operators:\n");
2985         for(i=0;i<opviews.size();++i){
2986                 opview_entry *opv = opviews.get_entry(i);
2987                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2988         }
2989
2990         if(create_makefile)
2991                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2992                 machine_names, schema_file_name,
2993                 interface_names,
2994                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2995
2996
2997         fprintf(qtree_output,"</QueryNodes>\n");
2998
2999         return(0);
3000 }
3001
3002 ////////////////////////////////////////////////////////////
3003
3004 void generate_makefile(vector<string> &input_file_names, int nfiles,
3005                                            vector<string> &hfta_names, opview_set &opviews,
3006                                                 vector<string> &machine_names,
3007                                                 string schema_file_name,
3008                                                 vector<string> &interface_names,
3009                                                 ifq_t *ifdb, string &config_dir_path,
3010                                                 bool use_pads,
3011                                                 string extra_libs,
3012                                                 map<string, vector<int> > &rts_hload
3013                                          ){
3014         int i,j;
3015
3016         if(config_dir_path != ""){
3017                 config_dir_path = "-C "+config_dir_path;
3018         }
3019
3020         struct stat sb;
3021         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3022         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3023
3024 //      if(libz_exists && !libast_exists){
3025 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
3026 //              exit(1);
3027 //      }
3028
3029 //                      Get set of operator executable files to run
3030         set<string> op_fls;
3031         set<string>::iterator ssi;
3032         for(i=0;i<opviews.size();++i){
3033                 opview_entry *opv = opviews.get_entry(i);
3034                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3035         }
3036
3037         FILE *outfl = fopen("Makefile", "w");
3038         if(outfl==NULL){
3039                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
3040                 exit(0);
3041         }
3042
3043         fputs(
3044 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
3045 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3046 ).c_str(), outfl
3047 );
3048         if(generate_stats)
3049                 fprintf(outfl,"  -DLFTA_STATS");
3050
3051 //              Gather the set of interfaces
3052 //              Also, gather "base interface names" for use in computing
3053 //              the hash splitting to virtual interfaces.
3054 //              TODO : must update to handle machines
3055         set<string> ifaces;
3056         set<string> base_vifaces;       // base interfaces of virtual interfaces
3057         map<string, string> ifmachines;
3058         map<string, string> ifattrs;
3059         for(i=0;i<interface_names.size();++i){
3060                 ifaces.insert(interface_names[i]);
3061                 ifmachines[interface_names[i]] = machine_names[i];
3062
3063                 size_t Xpos = interface_names[i].find_last_of("X");
3064                 if(Xpos!=string::npos){
3065                         string iface = interface_names[i].substr(0,Xpos);
3066                         base_vifaces.insert(iface);
3067                 }
3068                 // get interface attributes and add them to the list
3069         }
3070
3071 //              Do we need to include protobuf libraries?
3072 //              TODO Move to the interface library: get the libraries to include 
3073 //              for an interface type
3074
3075         bool use_proto = false;
3076         bool use_bsa = false;
3077         bool use_kafka = false;
3078         int erri;
3079         string err_str;
3080         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3081                 string ifnm = (*ssi);
3082                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3083                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3084                         if(ift[ift_i]=="PROTO"){
3085                                 use_proto = true;
3086                         }
3087                 }
3088                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3089                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3090                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3091                                 use_bsa = true;
3092                         }
3093                 }
3094                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3095                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3096                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3097                                 use_kafka = true;
3098                         }
3099                 }
3100         }
3101
3102         fprintf(outfl,
3103 "\n"
3104 "\n"
3105 "all: rts");
3106         for(i=0;i<hfta_names.size();++i)
3107                 fprintf(outfl," %s",hfta_names[i].c_str());
3108         fputs(
3109 ("\n"
3110 "\n"
3111 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
3112 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3113         if(use_pads)
3114                 fprintf(outfl,"-L. ");
3115         fputs(
3116 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3117         if(use_pads)
3118                 fprintf(outfl,"-lgscppads -lpads ");
3119         fprintf(outfl,
3120 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3121         if(use_pads)
3122                 fprintf(outfl, " -lpz -lz -lbz ");
3123         if(libz_exists && libast_exists)
3124                 fprintf(outfl," -last ");
3125         if(use_pads)
3126                 fprintf(outfl, " -ldll -ldl ");
3127         if(use_proto)
3128                 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3129         if(use_bsa)
3130                 fprintf(outfl, " -lbsa_stream ");
3131         if(use_kafka)
3132                 fprintf(outfl, " -lrdkafka ");
3133         fprintf(outfl," -lgscpaux");
3134 #ifdef GCOV
3135         fprintf(outfl," -fprofile-arcs");
3136 #endif
3137         fprintf(outfl,
3138 "\n"
3139 "\n"
3140 "lfta.o: %s_lfta.c\n"
3141 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3142 "\n"
3143 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3144         for(i=0;i<nfiles;++i)
3145                 fprintf(outfl," %s",input_file_names[i].c_str());
3146         if(hostname == ""){
3147                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3148         }else{
3149                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3150         }
3151         for(i=0;i<nfiles;++i)
3152                 fprintf(outfl," %s",input_file_names[i].c_str());
3153         fprintf(outfl,"\n");
3154
3155         for(i=0;i<hfta_names.size();++i)
3156                 fprintf(outfl,
3157 ("%s: %s.o\n"
3158 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3159 "\n"
3160 "%s.o: %s.cc\n"
3161 "\t$(CPP) -o %s.o -c %s.cc\n"
3162 "\n"
3163 "\n").c_str(),
3164     hfta_names[i].c_str(), hfta_names[i].c_str(),
3165         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3166         hfta_names[i].c_str(), hfta_names[i].c_str(),
3167         hfta_names[i].c_str(), hfta_names[i].c_str()
3168                 );
3169
3170         fprintf(outfl,
3171 ("\n"
3172 "packet_schema.txt:\n"
3173 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3174 "\n"
3175 "external_fcns.def:\n"
3176 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3177 "\n"
3178 "clean:\n"
3179 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3180         for(i=0;i<hfta_names.size();++i)
3181                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3182         fprintf(outfl,"\n");
3183
3184         fclose(outfl);
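        // Rough shape of the generated Makefile (targets only; <host> is the local hostname
        // and hfta_N ranges over hfta_names):
        //   all: rts hfta_1 ...
        //   rts: lfta.o <gscp libraries>                            # links the lfta runtime
        //   lfta.o: <host>_lfta.c                                   # compiled with $(CC)
        //   <host>_lfta.c: external_fcns.def <schema> <query files> # rebuilt via translate_fta
        //   hfta_N: hfta_N.o                                        # each hfta linked with $(CPP)
        //   packet_schema.txt, external_fcns.def:                   # symlinked from the cfg directory
        //   clean:                                                  # removes generated objects, binaries, and sources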
3185
3186
3187
3188 //              Gather the set of interfaces
3189 //              TODO : must update to handle machines
3190 //              TODO : lookup interface attributes and add them as a parameter to rts process
3191         outfl = fopen("runit", "w");
3192         if(outfl==NULL){
3193                 fprintf(stderr,"Can't open runit for write, exiting.\n");
3194                 exit(0);
3195         }
3196
3197
3198         fputs(
3199 ("#!/bin/sh\n"
3200 "./stopit\n"
3201 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3202 "sleep 5\n"
3203 "if [ ! -f gshub.log ]\n"
3204 "then\n"
3205 "\techo \"Failed to start bin/gshub.py\"\n"
3206 "\texit -1\n"
3207 "fi\n"
3208 "ADDR=`cat gshub.log`\n"
3209 "ps opgid= $! >> gs.pids\n"
3210 "./rts $ADDR default ").c_str(), outfl);
3211 //      int erri;
3212 //      string err_str;
3213         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3214                 string ifnm = (*ssi);
3215                 // suppress internal _local_ interface
3216                 if (ifnm == "_local_")
3217                         continue;
3218                 fprintf(outfl, "%s ",ifnm.c_str());
3219                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3220                 for(j=0;j<ifv.size();++j)
3221                         fprintf(outfl, "%s ",ifv[j].c_str());
3222         }
3223         fprintf(outfl, " &\n");
3224         fprintf(outfl, "echo $! >> gs.pids\n");
3225         for(i=0;i<hfta_names.size();++i)
3226                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3227
3228         for(j=0;j<opviews.opview_list.size();++j){
3229                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3230         }
3231
3232         fclose(outfl);
3233         system("chmod +x runit");
3234
3235         outfl = fopen("stopit", "w");
3236         if(outfl==NULL){
3237                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3238                 exit(1);
3239         }
3240
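//		The generated stopit script sends SIGTERM to every process group recorded
//		in gs.pids (kill's "-$pgid" form addresses the whole group), waits one
//		second, then sends SIGKILL to whatever is still running.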
3241         fprintf(outfl,"#!/bin/sh\n"
3242 "rm -f gshub.log\n"
3243 "if [ ! -f gs.pids ]\n"
3244 "then\n"
3245 "exit\n"
3246 "fi\n"
3247 "for pgid in `cat gs.pids`\n"
3248 "do\n"
3249 "kill -TERM -$pgid\n"
3250 "done\n"
3251 "sleep 1\n"
3252 "for pgid in `cat gs.pids`\n"
3253 "do\n"
3254 "kill -9 -$pgid\n"
3255 "done\n"
3256 "rm gs.pids\n");
3257
3258         fclose(outfl);
3259         system("chmod +x stopit");
3260
3261 //-----------------------------------------------
3262
3263 /* For now disable support for virtual interfaces
3264         outfl = fopen("set_vinterface_hash.bat", "w");
3265         if(outfl==NULL){
3266                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3267                 exit(0);
3268         }
3269
3270 //              The format should be determined by an entry in the ifres.xml file,
3271 //              but for now hardcode the only example I have.
3272         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3273                 if(rts_hload.count((*ssi))){
3274                         string iface_name = (*ssi);
3275                         string iface_number = "";
3276                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3277                                 if(isdigit(iface_name[j])){
3278                                         iface_number = iface_name[j];
3279                                         if(j>0 && isdigit(iface_name[j-1]))
3280                                                 iface_number = iface_name[j-1] + iface_number;
3281                                 }
3282                         }
3283
3284                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3285                         vector<int> halloc = rts_hload[iface_name];
3286                         int prev_limit = 0;
3287                         for(j=0;j<halloc.size();++j){
3288                                 if(j>0)
3289                                         fprintf(outfl,":");
3290                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3291                                 prev_limit = halloc[j];
3292                         }
3293                         fprintf(outfl,"\n");
3294                 }
3295         }
3296         fclose(outfl);
3297         system("chmod +x set_vinterface_hash.bat");
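//			Illustrative output of the loop above, assuming a hypothetical
//			interface "dag0" with rts_hload["dag0"] == {32, 64}:
//
//				dagconfig -d0 -S hat_range=0-32:32-64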
3298 */
3299 }
3300
3301 //              Code for implementing a local schema
3302 /*
3303                 table_list qpSchema;
3304
3305 //                              Load the schemas of any LFTAs.
3306                 int l;
3307                 for(l=0;l<hfta_nbr;++l){
3308                         stream_query *sq0 = split_queries[l];
3309                         table_def *td = sq0->get_output_tabledef();
3310                         qpSchema.append_table(td);
3311                 }
3312 //                              load the schemas of any other ref'd tables.
3313 //                              (e.g., hftas)
3314                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3315                 int ti;
3316                 for(ti=0;ti<input_tbl_names.size();++ti){
3317                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3318                         if(tbl_ref < 0){
3319                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3320                                 if(tbl_ref < 0){
3321                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3322                                         exit(1);
3323                                 }
3324                                 qpSchema.append_table(Schema->get_table(tbl_ref));
3325                         }
3326                 }
3327 */
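//		(The disabled fragment above sketches how a query-local schema would be
//		built: the output table definitions of the split-off LFTA queries are
//		appended first, and any other referenced table is then resolved against
//		the global Schema, with an error if it cannot be found.)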
3328
3329 //              Functions related to parsing.
3330
3331 /*
3332 static int split_string(char *instr,char sep, char **words,int max_words){
3333    char *loc;
3334    char *str;
3335    int nwords = 0;
3336
3337    str = instr;
3338    words[nwords++] = str;
3339    while( (loc = strchr(str,sep)) != NULL){
3340         *loc = '\0';
3341         str = loc+1;
3342         if(nwords >= max_words){
3343                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3344                 nwords = max_words-1;
3345         }
3346         words[nwords++] = str;
3347    }
3348
3349    return(nwords);
3350 }
3351
3352 */
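//		Usage sketch for split_string (illustrative only; the function splits the
//		buffer in place by overwriting separators with '\0', so instr must be writable):
//
//			char buf[] = "eth0,eth1,eth2";
//			char *words[8];
//			int n = split_string(buf, ',', words, 8);	// n == 3, words[0] == "eth0"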
3353