4a1263fd08a0863cffee8bf188eb8621fd4d71ac
[com/gs-lite.git] / src / ftacmp / translate_fta.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33 #include<glob.h>
34 #include<string.h>
35
36 #include<list>
37
38 //              for the scandir
39      #include <sys/types.h>
40      #include <dirent.h>
41
42
43 #include<errno.h>
44
45 //              to verify that some files exist.
46      #include <sys/types.h>
47      #include <sys/stat.h>
48
49 #include "parse_partn.h"
50
51 #include "print_plan.h"
52
53 //              Interface to the xml parser
54
55 #include"xml_t.h"
56 #include"field_list.h"
57
58 extern int xmlParserparse(void);
59 extern FILE *xmlParserin;
60 extern int xmlParserdebug;
61
62 std::vector<std::string> xml_attr_vec;
63 std::vector<std::string> xml_val_vec;
64 std::string xml_a, xml_v;
65 xml_t *xml_leaves = NULL;
66
67 //      Interface to the field list verifier
68 field_list *field_verifier = NULL;
69
70 #define TMPSTRLEN 1000
71
72 #ifndef PATH_DELIM
73   #define PATH_DELIM '/'
74 #endif
75
76 char tmp_schema_str[10000];
77
78 // maximum delay between two heartbeats produced
79 // by UDOP. Used when it's not explicitly
80 // provided in udop definition
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
82
83 //              Default lfta hash table size, must be power of 2.
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
85
86 //              Interface to FTA definition lexer and parser ...
87
88 extern int FtaParserparse(void);
89 extern FILE *FtaParserin;
90 extern int FtaParserdebug;
91
92 fta_parse_t *fta_parse_result;
93 var_defs_t *fta_parse_defines;
94
95
96
97 //              Interface to external function lexer and parser ...
98
99 extern int Ext_fcnsParserparse(void);
100 extern FILE *Ext_fcnsParserin;
101 extern int Ext_fcnsParserdebug;
102
103 ext_fcn_list *Ext_fcns;
104
105
106 //              Interface to partition definition parser
107 extern int PartnParserparse();
108 partn_def_list_t *partn_parse_result = NULL;
109
110
111
112 using namespace std;
113 //extern int errno;
114
115
116 //              forward declaration of local utility function
117 void generate_makefile(vector<string> &input_file_names, int nfiles,
118                                            vector<string> &hfta_names, opview_set &opviews,
119                                                 vector<string> &machine_names,
120                                                 string schema_file_name,
121                                                 vector<string> &interface_names,
122                                                 ifq_t *ifdb, string &config_dir_path,
123                                                 bool use_pads,
124                                                 string extra_libs,
125                                                 map<string, vector<int> > &rts_hload
126                                         );
127
128 //static int split_string(char *instr,char sep, char **words,int max_words);
129 #define MAXFLDS 100
130
131   FILE *schema_summary_output = NULL;           // query names
132
133 //                      Dump schema summary
134 void dump_summary(stream_query *str){
135         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
136
137         table_def *sch = str->get_output_tabledef();
138
139         vector<field_entry *> flds = sch->get_fields();
140         int f;
141         for(f=0;f<flds.size();++f){
142                 if(f>0) fprintf(schema_summary_output,"|");
143                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
144         }
145         fprintf(schema_summary_output,"\n");
146         for(f=0;f<flds.size();++f){
147                 if(f>0) fprintf(schema_summary_output,"|");
148                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
149         }
150         fprintf(schema_summary_output,"\n");
151 }
152
153 //              Globals
154 string hostname;                // name of current host.
155 int hostname_len;
156 bool generate_stats = false;
157 string root_path = "../..";
158
159
160 int main(int argc, char **argv){
161   char tmpstr[TMPSTRLEN];
162   string err_str;
163   int q,s,h,f;
164
165   set<int>::iterator si;
166
167   vector<string> query_names;                   // for lfta.c registration
168   map<string, vector<int> > mach_query_names;   // list queries of machine
169   vector<int> snap_lengths;                             // for lfta.c registration
170   vector<string> interface_names;                       // for lfta.c registration
171   vector<string> machine_names;                 // machine of interface
172   vector<bool> lfta_reuse_options;                      // for lfta.c registration
173   vector<int> lfta_liveness_timeouts;           // fot qtree.xml generation
174   vector<string> hfta_names;                    // hfta cource code names, for
175                                                                                 // creating make file.
176   vector<string> qnames;                                // ensure unique names
177   map<string, int> lfta_names;                  // keep track of unique lftas.
178
179
180 //                              set these to 1 to debug the parser
181   FtaParserdebug = 0;
182   Ext_fcnsParserdebug = 0;
183
184   FILE *lfta_out;                               // lfta.c output.
185   FILE *fta_in;                                 // input file
186   FILE *table_schemas_in;               // source tables definition file
187   FILE *query_name_output;              // query names
188   FILE *qtree_output;                   // interconnections of query nodes
189
190   // -------------------------------
191   // Handling of Input Arguments
192   // -------------------------------
193     char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195                 "\t[-B] : debug only (don't create output files)\n"
196                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199                 "\t[-C] : use <config directory> for definition files\n"
200                 "\t[-l] : use <library directory> for library queries\n"
201                 "\t[-N] : output query names in query_names.txt\n"
202                 "\t[-H] : create HFTA only (no schema_file)\n"
203                 "\t[-Q] : use query name for hfta suffix\n"
204                 "\t[-M] : generate make file and runit, stopit scripts\n"
205                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206                 "\t[-f] : Output schema summary to schema_summary.txt\n"
207                 "\t[-P] : link with PADS\n"
208                 "\t[-h] : override host name.\n"
209                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210                 "\t[-R] : path to root of GS-lite\n"
211 ;
212
213 //              parameters gathered from command line processing
214         string external_fcns_path;
215 //      string internal_fcn_path;
216         string config_dir_path;
217         string library_path = "./";
218         vector<string> input_file_names;
219         string schema_file_name;
220         bool debug_only = false;
221         bool hfta_only = false;
222         bool output_query_names = false;
223         bool output_schema_summary=false;
224         bool numeric_hfta_flname = true;
225         bool create_makefile = false;
226         bool distributed_mode = false;
227         bool partitioned_mode = false;
228         bool use_live_hosts_file = false;
229         bool use_pads = false;
230         bool clean_make = false;
231         int n_virtual_interfaces = 1;
232
233    char chopt;
234    while((chopt = getopt(argc,argv,optstr)) != -1){
235                 switch(chopt){
236                 case 'B':
237                         debug_only = true;
238                         break;
239                 case 'D':
240                         distributed_mode = true;
241                         break;
242                 case 'p':
243                         partitioned_mode = true;
244                         break;
245                 case 'L':
246                         use_live_hosts_file = true;
247                         break;
248                 case 'C':
249                                 if(optarg != NULL)
250                                  config_dir_path = string(optarg) + string("/");
251                         break;
252                 case 'l':
253                                 if(optarg != NULL)
254                                  library_path = string(optarg) + string("/");
255                         break;
256                 case 'N':
257                         output_query_names = true;
258                         break;
259                 case 'Q':
260                         numeric_hfta_flname = false;
261                         break;
262                 case 'H':
263                         if(schema_file_name == ""){
264                                 hfta_only = true;
265                         }
266                         break;
267                 case 'f':
268                         output_schema_summary=true;
269                         break;
270                 case 'M':
271                         create_makefile=true;
272                         break;
273                 case 'S':
274                         generate_stats=true;
275                         break;
276                 case 'P':
277                         use_pads = true;
278                         break;
279                 case 'c':
280                         clean_make = true;
281                         break;
282                 case 'h':
283                         if(optarg != NULL)
284                                 hostname = optarg;
285                         break;
286                 case 'R':
287                         if(optarg != NULL)
288                                 root_path = optarg;
289                         break;
290                 case 'n':
291                         if(optarg != NULL){
292                                 n_virtual_interfaces = atoi(optarg);
293                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
295                                         n_virtual_interfaces = 1;
296                                 }
297                         }
298                         break;
299                 case '?':
300                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301                         fprintf(stderr,"%s\n", usage_str);
302                         exit(1);
303                 default:
304                         fprintf(stderr, "Argument was %c\n", optopt);
305                         fprintf(stderr,"Invalid arguments\n");
306                         fprintf(stderr,"%s\n", usage_str);
307                         exit(1);
308                 }
309         }
310         argc -= optind;
311         argv += optind;
312         for (int i = 0; i < argc; ++i) {
313                 if((schema_file_name == "") && !hfta_only){
314                         schema_file_name = argv[i];
315                 }else{
316                         input_file_names.push_back(argv[i]);
317                 }
318         }
319
320         if(input_file_names.size() == 0){
321                 fprintf(stderr,"%s\n", usage_str);
322                 exit(1);
323         }
324
325         if(clean_make){
326                 string clean_cmd = "rm Makefile hfta_*.cc";
327                 int clean_ret = system(clean_cmd.c_str());
328                 if(clean_ret){
329                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
330                 }
331         }
332
333
334         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
335
336 //                      Open globally used file names.
337
338         // prepend config directory to schema file
339         schema_file_name = config_dir_path + schema_file_name;
340         external_fcns_path = config_dir_path + string("external_fcns.def");
341     string ifx_fname = config_dir_path + string("ifres.xml");
342
343 //              Find interface query file(s).
344         if(hostname == ""){
345                 gethostname(tmpstr,TMPSTRLEN);
346                 hostname = tmpstr;
347         }
348         hostname_len = strlen(tmpstr);
349     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
350         vector<string> ifq_fls;
351
352                 ifq_fls.push_back(ifq_fname);
353
354
355 //                      Get the field list, if it exists
356         string flist_fl = config_dir_path + "field_list.xml";
357         FILE *flf_in = NULL;
358         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360                 xml_leaves = new xml_t();
361                 xmlParser_setfileinput(flf_in);
362                 if(xmlParserparse()){
363                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
364                 }else{
365                         field_verifier = new field_list(xml_leaves);
366                 }
367         }
368
369         if(!hfta_only){
370           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
372                 exit(1);
373           }
374         }
375
376 /*
377         if(!(debug_only || hfta_only)){
378           if((lfta_out = fopen("lfta.c","w")) == NULL){
379                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
380                 exit(1);
381           }
382         }
383 */
384
385 //              Get the output specification file.
386 //              format is
387 //                      query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
388         string ospec_fl = "output_spec.cfg";
389         FILE *osp_in = NULL;
390         vector<ospec_str *> output_specs;
391         multimap<string, int> qname_to_ospec;
392         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
393                 char *flds[MAXFLDS];
394                 int o_lineno = 0;
395                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
396                         o_lineno++;
397                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
398                         if(nflds == 7){
399 //              make operator type lowercase
400                                 char *tmpc;
401                                 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402                                         *tmpc = tolower(*tmpc);
403
404                                 ospec_str *tmp_ospec = new ospec_str();
405                                 tmp_ospec->query = flds[0];
406                                 tmp_ospec->operator_type = flds[1];
407                                 tmp_ospec->operator_param = flds[2];
408                                 tmp_ospec->output_directory = flds[3];
409                                 tmp_ospec->bucketwidth = atoi(flds[4]);
410                                 tmp_ospec->partitioning_flds = flds[5];
411                                 tmp_ospec->n_partitions = atoi(flds[6]);
412                                 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413                                 output_specs.push_back(tmp_ospec);
414                         }else{
415                                 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
416                         }
417                 }
418                 fclose(osp_in);
419         }else{
420                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
421                 exit(1);
422         }
423
424 //              hfta parallelism
425         string pspec_fl = "hfta_parallelism.cfg";
426         FILE *psp_in = NULL;
427         map<string, int> hfta_parallelism;
428         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
429                 char *flds[MAXFLDS];
430                 int o_lineno = 0;
431                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432                         bool good_entry = true;
433                         o_lineno++;
434                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
435                         if(nflds == 2){
436                                 string hname = flds[0];
437                                 int par = atoi(flds[1]);
438                                 if(par <= 0 || par > n_virtual_interfaces){
439                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
440                                         good_entry = false;
441                                 }
442                                 if(good_entry && n_virtual_interfaces % par != 0){
443                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
444                                         good_entry = false;
445                                 }
446                                 if(good_entry)
447                                         hfta_parallelism[hname] = par;
448                         }
449                 }
450         }else{
451                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
452         }
453
454
455 //              LFTA hash table sizes
456         string htspec_fl = "lfta_htsize.cfg";
457         FILE *htsp_in = NULL;
458         map<string, int> lfta_htsize;
459         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
460                 char *flds[MAXFLDS];
461                 int o_lineno = 0;
462                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463                         bool good_entry = true;
464                         o_lineno++;
465                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
466                         if(nflds == 2){
467                                 string lfta_name = flds[0];
468                                 int htsz = atoi(flds[1]);
469                                 if(htsz>0){
470                                         lfta_htsize[lfta_name] = htsz;
471                                 }else{
472                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
473                                 }
474                         }
475                 }
476         }else{
477                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
478         }
479
480 //              LFTA virtual interface hash split
481         string rtlspec_fl = "rts_load.cfg";
482         FILE *rtl_in = NULL;
483         map<string, vector<int> > rts_hload;
484         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
485                 char *flds[MAXFLDS];
486                 int r_lineno = 0;
487                 string iface_name;
488                 vector<int> hload;
489                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490                         bool good_entry = true;
491                         r_lineno++;
492                         iface_name = "";
493                         hload.clear();
494                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
495                         if(nflds >1){
496                                 iface_name = flds[0];
497                                 int cumm_h = 0;
498                                 int j;
499                                 for(j=1;j<nflds;++j){
500                                         int h = atoi(flds[j]);
501                                         if(h<=0)
502                                                 good_entry = false;
503                                         cumm_h += h;
504                                         hload.push_back(cumm_h);
505                                 }
506                         }else{
507                                 good_entry = false;
508                         }
509                         if(good_entry){
510                                 rts_hload[iface_name] = hload;
511                         }else{
512                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
513                         }
514                 }
515         }
516
517
518
519         if(output_query_names){
520           if((query_name_output = fopen("query_names.txt","w")) == NULL){
521                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
522                 exit(1);
523           }
524         }
525
526         if(output_schema_summary){
527           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
529                 exit(1);
530           }
531         }
532
533         if((qtree_output = fopen("qtree.xml","w")) == NULL){
534                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
535                 exit(1);
536         }
537         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539         fprintf(qtree_output,"<QueryNodes>\n");
540
541
542 //                      Get an initial Schema
543         table_list *Schema;
544         if(!hfta_only){
545 //                      Parse the table schema definitions.
546           fta_parse_result = new fta_parse_t();
547           FtaParser_setfileinput(table_schemas_in);
548           if(FtaParserparse()){
549                 fprintf(stderr,"Table schema parse failed.\n");
550                 exit(1);
551           }
552           if(fta_parse_result->parse_type != TABLE_PARSE){
553                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
554                 exit(1);
555           }
556           Schema = fta_parse_result->tables;
557
558 //                      Process schema field inheritance
559           int retval;
560           retval = Schema->unroll_tables(err_str);
561           if(retval){
562                 fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );
563                 exit(1);
564           }
565         }else{
566 //                      hfta only => we will try to fetch schemas from the registry.
567 //                      therefore, start off with an empty schema.
568           Schema = new table_list();
569         }
570
571
572 //                      Open and parse the external functions file.
573         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
574         if(Ext_fcnsParserin == NULL){
575                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
576                 Ext_fcns = new ext_fcn_list();
577         }else{
578                 if(Ext_fcnsParserparse()){
579                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
580                         Ext_fcns = new ext_fcn_list();
581                 }
582         }
583         if(Ext_fcns->validate_fcns(err_str)){
584                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
585                 exit(1);
586         }
587
588 //              Open and parse the interface resources file.
589 //      ifq_t *ifaces_db = new ifq_t();
590 //   string ierr;
591 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
592 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
593 //                              ifx_fname.c_str(), ierr.c_str());
594 //              exit(1);
595 //      }
596 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
597 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
598 //                              ifq_fname.c_str(), ierr.c_str());
599 //              exit(1);
600 //      }
601
602
603 //                      The LFTA code string.
604 //                      Put the standard preamble here.
605 //                      NOTE: the hash macros, fcns should go into the run time
606   map<string, string> lfta_val;
607   map<string, string> lfta_prefilter_val;
608
609   string lfta_header =
610 "#include <limits.h>\n\n"
611 "#include \"rts.h\"\n"
612 "#include \"fta.h\"\n"
613 "#include \"lapp.h\"\n"
614 "#include \"rts_udaf.h\"\n\n"
615 ;
616 // Get any locally defined parsing headers
617     glob_t glob_result;
618     memset(&glob_result, 0, sizeof(glob_result));
619
620     // do the glob operation
621     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
622         if(return_value == 0){
623         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
624                         char *flds[1000];
625                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
626                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
627         }
628         }else{
629                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
630         }
631
632 /*
633 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
634 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
635 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
636 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
637 */
638
639         lfta_header += 
640 "\n"
641 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
642 "\n"
643 "#define SLOT_FILLED 0x04\n"
644 "#define SLOT_GEN_BITS 0x03\n"
645 "#define SLOT_HASH_BITS 0xfffffff8\n"
646 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
647 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
648 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
649 "\n\n"
650
651 "#define lfta_BOOL_to_hash(x) (x)\n"
652 "#define lfta_USHORT_to_hash(x) (x)\n"
653 "#define lfta_UINT_to_hash(x) (x)\n"
654 "#define lfta_IP_to_hash(x) (x)\n"
655 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
656 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
657 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
658 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
659 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
660 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
661 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
662 "       for(i=0;i<x.length;++i){\n"
663 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
664 "               if((i%4) == 3){\n"
665 "                       ret ^= tmp_sum;\n"
666 "                       tmp_sum = 0;\n"
667 "               }\n"
668 "       }\n"
669 "       if((i%4)!=0) ret ^=tmp_sum;\n"
670 "       return(ret);\n"
671 "}\n\n\n";
672
673
674
675 //////////////////////////////////////////////////////////////////
676 /////                   Get all of the query parse trees
677
678
679   int i,p;
680   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
681
682 //---------------------------
683 //              Global info needed for post processing.
684
685 //                      Set of operator views ref'd in the query set.
686         opview_set opviews;
687 //                      lftas on a per-machine basis.
688         map<string, vector<stream_query *> > lfta_mach_lists;
689         int nfiles = input_file_names.size();
690         vector<stream_query *> hfta_list;               // list of hftas.
691         map<string, stream_query *> sq_map;             // map from query name to stream query.
692
693
694 //////////////////////////////////////////
695
696 //              Open and parse the interface resources file.
697         ifq_t *ifaces_db = new ifq_t();
698     string ierr;
699         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
700                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
701                                 ifx_fname.c_str(), ierr.c_str());
702                 exit(1);
703         }
704         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
705                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
706                                 ifq_fls[0].c_str(), ierr.c_str());
707                 exit(1);
708         }
709
710   map<string, string> qname_to_flname;  // for detecting duplicate query names
711
712
713
714 //                      Parse the files to create a vector of parse trees.
715 //                      Load qnodes with information to perform a topo sort
716 //                      based on query dependencies.
717   vector<query_node *> qnodes;                          // for topo sort.
718   map<string,int> name_node_map;                        // map query name to qnodes entry
719   for(i=0;i<input_file_names.size();i++){
720
721           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
722                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
723                   continue;
724           }
725 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
726
727 //                      Parse the FTA query
728           fta_parse_result = new fta_parse_t();
729           FtaParser_setfileinput(fta_in);
730           if(FtaParserparse()){
731                 fprintf(stderr,"FTA parse failed.\n");
732                 exit(1);
733           }
734           if(fta_parse_result->parse_type != QUERY_PARSE){
735                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
736                 exit(1);
737           }
738
739 //                      returns a list of parse trees
740           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
741           for(p=0;p<qlist.size();++p){
742             table_exp_t *fta_parse_tree = qlist[p];
743 //              query_parse_trees.push_back(fta_parse_tree);
744
745 //                      compute the default name -- extract from the query file name
746                 strcpy(tmpstr,input_file_names[i].c_str());
747                 char *qname = strrchr(tmpstr,PATH_DELIM);
748                 if(qname == NULL)
749                         qname = tmpstr;
750                 else
751                         qname++;
752                 char *qname_end = strchr(qname,'.');
753                 if(qname_end != NULL) *qname_end = '\0';
754                 string qname_str = qname;
755                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
756
757 //                      Determine visibility.  Should I be attaching all of the output methods?
758                 if(qname_to_ospec.count(imputed_qname)>0)
759                         fta_parse_tree->set_visible(true);
760                 else
761                         fta_parse_tree->set_visible(false);
762
763
764 //                              Create a manipulable representation of the parse tree.
765 //                              the qnode inherits the visibility assigned to the parse tree.
766             int pos = qnodes.size();
767                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
768                 name_node_map[ qnodes[pos]->name ] = pos;
769 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
770 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
771 //              qfiles.push_back(i);
772
773 //                      Check for duplicate query names
774 //                                      NOTE : in hfta-only generation, I should
775 //                                      also check with the names of the registered queries.
776                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
777                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
778                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
779                         exit(1);
780                 }
781                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
782                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
783                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
784                         exit(1);
785                 }
786                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
787
788
789         }
790   }
791
792 //              Add the library queries
793
794   int pos;
795   for(pos=0;pos<qnodes.size();++pos){
796         int fi;
797         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
798                 string src_tbl = qnodes[pos]->refd_tbls[fi];
799                 if(qname_to_flname.count(src_tbl) == 0){
800                         int last_sep = src_tbl.find_last_of('/');
801                         if(last_sep != string::npos){
802 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
803                                 string target_qname = src_tbl.substr(last_sep+1);
804                                 string qpathname = library_path + src_tbl + ".gsql";
805                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
806                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
807                                         exit(1);
808                                         fprintf(stderr,"After exit\n");
809                                 }
810 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
811 //                      Parse the FTA query
812                                 fta_parse_result = new fta_parse_t();
813                                 FtaParser_setfileinput(fta_in);
814                                 if(FtaParserparse()){
815                                         fprintf(stderr,"FTA parse failed.\n");
816                                         exit(1);
817                                 }
818                                 if(fta_parse_result->parse_type != QUERY_PARSE){
819                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
820                                         exit(1);
821                                 }
822
823                                 map<string, int> local_query_map;
824                                 vector<string> local_query_names;
825                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
826                                 for(p=0;p<qlist.size();++p){
827                                 table_exp_t *fta_parse_tree = qlist[p];
828                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
829                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
830                                         if(imputed_qname == target_qname)
831                                                 imputed_qname = src_tbl;
832                                         if(local_query_map.count(imputed_qname)>0){
833                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
834                                                 exit(1);
835                                         }
836                                         local_query_map[ imputed_qname ] = p;
837                                         local_query_names.push_back(imputed_qname);
838                                 }
839
840                                 if(local_query_map.count(src_tbl)==0){
841                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
842                                         exit(1);
843                                 }
844
845                                 vector<int> worklist;
846                                 set<int> added_queries;
847                                 vector<query_node *> new_qnodes;
848                                 worklist.push_back(local_query_map[target_qname]);
849                                 added_queries.insert(local_query_map[target_qname]);
850                                 int qq;
851                                 int qpos = qnodes.size();
852                                 for(qq=0;qq<worklist.size();++qq){
853                                         int q_id = worklist[qq];
854                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
855                                         new_qnodes.push_back( new_qnode);
856                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
857                                         int ff;
858                                         for(ff = 0;ff<refd_tbls.size();++ff){
859                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
860
861                                                         if(name_node_map.count(refd_tbls[ff])>0){
862                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
863                                                                 exit(1);
864                                                         }else{
865                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
866                                                         }
867                                                 }
868                                         }
869                                 }
870
871                                 for(qq=0;qq<new_qnodes.size();++qq){
872                                         int qpos = qnodes.size();
873                                         qnodes.push_back(new_qnodes[qq]);
874                                         name_node_map[qnodes[qpos]->name ] = qpos;
875                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
876                                 }
877                         }
878                 }
879         }
880   }
881
882
883
884
885
886
887
888
889 //---------------------------------------
890
891
892 //              Add the UDOPS.
893
894   string udop_missing_sources;
895   for(i=0;i<qnodes.size();++i){
896         int fi;
897         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
898                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
899                 if(sid >= 0){
900                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
901                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
902                                 int pos = qnodes.size();
903                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
904                                         name_node_map[ qnodes[pos]->name ] = pos;
905                                         qnodes[pos]->is_externally_visible = false;   // its visible
906         //                                      Need to mark the source queries as visible.
907                                         int si;
908                                         string missing_sources = "";
909                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
910                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
911                                                 if(name_node_map.count(src_tbl)==0){
912                                                         missing_sources += src_tbl + " ";
913                                                 }
914                                         }
915                                         if(missing_sources != ""){
916                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
917                                         }
918                                 }
919                         }
920                 }
921         }
922   }
923   if(udop_missing_sources != ""){
924         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
925         exit(1);
926   }
927
928
929
930 ////////////////////////////////////////////////////////////////////
931 ///                             Check parse trees to verify that some
932 ///                             global properties are met :
933 ///                             if q1 reads from q2, then
934 ///                               q2 is processed before q1
935 ///                               q1 can supply q2's parameters
936 ///                             Verify there is no cycle in the reads-from graph.
937
938 //                      Compute an order in which to process the
939 //                      queries.
940
941 //                      Start by building the reads-from lists.
942 //
943
944   for(i=0;i<qnodes.size();++i){
945         int qi, fi;
946         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
947         for(fi = 0;fi<refd_tbls.size();++fi){
948                 if(name_node_map.count(refd_tbls[fi])>0){
949 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
950                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
951                 }
952         }
953   }
954
955
956 //              If one query reads the result of another,
957 //              check for parameter compatibility.  Currently it must
958 //              be an exact match.  I will move to requiring
959 //              containment after re-ordering, but will require
960 //              some analysis for code generation which is not
961 //              yet in place.
962 //printf("There are %d query nodes.\n",qnodes.size());
963
964
965   for(i=0;i<qnodes.size();++i){
966         vector<var_pair_t *> target_params  = qnodes[i]->params;
967         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
968                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
969                 if(target_params.size() != source_params.size()){
970                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
971                         exit(1);
972                 }
973                 int p;
974                 for(p=0;p<target_params.size();++p){
975                         if(! (target_params[p]->name == source_params[p]->name &&
976                               target_params[p]->val == source_params[p]->val ) ){
977                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
978                                 exit(1);
979                         }
980                 }
981         }
982   }
983
984
985 //              Find the roots.
986 //              Start by counting inedges.
987   for(i=0;i<qnodes.size();++i){
988         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
989                 qnodes[(*si)]->n_consumers++;
990         }
991   }
992
993 //              The roots are the nodes with indegree zero.
994   set<int> roots;
995   for(i=0;i<qnodes.size();++i){
996         if(qnodes[i]->n_consumers == 0){
997                 if(qnodes[i]->is_externally_visible == false){
998                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
999                 }
1000                 roots.insert(i);
1001         }
1002   }
1003
1004 //              Remove the parts of the subtree that produce no output.
1005   set<int> valid_roots;
1006   set<int> discarded_nodes;
1007   set<int> candidates;
1008   while(roots.size() >0){
1009         for(si=roots.begin();si!=roots.end();++si){
1010                 if(qnodes[(*si)]->is_externally_visible){
1011                         valid_roots.insert((*si));
1012                 }else{
1013                         discarded_nodes.insert((*si));
1014                         set<int>::iterator sir;
1015                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1016                                 qnodes[(*sir)]->n_consumers--;
1017                                 if(qnodes[(*sir)]->n_consumers == 0)
1018                                         candidates.insert( (*sir));
1019                         }
1020                 }
1021         }
1022         roots = candidates;
1023         candidates.clear();
1024   }
1025   roots = valid_roots;
1026   if(discarded_nodes.size()>0){
1027         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1028         int di = 0;
1029         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1030                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1031                 di++;
1032                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1033         }
1034         fprintf(stderr,"\n");
1035   }
1036
1037 //              Compute the sources_to set, ignoring discarded nodes.
1038   for(i=0;i<qnodes.size();++i){
1039         if(discarded_nodes.count(i)==0)
1040                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1041                         qnodes[(*si)]->sources_to.insert(i);
1042         }
1043   }
1044
1045
1046 //              Find the nodes that are shared by multiple visible subtrees.
1047 //              The roots become inferred visible nodes.
1048
1049 //              Find the visible nodes.
1050         vector<int> visible_nodes;
1051         for(i=0;i<qnodes.size();i++){
1052                 if(qnodes[i]->is_externally_visible){
1053                         visible_nodes.push_back(i);
1054                 }
1055         }
1056
1057 //              Find UDOPs referenced by visible nodes.
1058   list<int> workq;
1059   for(i=0;i<visible_nodes.size();++i){
1060         workq.push_back(visible_nodes[i]);
1061   }
1062   while(!workq.empty()){
1063         int node = workq.front();
1064         workq.pop_front();
1065         set<int>::iterator children;
1066         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1067                 qnodes[node]->is_externally_visible = true;
1068                 visible_nodes.push_back(node);
1069                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1070                         if(qnodes[(*children)]->is_externally_visible == false){
1071                                 qnodes[(*children)]->is_externally_visible = true;
1072                                 visible_nodes.push_back((*children));
1073                         }
1074                 }
1075         }
1076         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1077                 workq.push_back((*children));
1078         }
1079   }
1080
1081         bool done = false;
1082         while(!done){
1083 //      reset the nodes
1084                 for(i=0;i<qnodes.size();i++){
1085                         qnodes[i]->subtree_roots.clear();
1086                 }
1087
1088 //              Walk the tree defined by a visible node, not descending into
1089 //              subtrees rooted by a visible node.  Mark the node visited with
1090 //              the visible node ID.
1091                 for(i=0;i<visible_nodes.size();++i){
1092                         set<int> vroots;
1093                         vroots.insert(visible_nodes[i]);
1094                         while(vroots.size()>0){
1095                                 for(si=vroots.begin();si!=vroots.end();++si){
1096                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1097
1098                                         set<int>::iterator sir;
1099                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1100                                                 if(! qnodes[(*sir)]->is_externally_visible){
1101                                                         candidates.insert( (*sir));
1102                                                 }
1103                                         }
1104                                 }
1105                                 vroots = candidates;
1106                                 candidates.clear();
1107                         }
1108                 }
1109 //              Find the nodes in multiple visible node subtrees, but with no parent
1110 //              that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
1111                 done = true;    // until proven otherwise
1112                 for(i=0;i<qnodes.size();i++){
1113                         if(qnodes[i]->subtree_roots.size()>1){
1114                                 bool is_new_root = true;
1115                                 set<int>::iterator sir;
1116                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1117                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1118                                                 is_new_root = false;
1119                                 }
1120                                 if(is_new_root){
1121                                         qnodes[i]->is_externally_visible = true;
1122                                         qnodes[i]->inferred_visible_node = true;
1123                                         visible_nodes.push_back(i);
1124                                         done = false;
1125                                 }
1126                         }
1127                 }
1128         }
1129
1130
1131
1132
1133
1134 //              get visible nodes in topo ordering.
1135 //  for(i=0;i<qnodes.size();i++){
1136 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1137 //  }
1138   vector<int> process_order;
1139   while(roots.size() >0){
1140         for(si=roots.begin();si!=roots.end();++si){
1141                 if(discarded_nodes.count((*si))==0){
1142                         process_order.push_back( (*si) );
1143                 }
1144                 set<int>::iterator sir;
1145                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1146                         qnodes[(*sir)]->n_consumers--;
1147                         if(qnodes[(*sir)]->n_consumers == 0)
1148                                 candidates.insert( (*sir));
1149                 }
1150         }
1151         roots = candidates;
1152         candidates.clear();
1153   }
1154
1155
1156 //printf("process_order.size() =%d\n",process_order.size());
1157
1158 //              Search for cyclic dependencies
1159   string found_dep;
1160   for(i=0;i<qnodes.size();++i){
1161         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1162                 if(found_dep.size() != 0) found_dep += ", ";
1163                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1164         }
1165   }
1166   if(found_dep.size()>0){
1167         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1168         exit(1);
1169   }
1170
1171 //              Get a list of query sets, in the order to be processed.
1172 //              Start at visible root and do bfs.
1173 //              The query set includes queries referenced indirectly,
1174 //              as sources for user-defined operators.  These are needed
1175 //              to ensure that they are added to the schema, but are not part
1176 //              of the query tree.
1177
1178 //              stream_node_sets contains queries reachable only through the
1179 //              FROM clause, so I can tell which queries to add to the stream
1180 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1181
1182 //                      NOTE: this code works because in order for data to be
1183 //                      read by multiple hftas, the node must be externally visible.
1184 //                      But visible nodes define roots of process sets.
1185 //                      internally visible nodes can feed data only
1186 //                      to other nodes in the same query file.
1187 //                      Therefore, any access can be restricted to a file,
1188 //                      hfta output sharing is done only on roots
1189 //                      never on interior nodes.
1190
1191
1192
1193
1194 //              Compute the base collection of hftas.
1195   vector<hfta_node *> hfta_sets;
1196   map<string, int> hfta_name_map;
1197 //  vector< vector<int> > process_sets;
1198 //  vector< set<int> > stream_node_sets;
1199   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.
1200                                                                                                                 // i.e. process leaves 1st.
1201   for(i=0;i<process_order.size();++i){
1202         if(qnodes[process_order[i]]->is_externally_visible == true){
1203 //printf("Visible.\n");
1204                 int root = process_order[i];
1205                 hfta_node *hnode = new hfta_node();
1206                 hnode->name = qnodes[root]-> name;
1207                 hnode->source_name = qnodes[root]-> name;
1208                 hnode->is_udop = qnodes[root]->is_udop;
1209                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1210
1211                 vector<int> proc_list;  proc_list.push_back(root);
1212 //                      Ensure that nodes are added only once.
1213                 set<int> proc_set;      proc_set.insert(root);
1214                 roots.clear();                  roots.insert(root);
1215                 candidates.clear();
1216                 while(roots.size()>0){
1217                         for(si=roots.begin();si!=roots.end();++si){
1218 //printf("Processing root %d\n",(*si));
1219                                 set<int>::iterator sir;
1220                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1221 //printf("reads fom %d\n",(*sir));
1222                                         if(qnodes[(*sir)]->is_externally_visible==false){
1223                                                 candidates.insert( (*sir) );
1224                                                 if(proc_set.count( (*sir) )==0){
1225                                                         proc_set.insert( (*sir) );
1226                                                         proc_list.push_back( (*sir) );
1227                                                 }
1228                                         }
1229                                 }
1230                         }
1231                         roots = candidates;
1232                         candidates.clear();
1233                 }
1234
1235                 reverse(proc_list.begin(), proc_list.end());
1236                 hnode->query_node_indices = proc_list;
1237                 hfta_name_map[hnode->name] = hfta_sets.size();
1238                 hfta_sets.push_back(hnode);
1239         }
1240   }
1241
1242 //              Compute the reads_from / sources_to graphs for the hftas.
1243
1244   for(i=0;i<hfta_sets.size();++i){
1245         hfta_node *hnode = hfta_sets[i];
1246         for(q=0;q<hnode->query_node_indices.size();q++){
1247                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1248                 for(s=0;s<qnode->refd_tbls.size();++s){
1249                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1250                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1251                                 hnode->reads_from.insert(other_hfta);
1252                                 hfta_sets[other_hfta]->sources_to.insert(i);
1253                         }
1254                 }
1255         }
1256   }
1257
1258 //              Compute a topological sort of the hfta_sets.
1259
1260   vector<int> hfta_topsort;
1261   workq.clear();
1262   int hnode_srcs[hfta_sets.size()];
1263   for(i=0;i<hfta_sets.size();++i){
1264         hnode_srcs[i] = 0;
1265         if(hfta_sets[i]->sources_to.size() == 0)
1266                 workq.push_back(i);
1267   }
1268
1269   while(! workq.empty()){
1270         int     node = workq.front();
1271         workq.pop_front();
1272         hfta_topsort.push_back(node);
1273         set<int>::iterator stsi;
1274         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1275                 int parent = (*stsi);
1276                 hnode_srcs[parent]++;
1277                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1278                         workq.push_back(parent);
1279                 }
1280         }
1281   }
1282
1283 //              Decorate hfta nodes with the level of parallelism given as input.
1284
1285   map<string, int>::iterator msii;
1286   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1287         string hfta_name = (*msii).first;
1288         int par = (*msii).second;
1289         if(hfta_name_map.count(hfta_name) > 0){
1290                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1291         }else{
1292                 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1293         }
1294   }
1295
1296 //              Propagate levels of parallelism: children should have a level of parallelism
1297 //              as large as any of its parents.  Adjust children upwards to compensate.
1298 //              Start at parents and adjust children, auto-propagation will occur.
1299
1300   for(i=hfta_sets.size()-1;i>=0;i--){
1301         set<int>::iterator stsi;
1302         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1303                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1304                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1305                 }
1306         }
1307   }
1308
1309 //              Before all the name mangling, check if there are any output_spec.cfg
1310 //              or hfta_parallelism.cfg entries that do not have a matching query.
1311
1312         string dangling_ospecs = "";
1313         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1314                 string oq = (*msii).first;
1315                 if(hfta_name_map.count(oq) == 0){
1316                         dangling_ospecs += " "+(*msii).first;
1317                 }
1318         }
1319         if(dangling_ospecs!=""){
1320                 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1321         }
1322
1323         string dangling_par = "";
1324         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1325                 string oq = (*msii).first;
1326                 if(hfta_name_map.count(oq) == 0){
1327                         dangling_par += " "+(*msii).first;
1328                 }
1329         }
1330         if(dangling_par!=""){
1331                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1332         }
1333
1334
1335
1336 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1337 //              FROM clauses: retarget any name which is an internal node, and
1338 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes
1339 //              when the source hfta has more parallelism than the target node.
1340 //              Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1341
1342
1343   int n_original_hfta_sets = hfta_sets.size();
1344   for(i=0;i<n_original_hfta_sets;++i){
1345         if(hfta_sets[i]->n_parallel > 1){
1346                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1347                 set<string> local_nodes;                // names of query nodes in the hfta.
1348                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1349                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1350                 }
1351
1352                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1353                         string mangler = "__copy"+int_to_string(p);
1354                         hfta_node *par_hfta  = new hfta_node();
1355                         par_hfta->name = hfta_sets[i]->name + mangler;
1356                         par_hfta->source_name = hfta_sets[i]->name;
1357                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1358                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1359                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1360                         par_hfta->parallel_idx = p;
1361
1362                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1363
1364 //      Is it a UDOP?
1365                         if(hfta_sets[i]->is_udop){
1366                                 int root = hfta_sets[i]->query_node_indices[0];
1367
1368                                 string unequal_par_sources;
1369                                 set<int>::iterator rfsii;
1370                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1371                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1372                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1373                                         }
1374                                 }
1375                                 if(unequal_par_sources != ""){
1376                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1377                                         exit(1);
1378                                 }
1379
1380                                 int rti;
1381                                 vector<string> new_sources;
1382                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1383                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1384                                 }
1385
1386                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1387                                 new_qn->name += mangler;
1388                                 new_qn->mangler = mangler;
1389                                 new_qn->refd_tbls = new_sources;
1390                                 par_hfta->query_node_indices.push_back(qnodes.size());
1391                                 par_qnode_map[new_qn->name] = qnodes.size();
1392                                 name_node_map[ new_qn->name ] = qnodes.size();
1393                                 qnodes.push_back(new_qn);
1394                         }else{
1395 //              regular query node
1396                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1397                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1398                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1399 //                                      rehome the from clause on mangled names.
1400 //                                      create merge nodes as needed for external sources.
1401                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1402                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1403                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1404                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1405 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1406                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1407                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1408                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1409                                                 }else{
1410                                                         vector<string> src_tbls;
1411                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1412                                                         if(stride == 0){
1413                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1414                                                                 exit(1);
1415                                                         }
1416                                                         for(s=0;s<stride;++s){
1417                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1418                                                                 src_tbls.push_back(ext_src_name);
1419                                                         }
1420                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1421                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1422                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1423 //                                      Make a qnode to represent the new merge node
1424                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1425                                                         qn_pt->refd_tbls = src_tbls;
1426                                                         qn_pt->is_udop  = false;
1427                                                         qn_pt->is_externally_visible = false;
1428                                                         qn_pt->inferred_visible_node  = false;
1429                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1430                                                         par_qnode_map[merge_node_name] = qnodes.size();
1431                                                         name_node_map[ merge_node_name ] = qnodes.size();
1432                                                         qnodes.push_back(qn_pt);
1433                                                 }
1434                                         }
1435                                 }
1436                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1437                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1438                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1439                                 }
1440                                 new_qn->params = qnodes[hqn_idx]->params;
1441                                 new_qn->is_udop = false;
1442                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1443                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1444                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1445                                 par_qnode_map[new_qn->name] = qnodes.size();
1446                                 name_node_map[ new_qn->name ] = qnodes.size();
1447                                 qnodes.push_back(new_qn);
1448                           }
1449                         }
1450                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1451                         hfta_sets.push_back(par_hfta);
1452                 }
1453         }else{
1454 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1455 //              hfta sources.
1456                 if(!hfta_sets[i]->is_udop){
1457                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1458                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1459                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1460                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1461 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1462                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1463                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1464                                                 vector<string> src_tbls;
1465                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1466                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1467                                                         src_tbls.push_back(ext_src_name);
1468                                                 }
1469                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1470                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1471                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1472 //                                      Make a qnode to represent the new merge node
1473                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1474                                                 qn_pt->refd_tbls = src_tbls;
1475                                                 qn_pt->is_udop  = false;
1476                                                 qn_pt->is_externally_visible = false;
1477                                                 qn_pt->inferred_visible_node  = false;
1478                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1479                                                 name_node_map[ merge_node_name ] = qnodes.size();
1480                                                 qnodes.push_back(qn_pt);
1481                                         }
1482                                 }
1483                         }
1484                 }
1485           }
1486         }
1487   }
1488
1489 //                      Rebuild the reads_from / sources_to lists in the qnodes
1490   for(q=0;q<qnodes.size();++q){
1491         qnodes[q]->reads_from.clear();
1492         qnodes[q]->sources_to.clear();
1493   }
1494   for(q=0;q<qnodes.size();++q){
1495         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1496                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1497                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1498                         qnodes[q]->reads_from.insert(rf);
1499                         qnodes[rf]->sources_to.insert(q);
1500                 }
1501         }
1502   }
1503
1504 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1505   for(q=0;q<hfta_sets.size();++q){
1506         hfta_sets[q]->reads_from.clear();
1507         hfta_sets[q]->sources_to.clear();
1508   }
1509   for(q=0;q<hfta_sets.size();++q){
1510         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1511                 int node = hfta_sets[q]->query_node_indices[s];
1512                 set<int>::iterator rfsii;
1513                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1514                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1515                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1516                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1517                         }
1518                 }
1519         }
1520   }
1521
1522 /*
1523 for(q=0;q<qnodes.size();++q){
1524  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1525  set<int>::iterator rsii;
1526  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1527   printf(" %d",(*rsii));
1528   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1529  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1530   printf(" %d",(*rsii));
1531  printf("\n");
1532 }
1533
1534 for(q=0;q<hfta_sets.size();++q){
1535  if(hfta_sets[q]->do_generation==false)
1536         continue;
1537  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1538  set<int>::iterator rsii;
1539  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1540   printf(" %d",(*rsii));
1541   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1542  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1543   printf(" %d",(*rsii));
1544  printf("\n");
1545 }
1546 */
1547
1548
1549
1550 //              Re-topo sort the hftas
1551   hfta_topsort.clear();
1552   workq.clear();
1553   int hnode_srcs_2[hfta_sets.size()];
1554   for(i=0;i<hfta_sets.size();++i){
1555         hnode_srcs_2[i] = 0;
1556         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1557                 workq.push_back(i);
1558         }
1559   }
1560
1561   while(workq.empty() == false){
1562         int     node = workq.front();
1563         workq.pop_front();
1564         hfta_topsort.push_back(node);
1565         set<int>::iterator stsii;
1566         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1567                 int child = (*stsii);
1568                 hnode_srcs_2[child]++;
1569                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1570                         workq.push_back(child);
1571                 }
1572         }
1573   }
1574
1575 //              Ensure that all of the query_node_indices in hfta_sets are topologically
1576 //              sorted, don't rely on assumptions that all transforms maintain some kind of order.
1577   for(i=0;i<hfta_sets.size();++i){
1578         if(hfta_sets[i]->do_generation){
1579                 map<int,int> n_accounted;
1580                 vector<int> new_order;
1581                 workq.clear();
1582                 vector<int>::iterator vii;
1583                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1584                         n_accounted[(*vii)]= 0;
1585                 }
1586                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1587                         set<int>::iterator rfsii;
1588                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1589                                 if(n_accounted.count((*rfsii)) == 0){
1590                                         n_accounted[(*vii)]++;
1591                                 }
1592                         }
1593                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1594                                 workq.push_back((*vii));
1595                         }
1596                 }
1597
1598                 while(workq.empty() == false){
1599                         int node = workq.front();
1600                         workq.pop_front();
1601                         new_order.push_back(node);
1602                         set<int>::iterator stsii;
1603                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1604                                 if(n_accounted.count((*stsii))){
1605                                         n_accounted[(*stsii)]++;
1606                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1607                                                 workq.push_back((*stsii));
1608                                         }
1609                                 }
1610                         }
1611                 }
1612                 hfta_sets[i]->query_node_indices = new_order;
1613         }
1614   }
1615
1616
1617
1618
1619
1620 ///                     Global checking is done, start the analysis and translation
1621 ///                     of the query parse tree in the order specified by process_order
1622
1623
1624 //                      Get a list of the LFTAs for global lfta optimization
1625 //                              TODO: separate building operators from splitting lftas,
1626 //                                      that will make optimizations such as predicate pushing easier.
1627         vector<stream_query *> lfta_list;
1628
1629         stream_query *rootq;
1630
1631     int qi,qj;
1632
1633         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1634
1635         int hfta_id = hfta_topsort[qi];
1636     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1637
1638
1639
1640 //              Two possibilities, either it's a UDOP, or it's a collection of queries.
1641 //      if(qnodes[curr_list.back()]->is_udop)
1642         if(hfta_sets[hfta_id]->is_udop){
1643                 int node_id = curr_list.back();
1644                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1645                 opview_entry *opv = new opview_entry();
1646
1647 //                      Many of the UDOP properties aren't currently used.
1648                 opv->parent_qname = "no_parent";
1649                 opv->root_name = qnodes[node_id]->name;
1650                 opv->view_name = qnodes[node_id]->file;
1651                 opv->pos = qi;
1652                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1653                 opv->udop_alias = tmpstr;
1654                 opv->mangler = qnodes[node_id]->mangler;
1655
1656                 if(opv->mangler != ""){
1657                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1658                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1659                 }
1660
1661 //                      This piece of code makes each hfta which refers to the same udop
1662 //                      reference a distinct running udop.  Do this at query optimization time?
1663 //              fmtbl->set_udop_alias(opv->udop_alias);
1664
1665                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1666                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1667
1668                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1669                 int s,f,q;
1670                 for(s=0;s<subq.size();++s){
1671 //                              Validate that the fields match.
1672                         subquery_spec *sqs = subq[s];
1673                         string subq_name = sqs->name + opv->mangler;
1674                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1675                         if(flds.size() == 0){
1676                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1677                                 return(1);
1678                         }
1679                         if(flds.size() < sqs->types.size()){
1680                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1681                                 return(1);
1682                         }
1683                         bool failed = false;
1684                         for(f=0;f<sqs->types.size();++f){
1685                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1686                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1687                                 if(! dte.subsumes_type(&dtf) ){
1688                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1689                                         failed = true;
1690                                 }
1691 /*
1692                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1693                                         string pstr = dte.get_temporal_string();
1694                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1695                                         failed = true;
1696                                 }
1697 */
1698                         }
1699                         if(failed)
1700                                 return(1);
1701 ///                             Validation done, find the subquery, make a copy of the
1702 ///                             parse tree, and add it to the return list.
1703                         for(q=0;q<qnodes.size();++q)
1704                                 if(qnodes[q]->name == subq_name)
1705                                         break;
1706                         if(q==qnodes.size()){
1707                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1708                                 return(1);
1709                         }
1710
1711                 }
1712
1713 //                      Cross-link to from entry(s) in all sourced-to tables.
1714                 set<int>::iterator sii;
1715                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1716 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1717                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1718                         int ii;
1719                         for(ii=0;ii<tblvars.size();++ii){
1720                                 if(tblvars[ii]->schema_name == opv->root_name){
1721                                         tblvars[ii]->set_opview_idx(opviews.size());
1722                                 }
1723
1724                         }
1725                 }
1726
1727                 opviews.append(opv);
1728         }else{
1729
1730 //                      Analyze the parse trees in this query,
1731 //                      put them in rootq
1732 //      vector<int> curr_list = process_sets[qi];
1733
1734
1735 ////////////////////////////////////////
1736
1737           rootq = NULL;
1738 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1739           for(qj=0;qj<curr_list.size();++qj){
1740                 i = curr_list[qj];
1741         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1742
1743 //                      Select the current query parse tree
1744                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1745
1746 //                      if hfta only, try to fetch any missing schemas
1747 //                      from the registry (using the print_schema program).
1748 //                      Here I use a hack to avoid analyzing the query -- all referenced
1749 //                      tables must be in the from clause
1750 //                      If there is a problem loading any table, just issue a warning,
1751 //
1752                 tablevar_list_t *fm = fta_parse_tree->get_from();
1753                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1754 //                      iterate over all referenced tables
1755                 int t;
1756                 for(t=0;t<refd_tbls.size();++t){
1757                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1758
1759                   if(tbl_ref < 0){      // if this table is not in the Schema
1760
1761                         if(hfta_only){
1762                                 string cmd="print_schema "+refd_tbls[t];
1763                                 FILE *schema_in = popen(cmd.c_str(), "r");
1764                                 if(schema_in == NULL){
1765                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1766                                 }else{
1767                                   string schema_instr;
1768                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1769                                         schema_instr += tmpstr;
1770                                   }
1771                           fta_parse_result = new fta_parse_t();
1772                                   strcpy(tmp_schema_str,schema_instr.c_str());
1773                                   FtaParser_setstringinput(tmp_schema_str);
1774                           if(FtaParserparse()){
1775                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1776                           }else{
1777                                         if( fta_parse_result->tables != NULL){
1778                                                 int tl;
1779                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1780                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1781                                                 }
1782                                         }else{
1783                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1784                                         }
1785                                 }
1786                         }
1787                   }else{
1788                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1789                                 exit(1);
1790                   }
1791
1792                 }
1793           }
1794
1795
1796 //                              Analyze the query.
1797           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1798           if(qs == NULL){
1799                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1800                 exit(1);
1801           }
1802
1803           stream_query new_sq(qs, Schema);
1804           if(new_sq.error_code){
1805                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1806                         exit(1);
1807           }
1808
1809 //                      Add it to the Schema
1810           table_def *output_td = new_sq.get_output_tabledef();
1811           Schema->add_table(output_td);
1812
1813 //                      Create a query plan from the analyzed parse tree.
1814 //                      If it's a query referenced via FROM, add it to the stream query.
1815           if(rootq){
1816                 rootq->add_query(new_sq);
1817           }else{
1818                 rootq = new stream_query(new_sq);
1819 //                      have the stream query object inherit properties from the analyzed
1820 //                      hfta_node object.
1821                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1822                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1823           }
1824
1825
1826     }
1827
1828 //              This stream query has all its parts
1829 //              Build and optimize it.
1830 //printf("translate_fta: generating plan.\n");
1831         if(rootq->generate_plan(Schema)){
1832                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1833                 continue;
1834         }
1835
1836 //      We've found the query plan head, so now add the output operators
1837         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1838                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1839                 multimap<string, int>::iterator mmsi;
1840                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1841                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1842                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1843                 }
1844         }
1845
1846
1847
1848 //                              Perform query splitting if necessary.
1849         bool hfta_returned;
1850     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1851
1852         int l;
1853 //for(l=0;l<split_queries.size();++l){
1854 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1855 //}
1856
1857
1858
1859
1860     if(split_queries.size() > 0){       // should be at least one component.
1861
1862 //                              Compute the number of LFTAs.
1863           int n_lfta = split_queries.size();
1864           if(hfta_returned) n_lfta--;
1865
1866
1867 //                              Process the LFTA components.
1868           for(l=0;l<n_lfta;++l){
1869            if(lfta_names.count(split_queries[l]->query_name) == 0){
1870 //                              Grab the lfta for global optimization.
1871                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1872                 string liface = tvec[0]->get_interface();
1873                 string lmach = tvec[0]->get_machine();
1874                 if (lmach == "")
1875                         lmach = hostname;
1876                 interface_names.push_back(liface);
1877                 machine_names.push_back(lmach);
1878 //printf("Machine is %s\n",lmach.c_str());
1879
1880 //                      Set the ht size from the recommendation, if there is one in the rec file
1881                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
1882                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
1883                 }
1884
1885
1886                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
1887                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
1888                 lfta_list.push_back(split_queries[l]);
1889                 lfta_mach_lists[lmach].push_back(split_queries[l]);
1890
1891 //                      The following is a hack,
1892 //                      as I should be generating LFTA code through
1893 //                      the stream_query object.
1894                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
1895 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
1896
1897 /*
1898 //                              Create query description to embed in lfta.c
1899                 string lfta_schema_str = split_queries[l]->make_schema();
1900                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
1901
1902 //                              get NIC capabilities.
1903                 int erri;
1904                 nic_property *nicprop = NULL;
1905                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
1906                 if(iface_codegen_type.size()){
1907                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
1908                         if(!nicprop){
1909                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
1910                                         exit(1);
1911                         }
1912                 }
1913
1914                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
1915 */
1916
1917                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
1918                 query_names.push_back(split_queries[l]->query_name);
1919                 mach_query_names[lmach].push_back(query_names.size()-1);
1920 //                      NOTE: I will assume a 1-1 correspondence between
1921 //                      mach_query_names[lmach] and lfta_mach_lists[lmach]
1922 //                      where mach_query_names[lmach][i] contains the index into
1923 //                      query_names, which names the lfta, and
1924 //                      lfta_mach_lists[lmach][i] is the stream_query * of the
1925 //                      corresponding lfta.
1926 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists
1927
1928
1929
1930                 // check if lfta is reusable
1931                 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
1932
1933                 bool lfta_reusable = false;
1934                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
1935                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
1936                         lfta_reusable = true;
1937                 }
1938                 lfta_reuse_options.push_back(lfta_reusable);
1939
1940                 // LFTA will inherit the liveness timeout specification from the containing query
1941                 // it is too conservative as lfta are expected to spend less time per tuple
1942                 // than the full query
1943
1944                 // extract liveness timeout from query definition
1945                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
1946                 if (!liveness_timeout) {
1947 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
1948 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
1949                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
1950                 }
1951                 lfta_liveness_timeouts.push_back(liveness_timeout);
1952
1953 //                      Add it to the schema
1954                 table_def *td = split_queries[l]->get_output_tabledef();
1955                 Schema->append_table(td);
1956 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
1957
1958           }
1959          }
1960
1961 //                              If the output is lfta-only, dump out the query name.
1962       if(split_queries.size() == 1 && !hfta_returned){
1963         if(output_query_names ){
1964            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
1965                 }
1966 /*
1967 else{
1968            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
1969                 }
1970 */
1971
1972 /*
1973 //                              output schema summary
1974                 if(output_schema_summary){
1975                         dump_summary(split_queries[0]);
1976                 }
1977 */
1978       }
1979
1980
1981           if(hfta_returned){            // query also has an HFTA component
1982                 int hfta_nbr = split_queries.size()-1;
1983
1984                         hfta_list.push_back(split_queries[hfta_nbr]);
1985
1986 //                                      report on generated query names
1987         if(output_query_names){
1988                         string hfta_name =split_queries[hfta_nbr]->query_name;
1989                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
1990                         for(l=0;l<hfta_nbr;++l){
1991                                 string lfta_name =split_queries[l]->query_name;
1992                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
1993                         }
1994                 }
1995 //              else{
1996 //              fprintf(stderr,"query names are ");
1997 //                      for(l=0;l<hfta_nbr;++l){
1998 //                              if(l>0) fprintf(stderr,",");
1999 //                              string fta_name =split_queries[l]->query_name;
2000 //                      fprintf(stderr," %s",fta_name.c_str());
2001 //                      }
2002 //                      fprintf(stderr,"\n");
2003 //              }
2004           }
2005
2006   }else{
2007           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2008           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2009           exit(1);
2010   }
2011  }
2012 }
2013
2014
2015 //-----------------------------------------------------------------
2016 //              Compute and propagate the SE in PROTOCOL fields compute a field.
2017 //-----------------------------------------------------------------
2018
2019 for(i=0;i<lfta_list.size();i++){
2020         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2021         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2022 }
2023 for(i=0;i<hfta_list.size();i++){
2024         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2025         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2026 }
2027
2028
2029
2030 //------------------------------------------------------------------------
2031 //              Perform  individual FTA optimizations
2032 //-----------------------------------------------------------------------
2033
2034 if (partitioned_mode) {
2035
2036         // open partition definition file
2037         string part_fname = config_dir_path + "partition.txt";
2038
2039         FILE* partfd = fopen(part_fname.c_str(), "r");
2040         if (!partfd) {
2041                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2042                 exit(1);
2043         }
2044         PartnParser_setfileinput(partfd);
2045         if (PartnParserparse()) {
2046                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2047                 exit(1);
2048         }
2049         fclose(partfd);
2050 }
2051
2052
2053 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2054
2055 int num_hfta = hfta_list.size();
2056 for(i=0; i < hfta_list.size(); ++i){
2057         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2058 }
2059
2060 //                      Add all new hftas to schema
2061 for(i=num_hfta; i < hfta_list.size(); ++i){
2062                 table_def *td = hfta_list[i]->get_output_tabledef();
2063                 Schema->append_table(td);
2064 }
2065
2066 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2067
2068
2069
2070 //------------------------------------------------------------------------
2071 //              Do global (cross-fta) optimization
2072 //-----------------------------------------------------------------------
2073
2074
2075
2076
2077
2078
2079 set<string> extra_external_libs;
2080
2081 for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
2082
2083                 if(! debug_only){
2084 //                                      build hfta file name, create output
2085            if(numeric_hfta_flname){
2086             sprintf(tmpstr,"hfta_%d",hfta_count);
2087                         hfta_names.push_back(tmpstr);
2088             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2089          }else{
2090             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2091                         hfta_names.push_back(tmpstr);
2092             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2093           }
2094                   FILE *hfta_fl = fopen(tmpstr,"w");
2095                   if(hfta_fl == NULL){
2096                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2097                         exit(1);
2098                   }
2099                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2100
2101 //                      If there is a field verifier, warn about
2102 //                      lack of compatibility
2103 //                              NOTE : this code assumes that visible non-lfta queries
2104 //                              are those at the root of a stream query.
2105                   string hfta_comment;
2106                   string hfta_title;
2107                   string hfta_namespace;
2108                   if(hfta_list[i]->defines.count("comment")>0)
2109                         hfta_comment = hfta_list[i]->defines["comment"];
2110                   if(hfta_list[i]->defines.count("Comment")>0)
2111                         hfta_comment = hfta_list[i]->defines["Comment"];
2112                   if(hfta_list[i]->defines.count("COMMENT")>0)
2113                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2114                   if(hfta_list[i]->defines.count("title")>0)
2115                         hfta_title = hfta_list[i]->defines["title"];
2116                   if(hfta_list[i]->defines.count("Title")>0)
2117                         hfta_title = hfta_list[i]->defines["Title"];
2118                   if(hfta_list[i]->defines.count("TITLE")>0)
2119                         hfta_title = hfta_list[i]->defines["TITLE"];
2120                   if(hfta_list[i]->defines.count("namespace")>0)
2121                         hfta_namespace = hfta_list[i]->defines["namespace"];
2122                   if(hfta_list[i]->defines.count("Namespace")>0)
2123                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2124                   if(hfta_list[i]->defines.count("Namespace")>0)
2125                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2126
2127                   if(field_verifier != NULL){
2128                         string warning_str;
2129                         if(hfta_comment == "")
2130                                 warning_str += "\tcomment not found.\n";
2131                         if(hfta_title == "")
2132                                 warning_str += "\ttitle not found.\n";
2133                         if(hfta_namespace == "")
2134                                 warning_str += "\tnamespace not found.\n";
2135
2136                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2137                         int fi;
2138                         for(fi=0;fi<flds.size();fi++){
2139                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2140                         }
2141                         if(warning_str != "")
2142                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2143                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2144                   }
2145
2146                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2147                   if(hfta_comment != "")
2148                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2149                   if(hfta_title != "")
2150                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2151                   if(hfta_namespace != "")
2152                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2153                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2154                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2155
2156 //                              write info about fields to qtree.xml
2157                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2158                   int fi;
2159                   for(fi=0;fi<flds.size();fi++){
2160                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2161                         if(flds[fi]->get_modifier_list()->size()){
2162                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2163                         }
2164                         fprintf(qtree_output," />\n");
2165                   }
2166
2167                   // extract liveness timeout from query definition
2168                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2169                   if (!liveness_timeout) {
2170 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2171 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2172                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2173                   }
2174                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2175
2176                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2177                   int itv;
2178                   for(itv=0;itv<tmp_tv.size();++itv){
2179                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2180                   }
2181                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2182                   if(ifrs != ""){
2183                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2184                   }
2185                   fprintf(qtree_output,"\t</HFTA>\n");
2186
2187                   fclose(hfta_fl);
2188                 }else{
2189 //                                      debug only -- do code generation to catch generation-time errors.
2190                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2191                 }
2192
2193                 hfta_count++;   // for hfta file names with numeric suffixes
2194
2195                 hfta_list[i]->get_external_libs(extra_external_libs);
2196
2197           }
2198
2199 string ext_lib_string;
2200 set<string>::iterator ssi_el;
2201 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2202         ext_lib_string += (*ssi_el)+" ";
2203
2204
2205
2206 //                      Report on the set of operator views
2207   for(i=0;i<opviews.size();++i){
2208         opview_entry *opve = opviews.get_entry(i);
2209         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2210         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2211         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2212         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2213         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2214
2215         if (!opve->liveness_timeout) {
2216 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2217 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2218                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2219         }
2220         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2221     int j;
2222         for(j=0;j<opve->subq_names.size();j++)
2223                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2224         fprintf(qtree_output,"\t</UDOP>\n");
2225   }
2226
2227
2228 //-----------------------------------------------------------------
2229
2230 //                      Create interface-specific meta code files.
2231 //                              first, open and parse the interface resources file.
2232         ifaces_db = new ifq_t();
2233     ierr = "";
2234         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2235                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2236                                 ifx_fname.c_str(), ierr.c_str());
2237                 exit(1);
2238         }
2239
2240         map<string, vector<stream_query *> >::iterator svsi;
2241         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2242                 string lmach = (*svsi).first;
2243
2244         //              For this machine, create a set of lftas per interface.
2245                 vector<stream_query *> mach_lftas = (*svsi).second;
2246                 map<string, vector<stream_query *> > lfta_iface_lists;
2247                 int li;
2248                 for(li=0;li<mach_lftas.size();++li){
2249                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2250                         string lfta_iface = tvec[0]->get_interface();
2251                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2252                 }
2253
2254                 map<string, vector<stream_query *> >::iterator lsvsi;
2255                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2256                         int erri;
2257                         string liface = (*lsvsi).first;
2258                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2259                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2260                         if(iface_codegen_type.size()){
2261                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2262                                 if(!nicprop){
2263                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2264                                         exit(1);
2265                                 }
2266                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2267                                 string mcf_flnm;
2268                                 if(lmach != "")
2269                                   mcf_flnm = lmach + "_"+liface+".mcf";
2270                                 else
2271                                   mcf_flnm = hostname + "_"+liface+".mcf";
2272                                 FILE *mcf_fl ;
2273                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2274                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2275                                         exit(1);
2276                                 }
2277                                 fprintf(mcf_fl,"%s",mcs.c_str());
2278                                 fclose(mcf_fl);
2279 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2280 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2281                         }
2282                 }
2283
2284
2285         }
2286
2287
2288
2289 //-----------------------------------------------------------------
2290
2291
2292 //                      Find common filter predicates in the LFTAs.
2293 //                      in addition generate structs to store the temporal attributes unpacked by prefilter
2294         
2295         map<string, vector<stream_query *> >::iterator ssqi;
2296         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2297
2298                 string lmach = (*ssqi).first;
2299                 bool packed_return = false;
2300                 int li, erri;
2301
2302
2303 //      The LFTAs of this machine.
2304                 vector<stream_query *> mach_lftas = (*ssqi).second;
2305 //      break up on a per-interface basis.
2306                 map<string, vector<stream_query *> > lfta_iface_lists;
2307                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2308                                                         // for fta_init
2309                 for(li=0;li<mach_lftas.size();++li){
2310                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2311                         string lfta_iface = tvec[0]->get_interface();
2312                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2313                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2314                 }
2315
2316
2317 //      Are the return values "packed"?
2318 //      This should be done on a per-interface basis.
2319 //      But this is defunct code for gs-lite
2320                 for(li=0;li<mach_lftas.size();++li){
2321                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2322                   string liface = tvec[0]->get_interface();
2323                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2324                   if(iface_codegen_type.size()){
2325                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2326                           packed_return = true;
2327                         }
2328                   }
2329                 }
2330
2331
2332 // Separate lftas by interface, collect results on a per-interface basis.
2333
2334                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2335                 map<string, vector<cnf_set *> > prefilter_preds;
2336                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2337                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2338                         string liface = (*mvsi).first;
2339                         vector<cnf_set *> empty_list;
2340                         prefilter_preds[liface] = empty_list;
2341                         if(! packed_return){
2342                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2343                         }
2344
2345 //                              get NIC capabilities.  (Is this needed?)
2346                         nic_property *nicprop = NULL;
2347                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2348                         if(iface_codegen_type.size()){
2349                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2350                                 if(!nicprop){
2351                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2352                                         exit(1);
2353                                 }
2354                         }
2355                 }
2356
2357
2358 //              Now that we know the prefilter preds, generate the lfta code.
2359 //      Do this for all lftas in this machine.
2360                 for(li=0;li<mach_lftas.size();++li){
2361                         set<unsigned int> subsumed_preds;
2362                         set<unsigned int>::iterator sii;
2363 #ifdef PREFILTER_OK
2364                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2365                                 int pid = (*sii);
2366                                 if((pid>>16) == li){
2367                                         subsumed_preds.insert(pid & 0xffff);
2368                                 }
2369                         }
2370 #endif
2371                         string lfta_schema_str = mach_lftas[li]->make_schema();
2372                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2373                         nic_property *nicprop = NULL; // no NIC properties?
2374                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2375                 }
2376
2377
2378 //                      generate structs to store the temporal attributes
2379 //                      unpacked by prefilter
2380                 col_id_set temp_cids;
2381                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2382                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2383
2384 //                      Compute the lfta bit signatures and the lfta colrefs
2385 //      do this on a per-interface basis
2386 #ifdef PREFILTER_OK
2387                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2388 #endif
2389                 map<string, vector<long long int> > lfta_sigs; // used again later
2390                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2391                         string liface = (*mvsi).first;
2392                         vector<long long int> empty_list;
2393                         lfta_sigs[liface] = empty_list;
2394
2395                         vector<col_id_set> lfta_cols;
2396                         vector<int> lfta_snap_length;
2397                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2398                                 unsigned long long int mask=0, bpos=1;
2399                                 int f_pos;
2400                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2401                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2402                                                 mask |= bpos;
2403                                         bpos = bpos << 1;
2404                                 }
2405                                 lfta_sigs[liface].push_back(mask);
2406                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2407                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2408                         }
2409
2410 //for(li=0;li<mach_lftas.size();++li){
2411 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2412 //col_id_set::iterator tcisi;
2413 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2414 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2415 //}
2416 //}
2417
2418
2419 //                      generate the prefilter
2420 //      Do this on a per-interface basis, except for the #define
2421 #ifdef PREFILTER_OK
2422 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2423                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2424 #else
2425                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2426
2427 #endif
2428                 }
2429
2430 //                      Generate interface parameter lookup function
2431           lfta_val[lmach] += "// lookup interface properties by name\n";
2432           lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2433           lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2434           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2435
2436 //        collect a list of interface names used by queries running on this host
2437           set<std::string> iface_names;
2438           for(i=0;i<mach_query_names[lmach].size();i++){
2439                 int mi = mach_query_names[lmach][i];
2440                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2441
2442                 if(interface_names[mi]=="")
2443                         iface_names.insert("DEFAULTDEV");
2444                 else
2445                         iface_names.insert(interface_names[mi]);
2446           }
2447
2448 //        generate interface property lookup code for every interface
2449           set<std::string>::iterator sir;
2450           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2451                 if (sir == iface_names.begin())
2452                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2453                 else
2454                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2455
2456                 // iterate through interface properties
2457                 vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2458                 if (erri) {
2459                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2460                         exit(1);
2461                 }
2462                 if (iface_properties.empty())
2463                         lfta_val[lmach] += "\t\treturn NULL;\n";
2464                 else {
2465                         for (int i = 0; i < iface_properties.size(); ++i) {
2466                                 if (i == 0)
2467                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2468                                 else
2469                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2470
2471                                 // combine all values for the interface property using comma separator
2472                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2473                                 for (int j = 0; j < vals.size(); ++j) {
2474                                         lfta_val[lmach] += "\t\t\treturn \"" + vals[j];
2475                                         if (j != vals.size()-1)
2476                                                 lfta_val[lmach] += ",";
2477                                 }
2478                                 lfta_val[lmach] += "\";\n";
2479                         }
2480                         lfta_val[lmach] += "\t\t} else\n";
2481                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2482                 }
2483           }
2484           lfta_val[lmach] += "\t} else\n";
2485           lfta_val[lmach] += "\t\treturn NULL;\n";
2486           lfta_val[lmach] += "}\n\n";
2487
2488
2489 //                      Generate a full list of FTAs for clearinghouse reference
2490           lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2491           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2492
2493           for (i = 0; i < query_names.size(); ++i) {
2494                    if (i)
2495                           lfta_val[lmach] += ", ";
2496                    lfta_val[lmach] += "\"" + query_names[i] + "\"";
2497           }
2498           for (i = 0; i < hfta_list.size(); ++i) {
2499                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2500           }
2501           lfta_val[lmach] += ", NULL};\n\n";
2502
2503
2504 //                      Add the initialization function to lfta.c
2505 //      Change to accept the interface name, and 
2506 //      set the prefilter function accordingly.
2507 //      see the example in demo/err2
2508           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2509
2510 //        for(i=0;i<mach_query_names[lmach].size();i++)
2511 //              int mi = mach_query_names[lmach][i];
2512 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2513
2514           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2515                 string liface = (*mvsi).first;
2516                 vector<stream_query *> lfta_list = (*mvsi).second;
2517                 for(i=0;i<lfta_list.size();i++){
2518                         stream_query *lfta_sq = lfta_list[i];
2519                         int mi = lfta_iface_qname_ix[liface][i];
2520                 
2521                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2522
2523                         string this_iface = "DEFAULTDEV";
2524                         if(interface_names[mi]!="")
2525                                 this_iface = '"'+interface_names[mi]+'"';
2526                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2527                 lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2528 //              if(interface_names[mi]=="")
2529 //                              lfta_val[lmach]+="DEFAULTDEV";
2530 //              else
2531 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';
2532                         lfta_val[lmach] += this_iface;
2533
2534
2535                 lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
2536                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
2537                         +"\n#endif\n";
2538                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2539                         lfta_val[lmach] += tmpstr;
2540
2541 //                      unsigned long long int mask=0, bpos=1;
2542 //                      int f_pos;
2543 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2544 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2545 //                                      mask |= bpos;
2546 //                              bpos = bpos << 1;
2547 //                      }
2548
2549 #ifdef PREFILTER_OK
2550 //                      sprintf(tmpstr,",%lluull",mask);
2551                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2552                         lfta_val[lmach]+=tmpstr;
2553 #else
2554                         lfta_val[lmach] += ",0ull";
2555 #endif
2556
2557                         lfta_val[lmach] += ");\n";
2558
2559
2560
2561 //    End of lfta prefilter stuff
2562 // --------------------------------------------------
2563
2564 //                      If there is a field verifier, warn about
2565 //                      lack of compatibility
2566                   string lfta_comment;
2567                   string lfta_title;
2568                   string lfta_namespace;
2569                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2570                   if(ldefs.count("comment")>0)
2571                         lfta_comment = lfta_sq->defines["comment"];
2572                   if(ldefs.count("Comment")>0)
2573                         lfta_comment = lfta_sq->defines["Comment"];
2574                   if(ldefs.count("COMMENT")>0)
2575                         lfta_comment = lfta_sq->defines["COMMENT"];
2576                   if(ldefs.count("title")>0)
2577                         lfta_title = lfta_sq->defines["title"];
2578                   if(ldefs.count("Title")>0)
2579                         lfta_title = lfta_sq->defines["Title"];
2580                   if(ldefs.count("TITLE")>0)
2581                         lfta_title = lfta_sq->defines["TITLE"];
2582                   if(ldefs.count("NAMESPACE")>0)
2583                         lfta_namespace = lfta_sq->defines["NAMESPACE"];
2584                   if(ldefs.count("Namespace")>0)
2585                         lfta_namespace = lfta_sq->defines["Namespace"];
2586                   if(ldefs.count("namespace")>0)
2587                         lfta_namespace = lfta_sq->defines["namespace"];
2588
2589                   string lfta_ht_size;
2590                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2591                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2592                   if(ldefs.count("aggregate_slots")>0){
2593                         lfta_ht_size = ldefs["aggregate_slots"];
2594                   }
2595
2596 //                      NOTE : I'm assuming that visible lftas do not start with _fta.
2597 //                              -- will fail for non-visible simple selection queries.
2598                 if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
2599                         string warning_str;
2600                         if(lfta_comment == "")
2601                                 warning_str += "\tcomment not found.\n";
2602                         if(lfta_title == "")
2603                                 warning_str += "\ttitle not found.\n";
2604                         if(lfta_namespace == "")
2605                                 warning_str += "\tnamespace not found.\n";
2606
2607                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2608                         int fi;
2609                         for(fi=0;fi<flds.size();fi++){
2610                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2611                         }
2612                         if(warning_str != "")
2613                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2614                                         query_names[mi].c_str(),warning_str.c_str());
2615                 }
2616
2617
2618 //                      Create qtree output
2619                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
2620                   if(lfta_comment != "")
2621                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2622                   if(lfta_title != "")
2623                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2624                   if(lfta_namespace != "")
2625                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2626                   if(lfta_ht_size != "")
2627                         fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2628                 if(lmach != "")
2629                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2630                 else
2631                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2632                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
2633                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2634                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2635                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2636 //                              write info about fields to qtree.xml
2637                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2638                   int fi;
2639                   for(fi=0;fi<flds.size();fi++){
2640                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2641                         if(flds[fi]->get_modifier_list()->size()){
2642                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2643                         }
2644                         fprintf(qtree_output," />\n");
2645                   }
2646                 fprintf(qtree_output,"\t</LFTA>\n");
2647
2648
2649             }
2650           }
2651
2652           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2653                         string liface = (*mvsi).first;
2654                         lfta_val[lmach] += 
2655 "       if (!strcmp(device, \""+liface+"\")) \n"
2656 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2657 ;
2658                 }
2659                 lfta_val[lmach] += 
2660 "       if(lfta_prefilter == NULL){\n"
2661 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2662 "               exit(1);\n"
2663 "       }\n"
2664 ;
2665
2666
2667
2668           lfta_val[lmach] += "}\n\n";
2669
2670       if(!(debug_only || hfta_only) ){
2671                 string lfta_flnm;
2672                 if(lmach != "")
2673                   lfta_flnm = lmach + "_lfta.c";
2674                 else
2675                   lfta_flnm = hostname + "_lfta.c";
2676                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2677                         fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2678                         exit(1);
2679                 }
2680               fprintf(lfta_out,"%s",lfta_header.c_str());
2681               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2682               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2683                 fclose(lfta_out);
2684           }
2685         }
2686
2687 //              Say what are the operators which must execute
2688         if(opviews.size()>0)
2689                 fprintf(stderr,"The queries use the following external operators:\n");
2690         for(i=0;i<opviews.size();++i){
2691                 opview_entry *opv = opviews.get_entry(i);
2692                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2693         }
2694
2695         if(create_makefile)
2696                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2697                 machine_names, schema_file_name,
2698                 interface_names,
2699                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2700
2701
2702         fprintf(qtree_output,"</QueryNodes>\n");
2703
2704         return(0);
2705 }
2706
2707 ////////////////////////////////////////////////////////////
2708
2709 void generate_makefile(vector<string> &input_file_names, int nfiles,
2710                                            vector<string> &hfta_names, opview_set &opviews,
2711                                                 vector<string> &machine_names,
2712                                                 string schema_file_name,
2713                                                 vector<string> &interface_names,
2714                                                 ifq_t *ifdb, string &config_dir_path,
2715                                                 bool use_pads,
2716                                                 string extra_libs,
2717                                                 map<string, vector<int> > &rts_hload
2718                                          ){
2719         int i,j;
2720
2721         if(config_dir_path != ""){
2722                 config_dir_path = "-C "+config_dir_path;
2723         }
2724
2725         struct stat sb;
2726         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
2727         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
2728
2729 //      if(libz_exists && !libast_exists){
2730 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
2731 //              exit(1);
2732 //      }
2733
2734 //                      Get set of operator executable files to run
2735         set<string> op_fls;
2736         set<string>::iterator ssi;
2737         for(i=0;i<opviews.size();++i){
2738                 opview_entry *opv = opviews.get_entry(i);
2739                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
2740         }
2741
2742         FILE *outfl = fopen("Makefile", "w");
2743         if(outfl==NULL){
2744                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
2745                 exit(0);
2746         }
2747
2748         fputs(
2749 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
2750 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
2751 ).c_str(), outfl
2752 );
2753         if(generate_stats)
2754                 fprintf(outfl,"  -DLFTA_STATS");
2755
2756 //              Gather the set of interfaces
2757 //              Also, gather "base interface names" for use in computing
2758 //              the hash splitting to virtual interfaces.
2759 //              TODO : must update to hanndle machines
2760         set<string> ifaces;
2761         set<string> base_vifaces;       // base interfaces of virtual interfaces
2762         map<string, string> ifmachines;
2763         map<string, string> ifattrs;
2764         for(i=0;i<interface_names.size();++i){
2765                 ifaces.insert(interface_names[i]);
2766                 ifmachines[interface_names[i]] = machine_names[i];
2767
2768                 size_t Xpos = interface_names[i].find_last_of("X");
2769                 if(Xpos!=string::npos){
2770                         string iface = interface_names[i].substr(0,Xpos);
2771                         base_vifaces.insert(iface);
2772                 }
2773                 // get interface attributes and add them to the list
2774         }
2775
2776 //              Do we need to include protobuf libraries?
2777         bool use_proto = false;
2778         int erri;
2779         string err_str;
2780         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
2781                 string ifnm = (*ssi);
2782                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
2783                 for(int ift_i=0;ift_i<ift.size();ift_i++){
2784                         if(ift[ift_i]=="PROTO"){
2785                                 use_proto = true;
2786                         }
2787                 }
2788         }
2789
2790         fprintf(outfl,
2791 "\n"
2792 "\n"
2793 "all: rts");
2794         for(i=0;i<hfta_names.size();++i)
2795                 fprintf(outfl," %s",hfta_names[i].c_str());
2796         fputs(
2797 ("\n"
2798 "\n"
2799 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
2800 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
2801         if(use_pads)
2802                 fprintf(outfl,"-L. ");
2803         fputs(
2804 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
2805         if(use_pads)
2806                 fprintf(outfl,"-lgscppads -lpads ");
2807         fprintf(outfl,
2808 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
2809         if(use_pads)
2810                 fprintf(outfl, " -lpz -lz -lbz ");
2811         if(libz_exists && libast_exists)
2812                 fprintf(outfl," -last ");
2813         if(use_pads)
2814                 fprintf(outfl, " -ldll -ldl ");
2815         if(use_proto)
2816                 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
2817         fprintf(outfl," -lgscpaux");
2818 #ifdef GCOV
2819         fprintf(outfl," -fprofile-arcs");
2820 #endif
2821         fprintf(outfl,
2822 "\n"
2823 "\n"
2824 "lfta.o: %s_lfta.c\n"
2825 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
2826 "\n"
2827 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
2828         for(i=0;i<nfiles;++i)
2829                 fprintf(outfl," %s",input_file_names[i].c_str());
2830         if(hostname == ""){
2831                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
2832         }else{
2833                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
2834         }
2835         for(i=0;i<nfiles;++i)
2836                 fprintf(outfl," %s",input_file_names[i].c_str());
2837         fprintf(outfl,"\n");
2838
2839         for(i=0;i<hfta_names.size();++i)
2840                 fprintf(outfl,
2841 ("%s: %s.o\n"
2842 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
2843 "\n"
2844 "%s.o: %s.cc\n"
2845 "\t$(CPP) -o %s.o -c %s.cc\n"
2846 "\n"
2847 "\n").c_str(),
2848     hfta_names[i].c_str(), hfta_names[i].c_str(),
2849         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
2850         hfta_names[i].c_str(), hfta_names[i].c_str(),
2851         hfta_names[i].c_str(), hfta_names[i].c_str()
2852                 );
2853
2854         fprintf(outfl,
2855 ("\n"
2856 "packet_schema.txt:\n"
2857 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
2858 "\n"
2859 "external_fcns.def:\n"
2860 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
2861 "\n"
2862 "clean:\n"
2863 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
2864         for(i=0;i<hfta_names.size();++i)
2865                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
2866         fprintf(outfl,"\n");
2867
2868         fclose(outfl);
2869
2870
2871
2872 //              Gather the set of interfaces
2873 //              TODO : must update to hanndle machines
2874 //              TODO : lookup interface attributes and add them as a parameter to rts process
2875         outfl = fopen("runit", "w");
2876         if(outfl==NULL){
2877                 fprintf(stderr,"Can't open runit for write, exiting.\n");
2878                 exit(0);
2879         }
2880
2881
2882         fputs(
2883 ("#!/bin/sh\n"
2884 "./stopit\n"
2885 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
2886 "sleep 5\n"
2887 "if [ ! -f gshub.log ]\n"
2888 "then\n"
2889 "\techo \"Failed to start bin/gshub.py\"\n"
2890 "\texit -1\n"
2891 "fi\n"
2892 "ADDR=`cat gshub.log`\n"
2893 "ps opgid= $! >> gs.pids\n"
2894 "./rts $ADDR default ").c_str(), outfl);
2895 //      int erri;
2896 //      string err_str;
2897         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
2898                 string ifnm = (*ssi);
2899                 fprintf(outfl, "%s ",ifnm.c_str());
2900                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
2901                 for(j=0;j<ifv.size();++j)
2902                         fprintf(outfl, "%s ",ifv[j].c_str());
2903         }
2904         fprintf(outfl, " &\n");
2905         fprintf(outfl, "echo $! >> gs.pids\n");
2906         for(i=0;i<hfta_names.size();++i)
2907                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
2908
2909         for(j=0;j<opviews.opview_list.size();++j){
2910                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
2911         }
2912
2913         fclose(outfl);
2914         system("chmod +x runit");
2915
2916         outfl = fopen("stopit", "w");
2917         if(outfl==NULL){
2918                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
2919                 exit(0);
2920         }
2921
2922         fprintf(outfl,"#!/bin/sh\n"
2923 "rm -f gshub.log\n"
2924 "if [ ! -f gs.pids ]\n"
2925 "then\n"
2926 "exit\n"
2927 "fi\n"
2928 "for pgid in `cat gs.pids`\n"
2929 "do\n"
2930 "kill -TERM -$pgid\n"
2931 "done\n"
2932 "sleep 1\n"
2933 "for pgid in `cat gs.pids`\n"
2934 "do\n"
2935 "kill -9 -$pgid\n"
2936 "done\n"
2937 "rm gs.pids\n");
2938
2939         fclose(outfl);
2940         system("chmod +x stopit");
2941
2942 //-----------------------------------------------
2943
2944 /* For now disable support for virtual interfaces
2945         outfl = fopen("set_vinterface_hash.bat", "w");
2946         if(outfl==NULL){
2947                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
2948                 exit(0);
2949         }
2950
2951 //              The format should be determined by an entry in the ifres.xml file,
2952 //              but for now hardcode the only example I have.
2953         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
2954                 if(rts_hload.count((*ssi))){
2955                         string iface_name = (*ssi);
2956                         string iface_number = "";
2957                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
2958                                 if(isdigit(iface_name[j])){
2959                                         iface_number = iface_name[j];
2960                                         if(j>0 && isdigit(iface_name[j-1]))
2961                                                 iface_number = iface_name[j-1] + iface_number;
2962                                 }
2963                         }
2964
2965                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
2966                         vector<int> halloc = rts_hload[iface_name];
2967                         int prev_limit = 0;
2968                         for(j=0;j<halloc.size();++j){
2969                                 if(j>0)
2970                                         fprintf(outfl,":");
2971                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
2972                                 prev_limit = halloc[j];
2973                         }
2974                         fprintf(outfl,"\n");
2975                 }
2976         }
2977         fclose(outfl);
2978         system("chmod +x set_vinterface_hash.bat");
2979 */
2980 }
2981
2982 //              Code for implementing a local schema
2983 /*
2984                 table_list qpSchema;
2985
2986 //                              Load the schemas of any LFTAs.
2987                 int l;
2988                 for(l=0;l<hfta_nbr;++l){
2989                         stream_query *sq0 = split_queries[l];
2990                         table_def *td = sq0->get_output_tabledef();
2991                         qpSchema.append_table(td);
2992                 }
2993 //                              load the schemas of any other ref'd tables.
2994 //                              (e.g., hftas)
2995                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
2996                 int ti;
2997                 for(ti=0;ti<input_tbl_names.size();++ti){
2998                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
2999                         if(tbl_ref < 0){
3000                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3001                                 if(tbl_ref < 0){
3002                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3003                                         exit(1);
3004                                 }
3005                                 qpSchema.append_table(Schema->get_table(tbl_ref));
3006                         }
3007                 }
3008 */
3009
3010 //              Functions related to parsing.
3011
3012 /*
3013 static int split_string(char *instr,char sep, char **words,int max_words){
3014    char *loc;
3015    char *str;
3016    int nwords = 0;
3017
3018    str = instr;
3019    words[nwords++] = str;
3020    while( (loc = strchr(str,sep)) != NULL){
3021         *loc = '\0';
3022         str = loc+1;
3023         if(nwords >= max_words){
3024                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3025                 nwords = max_words-1;
3026         }
3027         words[nwords++] = str;
3028    }
3029
3030    return(nwords);
3031 }
3032
3033 */
3034