Initial commit
[com/gs-lite.git] / src / ftacmp / translate_fta.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33
34 #include<list>
35
36 //              for the scandir
37      #include <sys/types.h>
38      #include <dirent.h>
39
40
41 #include<errno.h>
42
43 //              to verify that some files exist.
44      #include <sys/types.h>
45      #include <sys/stat.h>
46
47 #include "parse_partn.h"
48
49 #include "print_plan.h"
50
51 //              Interface to the xml parser
52
53 #include"xml_t.h"
54 #include"field_list.h"
55
//      Entry point, input stream and debug flag of the XML parser.  They are
//      only declared here; main() calls xmlParserparse() on field_list.xml.
56 extern int xmlParserparse(void);
57 extern FILE *xmlParserin;
58 extern int xmlParserdebug;
59
//      State shared with the XML parser.  NOTE(review): the attribute/value
//      vectors and the xml_a / xml_v strings are not referenced in the part
//      of the file visible here -- presumably filled in by the parser; confirm.
60 std::vector<std::string> xml_attr_vec;
61 std::vector<std::string> xml_val_vec;
62 std::string xml_a, xml_v;
//      Allocated in main() just before field_list.xml is parsed, then handed
//      to the field_list constructor.  NULL when no field list file is found.
63 xml_t *xml_leaves = NULL;
64
65 //      Interface to the field list verifier.
//      Built in main() from xml_leaves when field_list.xml parses
//      successfully; stays NULL otherwise.
66 field_list *field_verifier = NULL;
67
//      Size of the scratch character buffer (tmpstr) that main() uses for
//      gethostname() and for reading configuration files line by line.
68 #define TMPSTRLEN 1000
69
//      Directory separator used when stripping the directory part from an
//      input file name; may be overridden at compile time.
70 #ifndef PATH_DELIM
71   #define PATH_DELIM '/'
72 #endif
73
//      NOTE(review): not referenced in the part of the file visible here;
//      presumably a scratch buffer for schema text -- confirm against its users.
74 char tmp_schema_str[10000];
75
76 // Maximum delay between two heartbeats produced
77 // by a UDOP.  Used when it is not explicitly
78 // provided in the UDOP definition.
79 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
80
81 //              Default lfta hash table size, must be power of 2.
82 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
83
84 //              Interface to FTA definition lexer and parser ...
85
//      Entry point, input stream and debug flag of the FTA parser.  main()
//      sets FtaParserdebug to 0 and calls FtaParserparse() once for the schema
//      file and once for every query file.
86 extern int FtaParserparse(void);
87 extern FILE *FtaParserin;
88 extern int FtaParserdebug;
89
//      Result object of the FTA parser.  main() allocates a fresh fta_parse_t
//      before each FtaParserparse() call and afterwards reads its parse_type,
//      tables and parse_tree_list members.
90 fta_parse_t *fta_parse_result;
//      NOTE(review): not referenced in the part of the file visible here;
//      presumably the DEFINE section collected by the parser -- confirm.
91 var_defs_t *fta_parse_defines;
92
93
94
95 //              Interface to external function lexer and parser ...
96
97 extern int Ext_fcnsParserparse(void);
98 extern FILE *Ext_fcnsParserin;
99 extern int Ext_fcnsParserdebug;
100
//      List of external functions.  main() substitutes an empty list when
//      external_fcns.def cannot be opened or parsed, then calls
//      validate_fcns() on it.  Presumably assigned by the parser on success
//      (not visible here) -- confirm.
101 ext_fcn_list *Ext_fcns;
102
103
104 //              Interface to partition definition parser
105 extern int PartnParserparse();
106 partn_def_list_t *partn_parse_result = NULL;
107
108
109
110 using namespace std;
111 //extern int errno;
112
113
114 //              forward declaration of local utility function
//              NOTE(review): the definition is not in view here; judging by
//              the name and the -M option text in main() it writes the make
//              file for the generated sources -- confirm parameter meanings
//              against the definition.
115 void generate_makefile(vector<string> &input_file_names, int nfiles,
116                                            vector<string> &hfta_names, opview_set &opviews,
117                                                 vector<string> &machine_names,
118                                                 string schema_file_name,
119                                                 vector<string> &interface_names,
120                                                 ifq_t *ifdb, string &config_dir_path,
121                                                 bool use_pads,
122                                                 string extra_libs,
123                                                 map<string, vector<int> > &rts_hload
124                                         );
125
126 //static int split_string(char *instr,char sep, char **words,int max_words);
//              Capacity of the field-pointer arrays that main() passes to
//              split_string() when reading the comma-separated .cfg files.
127 #define MAXFLDS 100
128
129   FILE *schema_summary_output = NULL;           // schema summary stream; main() opens schema_summary.txt for it when -f is given
130
131 //                      Dump schema summary
132 void dump_summary(stream_query *str){
133         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
134
135         table_def *sch = str->get_output_tabledef();
136
137         vector<field_entry *> flds = sch->get_fields();
138         int f;
139         for(f=0;f<flds.size();++f){
140                 if(f>0) fprintf(schema_summary_output,"|");
141                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
142         }
143         fprintf(schema_summary_output,"\n");
144         for(f=0;f<flds.size();++f){
145                 if(f>0) fprintf(schema_summary_output,"|");
146                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
147         }
148         fprintf(schema_summary_output,"\n");
149 }
150
151 //              Globals
//              hostname is taken from the -h option when given; otherwise
//              main() fills it from gethostname().
152 string hostname;                // name of current host.
//              Set in main() to strlen() of main's scratch buffer tmpstr.
//              NOTE(review): when -h overrides the host name, gethostname() is
//              skipped and tmpstr has not been written at that point, so this
//              length does not describe hostname -- confirm and fix in main().
153 int hostname_len;
//              Set to true by the -S option (enable LFTA statistics).
154 bool generate_stats = false;
//              Path to the root of GS-lite; overridden by the -R option.
155 string root_path = "../..";
156
157
158 int main(int argc, char **argv){
159   char tmpstr[TMPSTRLEN];
160   string err_str;
161   int q,s,h,f;
162
163   set<int>::iterator si;
164
165   vector<string> query_names;                   // for lfta.c registration
166   map<string, vector<int> > mach_query_names;   // list queries of machine
167   vector<int> snap_lengths;                             // for lfta.c registration
168   vector<string> interface_names;                       // for lfta.c registration
169   vector<string> machine_names;                 // machine of interface
170   vector<bool> lfta_reuse_options;                      // for lfta.c registration
171   vector<int> lfta_liveness_timeouts;           // fot qtree.xml generation
172   vector<string> hfta_names;                    // hfta cource code names, for
173                                                                                 // creating make file.
174   vector<string> qnames;                                // ensure unique names
175   map<string, int> lfta_names;                  // keep track of unique lftas.
176
177
178 //                              set these to 1 to debug the parser
179   FtaParserdebug = 0;
180   Ext_fcnsParserdebug = 0;
181
182   FILE *lfta_out;                               // lfta.c output.
183   FILE *fta_in;                                 // input file
184   FILE *table_schemas_in;               // source tables definition file
185   FILE *query_name_output;              // query names
186   FILE *qtree_output;                   // interconnections of query nodes
187
188   // -------------------------------
189   // Handling of Input Arguments
190   // -------------------------------
191     char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
192         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
193                 "\t[-B] : debug only (don't create output files)\n"
194                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
195                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
196                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
197                 "\t[-C] : use <config directory> for definition files\n"
198                 "\t[-l] : use <library directory> for library queries\n"
199                 "\t[-N] : output query names in query_names.txt\n"
200                 "\t[-H] : create HFTA only (no schema_file)\n"
201                 "\t[-Q] : use query name for hfta suffix\n"
202                 "\t[-M] : generate make file and runit, stopit scripts\n"
203                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
204                 "\t[-f] : Output schema summary to schema_summary.txt\n"
205                 "\t[-P] : link with PADS\n"
206                 "\t[-h] : override host name.\n"
207                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
208                 "\t[-R] : path to root of GS-lite\n"
209 ;
210
211 //              parameters gathered from command line processing
212         string external_fcns_path;
213 //      string internal_fcn_path;
214         string config_dir_path;
215         string library_path = "./";
216         vector<string> input_file_names;
217         string schema_file_name;
218         bool debug_only = false;
219         bool hfta_only = false;
220         bool output_query_names = false;
221         bool output_schema_summary=false;
222         bool numeric_hfta_flname = true;
223         bool create_makefile = false;
224         bool distributed_mode = false;
225         bool partitioned_mode = false;
226         bool use_live_hosts_file = false;
227         bool use_pads = false;
228         bool clean_make = false;
229         int n_virtual_interfaces = 1;
230
231    char chopt;
232    while((chopt = getopt(argc,argv,optstr)) != -1){
233                 switch(chopt){
234                 case 'B':
235                         debug_only = true;
236                         break;
237                 case 'D':
238                         distributed_mode = true;
239                         break;
240                 case 'p':
241                         partitioned_mode = true;
242                         break;
243                 case 'L':
244                         use_live_hosts_file = true;
245                         break;
246                 case 'C':
247                                 if(optarg != NULL)
248                                  config_dir_path = string(optarg) + string("/");
249                         break;
250                 case 'l':
251                                 if(optarg != NULL)
252                                  library_path = string(optarg) + string("/");
253                         break;
254                 case 'N':
255                         output_query_names = true;
256                         break;
257                 case 'Q':
258                         numeric_hfta_flname = false;
259                         break;
260                 case 'H':
261                         if(schema_file_name == ""){
262                                 hfta_only = true;
263                         }
264                         break;
265                 case 'f':
266                         output_schema_summary=true;
267                         break;
268                 case 'M':
269                         create_makefile=true;
270                         break;
271                 case 'S':
272                         generate_stats=true;
273                         break;
274                 case 'P':
275                         use_pads = true;
276                         break;
277                 case 'c':
278                         clean_make = true;
279                         break;
280                 case 'h':
281                         if(optarg != NULL)
282                                 hostname = optarg;
283                         break;
284                 case 'R':
285                         if(optarg != NULL)
286                                 root_path = optarg;
287                         break;
288                 case 'n':
289                         if(optarg != NULL){
290                                 n_virtual_interfaces = atoi(optarg);
291                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
292                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
293                                         n_virtual_interfaces = 1;
294                                 }
295                         }
296                         break;
297                 case '?':
298                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
299                         fprintf(stderr,"%s\n", usage_str);
300                         exit(1);
301                 default:
302                         fprintf(stderr, "Argument was %c\n", optopt);
303                         fprintf(stderr,"Invalid arguments\n");
304                         fprintf(stderr,"%s\n", usage_str);
305                         exit(1);
306                 }
307         }
308         argc -= optind;
309         argv += optind;
310         for (int i = 0; i < argc; ++i) {
311                 if((schema_file_name == "") && !hfta_only){
312                         schema_file_name = argv[i];
313                 }else{
314                         input_file_names.push_back(argv[i]);
315                 }
316         }
317
318         if(input_file_names.size() == 0){
319                 fprintf(stderr,"%s\n", usage_str);
320                 exit(1);
321         }
322
323         if(clean_make){
324                 string clean_cmd = "rm Makefile hfta_*.cc";
325                 int clean_ret = system(clean_cmd.c_str());
326                 if(clean_ret){
327                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
328                 }
329         }
330
331
332         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
333
334 //                      Open globally used file names.
335
336         // prepend config directory to schema file
337         schema_file_name = config_dir_path + schema_file_name;
338         external_fcns_path = config_dir_path + string("external_fcns.def");
339     string ifx_fname = config_dir_path + string("ifres.xml");
340
341 //              Find interface query file(s).
342         if(hostname == ""){
343                 gethostname(tmpstr,TMPSTRLEN);
344                 hostname = tmpstr;
345         }
346         hostname_len = strlen(tmpstr);
347     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
348         vector<string> ifq_fls;
349
350                 ifq_fls.push_back(ifq_fname);
351
352
353 //                      Get the field list, if it exists
354         string flist_fl = config_dir_path + "field_list.xml";
355         FILE *flf_in = NULL;
356         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
357                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
358                 xml_leaves = new xml_t();
359                 xmlParser_setfileinput(flf_in);
360                 if(xmlParserparse()){
361                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
362                 }else{
363                         field_verifier = new field_list(xml_leaves);
364                 }
365         }
366
367         if(!hfta_only){
368           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
369                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
370                 exit(1);
371           }
372         }
373
374 /*
375         if(!(debug_only || hfta_only)){
376           if((lfta_out = fopen("lfta.c","w")) == NULL){
377                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
378                 exit(1);
379           }
380         }
381 */
382
383 //              Get the output specification file.
384 //              format is
385 //                      query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
386         string ospec_fl = "output_spec.cfg";
387         FILE *osp_in = NULL;
388         vector<ospec_str *> output_specs;
389         multimap<string, int> qname_to_ospec;
390         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
391                 char *flds[MAXFLDS];
392                 int o_lineno = 0;
393                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
394                         o_lineno++;
395                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
396                         if(nflds == 7){
397 //              make operator type lowercase
398                                 char *tmpc;
399                                 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
400                                         *tmpc = tolower(*tmpc);
401
402                                 ospec_str *tmp_ospec = new ospec_str();
403                                 tmp_ospec->query = flds[0];
404                                 tmp_ospec->operator_type = flds[1];
405                                 tmp_ospec->operator_param = flds[2];
406                                 tmp_ospec->output_directory = flds[3];
407                                 tmp_ospec->bucketwidth = atoi(flds[4]);
408                                 tmp_ospec->partitioning_flds = flds[5];
409                                 tmp_ospec->n_partitions = atoi(flds[6]);
410                                 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
411                                 output_specs.push_back(tmp_ospec);
412                         }else{
413                                 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
414                         }
415                 }
416                 fclose(osp_in);
417         }else{
418                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
419                 exit(1);
420         }
421
422 //              hfta parallelism
423         string pspec_fl = "hfta_parallelism.cfg";
424         FILE *psp_in = NULL;
425         map<string, int> hfta_parallelism;
426         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
427                 char *flds[MAXFLDS];
428                 int o_lineno = 0;
429                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
430                         bool good_entry = true;
431                         o_lineno++;
432                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
433                         if(nflds == 2){
434                                 string hname = flds[0];
435                                 int par = atoi(flds[1]);
436                                 if(par <= 0 || par > n_virtual_interfaces){
437                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
438                                         good_entry = false;
439                                 }
440                                 if(good_entry && n_virtual_interfaces % par != 0){
441                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
442                                         good_entry = false;
443                                 }
444                                 if(good_entry)
445                                         hfta_parallelism[hname] = par;
446                         }
447                 }
448         }else{
449                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
450         }
451
452
453 //              LFTA hash table sizes
454         string htspec_fl = "lfta_htsize.cfg";
455         FILE *htsp_in = NULL;
456         map<string, int> lfta_htsize;
457         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
458                 char *flds[MAXFLDS];
459                 int o_lineno = 0;
460                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
461                         bool good_entry = true;
462                         o_lineno++;
463                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
464                         if(nflds == 2){
465                                 string lfta_name = flds[0];
466                                 int htsz = atoi(flds[1]);
467                                 if(htsz>0){
468                                         lfta_htsize[lfta_name] = htsz;
469                                 }else{
470                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
471                                 }
472                         }
473                 }
474         }else{
475                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
476         }
477
478 //              LFTA vitual interface hash split
479         string rtlspec_fl = "rts_load.cfg";
480         FILE *rtl_in = NULL;
481         map<string, vector<int> > rts_hload;
482         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
483                 char *flds[MAXFLDS];
484                 int r_lineno = 0;
485                 string iface_name;
486                 vector<int> hload;
487                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
488                         bool good_entry = true;
489                         r_lineno++;
490                         iface_name = "";
491                         hload.clear();
492                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
493                         if(nflds >1){
494                                 iface_name = flds[0];
495                                 int cumm_h = 0;
496                                 int j;
497                                 for(j=1;j<nflds;++j){
498                                         int h = atoi(flds[j]);
499                                         if(h<=0)
500                                                 good_entry = false;
501                                         cumm_h += h;
502                                         hload.push_back(cumm_h);
503                                 }
504                         }else{
505                                 good_entry = false;
506                         }
507                         if(good_entry){
508                                 rts_hload[iface_name] = hload;
509                         }else{
510                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
511                         }
512                 }
513         }
514
515
516
517         if(output_query_names){
518           if((query_name_output = fopen("query_names.txt","w")) == NULL){
519                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
520                 exit(1);
521           }
522         }
523
524         if(output_schema_summary){
525           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
526                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
527                 exit(1);
528           }
529         }
530
531         if((qtree_output = fopen("qtree.xml","w")) == NULL){
532                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
533                 exit(1);
534         }
535         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
536         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
537         fprintf(qtree_output,"<QueryNodes>\n");
538
539
540 //                      Get an initial Schema
541         table_list *Schema;
542         if(!hfta_only){
543 //                      Parse the table schema definitions.
544           fta_parse_result = new fta_parse_t();
545           FtaParser_setfileinput(table_schemas_in);
546           if(FtaParserparse()){
547                 fprintf(stderr,"Table schema parse failed.\n");
548                 exit(1);
549           }
550           if(fta_parse_result->parse_type != TABLE_PARSE){
551                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
552                 exit(1);
553           }
554           Schema = fta_parse_result->tables;
555
556 //                      Process schema field inheritance
557           int retval;
558           retval = Schema->unroll_tables(err_str);
559           if(retval){
560                 fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );
561                 exit(1);
562           }
563         }else{
564 //                      hfta only => we will try to fetch schemas from the registry.
565 //                      therefore, start off with an empty schema.
566           Schema = new table_list();
567         }
568
569
570 //                      Open and parse the external functions file.
571         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
572         if(Ext_fcnsParserin == NULL){
573                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
574                 Ext_fcns = new ext_fcn_list();
575         }else{
576                 if(Ext_fcnsParserparse()){
577                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
578                         Ext_fcns = new ext_fcn_list();
579                 }
580         }
581         if(Ext_fcns->validate_fcns(err_str)){
582                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
583                 exit(1);
584         }
585
586 //              Open and parse the interface resources file.
587 //      ifq_t *ifaces_db = new ifq_t();
588 //   string ierr;
589 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
590 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
591 //                              ifx_fname.c_str(), ierr.c_str());
592 //              exit(1);
593 //      }
594 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
595 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
596 //                              ifq_fname.c_str(), ierr.c_str());
597 //              exit(1);
598 //      }
599
600
601 //                      The LFTA code string.
602 //                      Put the standard preamble here.
603 //                      NOTE: the hash macros, fcns should go into the run time
604   map<string, string> lfta_val;
605   map<string, string> lfta_prefilter_val;
606
607   string lfta_header =
608 "#include <limits.h>\n\n"
609 "#include \"rts.h\"\n"
610 "#include \"fta.h\"\n"
611 "#include \"lapp.h\"\n"
612 "#include \"rts_udaf.h\"\n\n"
613 /*
614 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
615 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
616 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
617 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
618 */
619 "\n"
620 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
621 "\n"
622 "#define SLOT_FILLED 0x04\n"
623 "#define SLOT_GEN_BITS 0x03\n"
624 "#define SLOT_HASH_BITS 0xfffffff8\n"
625 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
626 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
627 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
628 "\n\n"
629
630 "#define lfta_BOOL_to_hash(x) (x)\n"
631 "#define lfta_USHORT_to_hash(x) (x)\n"
632 "#define lfta_UINT_to_hash(x) (x)\n"
633 "#define lfta_IP_to_hash(x) (x)\n"
634 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
635 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
636 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
637 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
638 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
639 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
640 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
641 "       for(i=0;i<x.length;++i){\n"
642 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
643 "               if((i%4) == 3){\n"
644 "                       ret ^= tmp_sum;\n"
645 "                       tmp_sum = 0;\n"
646 "               }\n"
647 "       }\n"
648 "       if((i%4)!=0) ret ^=tmp_sum;\n"
649 "       return(ret);\n"
650 "}\n\n\n";
651
652
653
654 //////////////////////////////////////////////////////////////////
655 /////                   Get all of the query parse trees
656
657
658   int i,p;
659   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
660
661 //---------------------------
662 //              Global info needed for post processing.
663
664 //                      Set of operator views ref'd in the query set.
665         opview_set opviews;
666 //                      lftas on a per-machine basis.
667         map<string, vector<stream_query *> > lfta_mach_lists;
668         int nfiles = input_file_names.size();
669         vector<stream_query *> hfta_list;               // list of hftas.
670         map<string, stream_query *> sq_map;             // map from query name to stream query.
671
672
673 //////////////////////////////////////////
674
675 //              Open and parse the interface resources file.
676         ifq_t *ifaces_db = new ifq_t();
677     string ierr;
678         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
679                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
680                                 ifx_fname.c_str(), ierr.c_str());
681                 exit(1);
682         }
683         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
684                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
685                                 ifq_fls[0].c_str(), ierr.c_str());
686                 exit(1);
687         }
688
689   map<string, string> qname_to_flname;  // for detecting duplicate query names
690
691
692
693 //                      Parse the files to create a vector of parse trees.
694 //                      Load qnodes with information to perform a topo sort
695 //                      based on query dependencies.
696   vector<query_node *> qnodes;                          // for topo sort.
697   map<string,int> name_node_map;                        // map query name to qnodes entry
698   for(i=0;i<input_file_names.size();i++){
699
700           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
701                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
702                   continue;
703           }
704 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
705
706 //                      Parse the FTA query
707           fta_parse_result = new fta_parse_t();
708           FtaParser_setfileinput(fta_in);
709           if(FtaParserparse()){
710                 fprintf(stderr,"FTA parse failed.\n");
711                 exit(1);
712           }
713           if(fta_parse_result->parse_type != QUERY_PARSE){
714                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
715                 exit(1);
716           }
717
718 //                      returns a list of parse trees
719           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
720           for(p=0;p<qlist.size();++p){
721             table_exp_t *fta_parse_tree = qlist[p];
722 //              query_parse_trees.push_back(fta_parse_tree);
723
724 //                      compute the default name -- extract from the query file name (path and extension stripped)
725                 strcpy(tmpstr,input_file_names[i].c_str());
726                 char *qname = strrchr(tmpstr,PATH_DELIM);
727                 if(qname == NULL)
728                         qname = tmpstr;
729                 else
730                         qname++;
731                 char *qname_end = strchr(qname,'.');
732                 if(qname_end != NULL) *qname_end = '\0';
733                 string qname_str = qname;
734                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
735
736 //                      Determine visibility.  Should I be attaching all of the output methods?
737                 if(qname_to_ospec.count(imputed_qname)>0)
738                         fta_parse_tree->set_visible(true);
739                 else
740                         fta_parse_tree->set_visible(false);
741
742
743 //                              Create a manipulable representation of the parse tree.
744 //                              the qnode inherits the visibility assigned to the parse tree.
745             int pos = qnodes.size();
746                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
747                 name_node_map[ qnodes[pos]->name ] = pos;
748 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
749 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
750 //              qfiles.push_back(i);
751
752 //                      Check for duplicate query names
753 //                                      NOTE : in hfta-only generation, I should
754 //                                      also check with the names of the registered queries.
755                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
756                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
757                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
758                         exit(1);
759                 }
760                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
761                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
762                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
763                         exit(1);
764                 }
765                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
766
767
768         }
769   }
770
771 //              Add the library queries
772
773   int pos;
774   for(pos=0;pos<qnodes.size();++pos){
775         int fi;
776         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
777                 string src_tbl = qnodes[pos]->refd_tbls[fi];
778                 if(qname_to_flname.count(src_tbl) == 0){
779                         int last_sep = src_tbl.find_last_of('/');
780                         if(last_sep != string::npos){
781 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
782                                 string target_qname = src_tbl.substr(last_sep+1);
783                                 string qpathname = library_path + src_tbl + ".gsql";
784                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
785                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
786                                         exit(1);
787                                         fprintf(stderr,"After exit\n");
788                                 }
789 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
790 //                      Parse the FTA query
791                                 fta_parse_result = new fta_parse_t();
792                                 FtaParser_setfileinput(fta_in);
793                                 if(FtaParserparse()){
794                                         fprintf(stderr,"FTA parse failed.\n");
795                                         exit(1);
796                                 }
797                                 if(fta_parse_result->parse_type != QUERY_PARSE){
798                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
799                                         exit(1);
800                                 }
801
802                                 map<string, int> local_query_map;
803                                 vector<string> local_query_names;
804                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
805                                 for(p=0;p<qlist.size();++p){
806                                 table_exp_t *fta_parse_tree = qlist[p];
807                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
808                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
809                                         if(imputed_qname == target_qname)
810                                                 imputed_qname = src_tbl;
811                                         if(local_query_map.count(imputed_qname)>0){
812                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
813                                                 exit(1);
814                                         }
815                                         local_query_map[ imputed_qname ] = p;
816                                         local_query_names.push_back(imputed_qname);
817                                 }
818
819                                 if(local_query_map.count(src_tbl)==0){
820                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
821                                         exit(1);
822                                 }
823
824                                 vector<int> worklist;
825                                 set<int> added_queries;
826                                 vector<query_node *> new_qnodes;
827                                 worklist.push_back(local_query_map[target_qname]);
828                                 added_queries.insert(local_query_map[target_qname]);
829                                 int qq;
830                                 int qpos = qnodes.size();
831                                 for(qq=0;qq<worklist.size();++qq){
832                                         int q_id = worklist[qq];
833                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
834                                         new_qnodes.push_back( new_qnode);
835                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
836                                         int ff;
837                                         for(ff = 0;ff<refd_tbls.size();++ff){
838                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
839
840                                                         if(name_node_map.count(refd_tbls[ff])>0){
841                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
842                                                                 exit(1);
843                                                         }else{
844                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
845                                                         }
846                                                 }
847                                         }
848                                 }
849
850                                 for(qq=0;qq<new_qnodes.size();++qq){
851                                         int qpos = qnodes.size();
852                                         qnodes.push_back(new_qnodes[qq]);
853                                         name_node_map[qnodes[qpos]->name ] = qpos;
854                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
855                                 }
856                         }
857                 }
858         }
859   }
860
861
862
863
864
865
866
867
868 //---------------------------------------
869
870
871 //              Add the UDOPS.
872
873   string udop_missing_sources;
874   for(i=0;i<qnodes.size();++i){
875         int fi;
876         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
877                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
878                 if(sid >= 0){
879                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
880                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
881                                 int pos = qnodes.size();
882                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
883                                         name_node_map[ qnodes[pos]->name ] = pos;
884                                         qnodes[pos]->is_externally_visible = false;   // its visible
885         //                                      Need to mark the source queries as visible.
886                                         int si;
887                                         string missing_sources = "";
888                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
889                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
890                                                 if(name_node_map.count(src_tbl)==0){
891                                                         missing_sources += src_tbl + " ";
892                                                 }
893                                         }
894                                         if(missing_sources != ""){
895                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
896                                         }
897                                 }
898                         }
899                 }
900         }
901   }
902   if(udop_missing_sources != ""){
903         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
904         exit(1);
905   }
906
907
908
909 ////////////////////////////////////////////////////////////////////
910 ///                             Check parse trees to verify that some
911 ///                             global properties are met :
912 ///                             if q1 reads from q2, then
913 ///                               q2 is processed before q1
914 ///                               q1 can supply q2's parameters
915 ///                             Verify there is no cycle in the reads-from graph.
916
917 //                      Compute an order in which to process the
918 //                      queries.
919
920 //                      Start by building the reads-from lists.
921 //
922
923   for(i=0;i<qnodes.size();++i){
924         int qi, fi;
925         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
926         for(fi = 0;fi<refd_tbls.size();++fi){
927                 if(name_node_map.count(refd_tbls[fi])>0){
928 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
929                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
930                 }
931         }
932   }
933
934
935 //              If one query reads the result of another,
936 //              check for parameter compatibility.  Currently it must
937 //              be an exact match.  I will move to requiring
938 //              containment after re-ordering, but will require
939 //              some analysis for code generation which is not
940 //              yet in place.
941 //printf("There are %d query nodes.\n",qnodes.size());
942
943
944   for(i=0;i<qnodes.size();++i){
945         vector<var_pair_t *> target_params  = qnodes[i]->params;
946         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
947                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
948                 if(target_params.size() != source_params.size()){
949                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
950                         exit(1);
951                 }
952                 int p;
953                 for(p=0;p<target_params.size();++p){
954                         if(! (target_params[p]->name == source_params[p]->name &&
955                               target_params[p]->val == source_params[p]->val ) ){
956                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
957                                 exit(1);
958                         }
959                 }
960         }
961   }
962
963
964 //              Find the roots.
965 //              Start by counting inedges.
966   for(i=0;i<qnodes.size();++i){
967         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
968                 qnodes[(*si)]->n_consumers++;
969         }
970   }
971
972 //              The roots are the nodes with indegree zero.
973   set<int> roots;
974   for(i=0;i<qnodes.size();++i){
975         if(qnodes[i]->n_consumers == 0){
976                 if(qnodes[i]->is_externally_visible == false){
977                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
978                 }
979                 roots.insert(i);
980         }
981   }
982
983 //              Remove the parts of the subtree that produce no output.
984   set<int> valid_roots;
985   set<int> discarded_nodes;
986   set<int> candidates;
987   while(roots.size() >0){
988         for(si=roots.begin();si!=roots.end();++si){
989                 if(qnodes[(*si)]->is_externally_visible){
990                         valid_roots.insert((*si));
991                 }else{
992                         discarded_nodes.insert((*si));
993                         set<int>::iterator sir;
994                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
995                                 qnodes[(*sir)]->n_consumers--;
996                                 if(qnodes[(*sir)]->n_consumers == 0)
997                                         candidates.insert( (*sir));
998                         }
999                 }
1000         }
1001         roots = candidates;
1002         candidates.clear();
1003   }
1004   roots = valid_roots;
1005   if(discarded_nodes.size()>0){
1006         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1007         int di = 0;
1008         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1009                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1010                 di++;
1011                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1012         }
1013         fprintf(stderr,"\n");
1014   }
1015
1016 //              Compute the sources_to set, ignoring discarded nodes.
1017   for(i=0;i<qnodes.size();++i){
1018         if(discarded_nodes.count(i)==0)
1019                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1020                         qnodes[(*si)]->sources_to.insert(i);
1021         }
1022   }
1023
1024
1025 //              Find the nodes that are shared by multiple visible subtrees.
1026 //              The roots become inferred visible nodes.
1027
1028 //              Find the visible nodes.
1029         vector<int> visible_nodes;
1030         for(i=0;i<qnodes.size();i++){
1031                 if(qnodes[i]->is_externally_visible){
1032                         visible_nodes.push_back(i);
1033                 }
1034         }
1035
1036 //              Find UDOPs referenced by visible nodes.
1037   list<int> workq;
1038   for(i=0;i<visible_nodes.size();++i){
1039         workq.push_back(visible_nodes[i]);
1040   }
1041   while(!workq.empty()){
1042         int node = workq.front();
1043         workq.pop_front();
1044         set<int>::iterator children;
1045         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1046                 qnodes[node]->is_externally_visible = true;
1047                 visible_nodes.push_back(node);
1048                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1049                         if(qnodes[(*children)]->is_externally_visible == false){
1050                                 qnodes[(*children)]->is_externally_visible = true;
1051                                 visible_nodes.push_back((*children));
1052                         }
1053                 }
1054         }
1055         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1056                 workq.push_back((*children));
1057         }
1058   }
1059
1060         bool done = false;
1061         while(!done){
1062 //      reset the nodes
1063                 for(i=0;i<qnodes.size();i++){
1064                         qnodes[i]->subtree_roots.clear();
1065                 }
1066
1067 //              Walk the tree defined by a visible node, not descending into
1068 //              subtrees rooted by a visible node.  Mark the node visited with
1069 //              the visible node ID.
1070                 for(i=0;i<visible_nodes.size();++i){
1071                         set<int> vroots;
1072                         vroots.insert(visible_nodes[i]);
1073                         while(vroots.size()>0){
1074                                 for(si=vroots.begin();si!=vroots.end();++si){
1075                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1076
1077                                         set<int>::iterator sir;
1078                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1079                                                 if(! qnodes[(*sir)]->is_externally_visible){
1080                                                         candidates.insert( (*sir));
1081                                                 }
1082                                         }
1083                                 }
1084                                 vroots = candidates;
1085                                 candidates.clear();
1086                         }
1087                 }
1088 //              Find the nodes in multiple visible node subtrees, but with no parent
1089 //              that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
1090                 done = true;    // until proven otherwise
1091                 for(i=0;i<qnodes.size();i++){
1092                         if(qnodes[i]->subtree_roots.size()>1){
1093                                 bool is_new_root = true;
1094                                 set<int>::iterator sir;
1095                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1096                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1097                                                 is_new_root = false;
1098                                 }
1099                                 if(is_new_root){
1100                                         qnodes[i]->is_externally_visible = true;
1101                                         qnodes[i]->inferred_visible_node = true;
1102                                         visible_nodes.push_back(i);
1103                                         done = false;
1104                                 }
1105                         }
1106                 }
1107         }
1108
1109
1110
1111
1112
1113 //              get visible nodes in topo ordering.
1114 //  for(i=0;i<qnodes.size();i++){
1115 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1116 //  }
1117   vector<int> process_order;
1118   while(roots.size() >0){
1119         for(si=roots.begin();si!=roots.end();++si){
1120                 if(discarded_nodes.count((*si))==0){
1121                         process_order.push_back( (*si) );
1122                 }
1123                 set<int>::iterator sir;
1124                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1125                         qnodes[(*sir)]->n_consumers--;
1126                         if(qnodes[(*sir)]->n_consumers == 0)
1127                                 candidates.insert( (*sir));
1128                 }
1129         }
1130         roots = candidates;
1131         candidates.clear();
1132   }
1133
1134
1135 //printf("process_order.size() =%d\n",process_order.size());
1136
1137 //              Search for cyclic dependencies
1138   string found_dep;
1139   for(i=0;i<qnodes.size();++i){
1140         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1141                 if(found_dep.size() != 0) found_dep += ", ";
1142                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1143         }
1144   }
1145   if(found_dep.size()>0){
1146         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1147         exit(1);
1148   }
1149
1150 //              Get a list of query sets, in the order to be processed.
1151 //              Start at visible root and do bfs.
1152 //              The query set includes queries referenced indirectly,
1153 //              as sources for user-defined operators.  These are needed
1154 //              to ensure that they are added to the schema, but are not part
1155 //              of the query tree.
1156
1157 //              stream_node_sets contains queries reachable only through the
1158 //              FROM clause, so I can tell which queries to add to the stream
1159 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1160
1161 //                      NOTE: this code works because in order for data to be
1162 //                      read by multiple hftas, the node must be externally visible.
1163 //                      But visible nodes define roots of process sets.
1164 //                      internally visible nodes can feed data only
1165 //                      to other nodes in the same query file.
1166 //                      Therefore, any access can be restricted to a file,
1167 //                      hfta output sharing is done only on roots
1168 //                      never on interior nodes.
1169
1170
1171
1172
1173 //              Compute the base collection of hftas.
1174   vector<hfta_node *> hfta_sets;
1175   map<string, int> hfta_name_map;
1176 //  vector< vector<int> > process_sets;
1177 //  vector< set<int> > stream_node_sets;
1178   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.
1179                                                                                                                 // i.e. process leaves 1st.
1180   for(i=0;i<process_order.size();++i){
1181         if(qnodes[process_order[i]]->is_externally_visible == true){
1182 //printf("Visible.\n");
1183                 int root = process_order[i];
1184                 hfta_node *hnode = new hfta_node();
1185                 hnode->name = qnodes[root]-> name;
1186                 hnode->source_name = qnodes[root]-> name;
1187                 hnode->is_udop = qnodes[root]->is_udop;
1188                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1189
1190                 vector<int> proc_list;  proc_list.push_back(root);
1191 //                      Ensure that nodes are added only once.
1192                 set<int> proc_set;      proc_set.insert(root);
1193                 roots.clear();                  roots.insert(root);
1194                 candidates.clear();
1195                 while(roots.size()>0){
1196                         for(si=roots.begin();si!=roots.end();++si){
1197 //printf("Processing root %d\n",(*si));
1198                                 set<int>::iterator sir;
1199                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1200 //printf("reads fom %d\n",(*sir));
1201                                         if(qnodes[(*sir)]->is_externally_visible==false){
1202                                                 candidates.insert( (*sir) );
1203                                                 if(proc_set.count( (*sir) )==0){
1204                                                         proc_set.insert( (*sir) );
1205                                                         proc_list.push_back( (*sir) );
1206                                                 }
1207                                         }
1208                                 }
1209                         }
1210                         roots = candidates;
1211                         candidates.clear();
1212                 }
1213
1214                 reverse(proc_list.begin(), proc_list.end());
1215                 hnode->query_node_indices = proc_list;
1216                 hfta_name_map[hnode->name] = hfta_sets.size();
1217                 hfta_sets.push_back(hnode);
1218         }
1219   }
1220
1221 //              Compute the reads_from / sources_to graphs for the hftas.
1222
1223   for(i=0;i<hfta_sets.size();++i){
1224         hfta_node *hnode = hfta_sets[i];
1225         for(q=0;q<hnode->query_node_indices.size();q++){
1226                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1227                 for(s=0;s<qnode->refd_tbls.size();++s){
1228                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1229                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1230                                 hnode->reads_from.insert(other_hfta);
1231                                 hfta_sets[other_hfta]->sources_to.insert(i);
1232                         }
1233                 }
1234         }
1235   }
1236
1237 //              Compute a topological sort of the hfta_sets.
1238
1239   vector<int> hfta_topsort;
1240   workq.clear();
1241   int hnode_srcs[hfta_sets.size()];
1242   for(i=0;i<hfta_sets.size();++i){
1243         hnode_srcs[i] = 0;
1244         if(hfta_sets[i]->sources_to.size() == 0)
1245                 workq.push_back(i);
1246   }
1247
1248   while(! workq.empty()){
1249         int     node = workq.front();
1250         workq.pop_front();
1251         hfta_topsort.push_back(node);
1252         set<int>::iterator stsi;
1253         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1254                 int parent = (*stsi);
1255                 hnode_srcs[parent]++;
1256                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1257                         workq.push_back(parent);
1258                 }
1259         }
1260   }
1261
1262 //              Decorate hfta nodes with the level of parallelism given as input.
1263
1264   map<string, int>::iterator msii;
1265   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1266         string hfta_name = (*msii).first;
1267         int par = (*msii).second;
1268         if(hfta_name_map.count(hfta_name) > 0){
1269                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1270         }else{
1271                 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1272         }
1273   }
1274
1275 //              Propagate levels of parallelism: children should have a level of parallelism
1276 //              as large as any of its parents.  Adjust children upwards to compensate.
1277 //              Start at parents and adjust children, auto-propagation will occur.
1278
1279   for(i=hfta_sets.size()-1;i>=0;i--){
1280         set<int>::iterator stsi;
1281         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1282                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1283                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1284                 }
1285         }
1286   }
1287
1288 //              Before all the name mangling, check if there are any output_spec.cfg
1289 //              or hfta_parallelism.cfg entries that do not have a matching query.
1290
1291         string dangling_ospecs = "";
1292         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1293                 string oq = (*msii).first;
1294                 if(hfta_name_map.count(oq) == 0){
1295                         dangling_ospecs += " "+(*msii).first;
1296                 }
1297         }
1298         if(dangling_ospecs!=""){
1299                 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1300         }
1301
1302         string dangling_par = "";
1303         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1304                 string oq = (*msii).first;
1305                 if(hfta_name_map.count(oq) == 0){
1306                         dangling_par += " "+(*msii).first;
1307                 }
1308         }
1309         if(dangling_par!=""){
1310                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1311         }
1312
1313
1314
1315 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1316 //              FROM clauses: retarget any name which is an internal node, and
1317 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes
1318 //              when the source hfta has more parallelism than the target node.
1319 //              Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1320
1321
1322   int n_original_hfta_sets = hfta_sets.size();
1323   for(i=0;i<n_original_hfta_sets;++i){
1324         if(hfta_sets[i]->n_parallel > 1){
1325                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1326                 set<string> local_nodes;                // names of query nodes in the hfta.
1327                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1328                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1329                 }
1330
1331                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1332                         string mangler = "__copy"+int_to_string(p);
1333                         hfta_node *par_hfta  = new hfta_node();
1334                         par_hfta->name = hfta_sets[i]->name + mangler;
1335                         par_hfta->source_name = hfta_sets[i]->name;
1336                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1337                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1338                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1339                         par_hfta->parallel_idx = p;
1340
1341                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1342
1343 //      Is it a UDOP?
1344                         if(hfta_sets[i]->is_udop){
1345                                 int root = hfta_sets[i]->query_node_indices[0];
1346
1347                                 string unequal_par_sources;
1348                                 set<int>::iterator rfsii;
1349                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1350                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1351                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1352                                         }
1353                                 }
1354                                 if(unequal_par_sources != ""){
1355                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1356                                         exit(1);
1357                                 }
1358
1359                                 int rti;
1360                                 vector<string> new_sources;
1361                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1362                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1363                                 }
1364
1365                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1366                                 new_qn->name += mangler;
1367                                 new_qn->mangler = mangler;
1368                                 new_qn->refd_tbls = new_sources;
1369                                 par_hfta->query_node_indices.push_back(qnodes.size());
1370                                 par_qnode_map[new_qn->name] = qnodes.size();
1371                                 name_node_map[ new_qn->name ] = qnodes.size();
1372                                 qnodes.push_back(new_qn);
1373                         }else{
1374 //              regular query node
1375                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1376                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1377                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1378 //                                      rehome the from clause on mangled names.
1379 //                                      create merge nodes as needed for external sources.
1380                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1381                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1382                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1383                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1384 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1385                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1386                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1387                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1388                                                 }else{
1389                                                         vector<string> src_tbls;
1390                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1391                                                         if(stride == 0){
1392                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1393                                                                 exit(1);
1394                                                         }
1395                                                         for(s=0;s<stride;++s){
1396                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1397                                                                 src_tbls.push_back(ext_src_name);
1398                                                         }
1399                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1400                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1401                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1402 //                                      Make a qnode to represent the new merge node
1403                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1404                                                         qn_pt->refd_tbls = src_tbls;
1405                                                         qn_pt->is_udop  = false;
1406                                                         qn_pt->is_externally_visible = false;
1407                                                         qn_pt->inferred_visible_node  = false;
1408                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1409                                                         par_qnode_map[merge_node_name] = qnodes.size();
1410                                                         name_node_map[ merge_node_name ] = qnodes.size();
1411                                                         qnodes.push_back(qn_pt);
1412                                                 }
1413                                         }
1414                                 }
1415                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1416                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1417                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1418                                 }
1419                                 new_qn->params = qnodes[hqn_idx]->params;
1420                                 new_qn->is_udop = false;
1421                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1422                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1423                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1424                                 par_qnode_map[new_qn->name] = qnodes.size();
1425                                 name_node_map[ new_qn->name ] = qnodes.size();
1426                                 qnodes.push_back(new_qn);
1427                           }
1428                         }
1429                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1430                         hfta_sets.push_back(par_hfta);
1431                 }
1432         }else{
1433 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1434 //              hfta sources.
1435                 if(!hfta_sets[i]->is_udop){
1436                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1437                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1438                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1439                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1440 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1441                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1442                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1443                                                 vector<string> src_tbls;
1444                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1445                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1446                                                         src_tbls.push_back(ext_src_name);
1447                                                 }
1448                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1449                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1450                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1451 //                                      Make a qnode to represent the new merge node
1452                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1453                                                 qn_pt->refd_tbls = src_tbls;
1454                                                 qn_pt->is_udop  = false;
1455                                                 qn_pt->is_externally_visible = false;
1456                                                 qn_pt->inferred_visible_node  = false;
1457                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1458                                                 name_node_map[ merge_node_name ] = qnodes.size();
1459                                                 qnodes.push_back(qn_pt);
1460                                         }
1461                                 }
1462                         }
1463                 }
1464           }
1465         }
1466   }
1467
1468 //                      Rebuild the reads_from / sources_to lists in the qnodes
1469   for(q=0;q<qnodes.size();++q){
1470         qnodes[q]->reads_from.clear();
1471         qnodes[q]->sources_to.clear();
1472   }
1473   for(q=0;q<qnodes.size();++q){
1474         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1475                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1476                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1477                         qnodes[q]->reads_from.insert(rf);
1478                         qnodes[rf]->sources_to.insert(q);
1479                 }
1480         }
1481   }
1482
1483 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1484   for(q=0;q<hfta_sets.size();++q){
1485         hfta_sets[q]->reads_from.clear();
1486         hfta_sets[q]->sources_to.clear();
1487   }
1488   for(q=0;q<hfta_sets.size();++q){
1489         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1490                 int node = hfta_sets[q]->query_node_indices[s];
1491                 set<int>::iterator rfsii;
1492                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1493                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1494                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1495                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1496                         }
1497                 }
1498         }
1499   }
1500
1501 /*
1502 for(q=0;q<qnodes.size();++q){
1503  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1504  set<int>::iterator rsii;
1505  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1506   printf(" %d",(*rsii));
1507   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1508  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1509   printf(" %d",(*rsii));
1510  printf("\n");
1511 }
1512
1513 for(q=0;q<hfta_sets.size();++q){
1514  if(hfta_sets[q]->do_generation==false)
1515         continue;
1516  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1517  set<int>::iterator rsii;
1518  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1519   printf(" %d",(*rsii));
1520   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1521  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1522   printf(" %d",(*rsii));
1523  printf("\n");
1524 }
1525 */
1526
1527
1528
1529 //              Re-topo sort the hftas
1530   hfta_topsort.clear();
1531   workq.clear();
1532   int hnode_srcs_2[hfta_sets.size()];
1533   for(i=0;i<hfta_sets.size();++i){
1534         hnode_srcs_2[i] = 0;
1535         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1536                 workq.push_back(i);
1537         }
1538   }
1539
1540   while(workq.empty() == false){
1541         int     node = workq.front();
1542         workq.pop_front();
1543         hfta_topsort.push_back(node);
1544         set<int>::iterator stsii;
1545         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1546                 int child = (*stsii);
1547                 hnode_srcs_2[child]++;
1548                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1549                         workq.push_back(child);
1550                 }
1551         }
1552   }
1553
1554 //              Ensure that all of the query_node_indices in hfta_sets are topologically
1555 //              sorted, don't rely on assumptions that all transforms maintain some kind of order.
1556   for(i=0;i<hfta_sets.size();++i){
1557         if(hfta_sets[i]->do_generation){
1558                 map<int,int> n_accounted;
1559                 vector<int> new_order;
1560                 workq.clear();
1561                 vector<int>::iterator vii;
1562                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1563                         n_accounted[(*vii)]= 0;
1564                 }
1565                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1566                         set<int>::iterator rfsii;
1567                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1568                                 if(n_accounted.count((*rfsii)) == 0){
1569                                         n_accounted[(*vii)]++;
1570                                 }
1571                         }
1572                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1573                                 workq.push_back((*vii));
1574                         }
1575                 }
1576
1577                 while(workq.empty() == false){
1578                         int node = workq.front();
1579                         workq.pop_front();
1580                         new_order.push_back(node);
1581                         set<int>::iterator stsii;
1582                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1583                                 if(n_accounted.count((*stsii))){
1584                                         n_accounted[(*stsii)]++;
1585                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1586                                                 workq.push_back((*stsii));
1587                                         }
1588                                 }
1589                         }
1590                 }
1591                 hfta_sets[i]->query_node_indices = new_order;
1592         }
1593   }
1594
1595
1596
1597
1598
1599 ///                     Global checking is done, start the analysis and translation
1600 ///                     of the query parse tree in the order specified by process_order
1601
1602
1603 //                      Get a list of the LFTAs for global lfta optimization
1604 //                              TODO: separate building operators from splitting lftas,
1605 //                                      that will make optimizations such as predicate pushing easier.
1606         vector<stream_query *> lfta_list;
1607
1608         stream_query *rootq;
1609
1610     int qi,qj;
1611
1612         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1613
1614         int hfta_id = hfta_topsort[qi];
1615     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1616
1617
1618
1619 //              Two possibilities, either it's a UDOP, or it's a collection of queries.
1620 //      if(qnodes[curr_list.back()]->is_udop)
1621         if(hfta_sets[hfta_id]->is_udop){
1622                 int node_id = curr_list.back();
1623                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1624                 opview_entry *opv = new opview_entry();
1625
1626 //                      Many of the UDOP properties aren't currently used.
1627                 opv->parent_qname = "no_parent";
1628                 opv->root_name = qnodes[node_id]->name;
1629                 opv->view_name = qnodes[node_id]->file;
1630                 opv->pos = qi;
1631                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1632                 opv->udop_alias = tmpstr;
1633                 opv->mangler = qnodes[node_id]->mangler;
1634
1635                 if(opv->mangler != ""){
1636                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1637                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1638                 }
1639
1640 //                      This piece of code makes each hfta which refers to the same udop
1641 //                      reference a distinct running udop.  Do this at query optimization time?
1642 //              fmtbl->set_udop_alias(opv->udop_alias);
1643
1644                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1645                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1646
1647                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1648                 int s,f,q;
1649                 for(s=0;s<subq.size();++s){
1650 //                              Validate that the fields match.
1651                         subquery_spec *sqs = subq[s];
1652                         string subq_name = sqs->name + opv->mangler;
1653                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1654                         if(flds.size() == 0){
1655                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1656                                 return(1);
1657                         }
1658                         if(flds.size() < sqs->types.size()){
1659                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1660                                 return(1);
1661                         }
1662                         bool failed = false;
1663                         for(f=0;f<sqs->types.size();++f){
1664                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1665                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1666                                 if(! dte.subsumes_type(&dtf) ){
1667                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1668                                         failed = true;
1669                                 }
1670 /*
1671                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1672                                         string pstr = dte.get_temporal_string();
1673                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1674                                         failed = true;
1675                                 }
1676 */
1677                         }
1678                         if(failed)
1679                                 return(1);
1680 ///                             Validation done, find the subquery, make a copy of the
1681 ///                             parse tree, and add it to the return list.
1682                         for(q=0;q<qnodes.size();++q)
1683                                 if(qnodes[q]->name == subq_name)
1684                                         break;
1685                         if(q==qnodes.size()){
1686                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1687                                 return(1);
1688                         }
1689
1690                 }
1691
1692 //                      Cross-link to from entry(s) in all sourced-to tables.
1693                 set<int>::iterator sii;
1694                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1695 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1696                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1697                         int ii;
1698                         for(ii=0;ii<tblvars.size();++ii){
1699                                 if(tblvars[ii]->schema_name == opv->root_name){
1700                                         tblvars[ii]->set_opview_idx(opviews.size());
1701                                 }
1702
1703                         }
1704                 }
1705
1706                 opviews.append(opv);
1707         }else{
1708
1709 //                      Analyze the parse trees in this query,
1710 //                      put them in rootq
1711 //      vector<int> curr_list = process_sets[qi];
1712
1713
1714 ////////////////////////////////////////
1715
1716           rootq = NULL;
1717 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1718           for(qj=0;qj<curr_list.size();++qj){
1719                 i = curr_list[qj];
1720         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1721
1722 //                      Select the current query parse tree
1723                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1724
1725 //                      if hfta only, try to fetch any missing schemas
1726 //                      from the registry (using the print_schema program).
1727 //                      Here I use a hack to avoid analyzing the query -- all referenced
1728 //                      tables must be in the from clause
1729 //                      If there is a problem loading any table, just issue a warning,
1730 //
1731                 tablevar_list_t *fm = fta_parse_tree->get_from();
1732                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1733 //                      iterate over all referenced tables
1734                 int t;
1735                 for(t=0;t<refd_tbls.size();++t){
1736                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1737
1738                   if(tbl_ref < 0){      // if this table is not in the Schema
1739
1740                         if(hfta_only){
1741                                 string cmd="print_schema "+refd_tbls[t];
1742                                 FILE *schema_in = popen(cmd.c_str(), "r");
1743                                 if(schema_in == NULL){
1744                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1745                                 }else{
1746                                   string schema_instr;
1747                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1748                                         schema_instr += tmpstr;
1749                                   }
1750                           fta_parse_result = new fta_parse_t();
1751                                   strcpy(tmp_schema_str,schema_instr.c_str());
1752                                   FtaParser_setstringinput(tmp_schema_str);
1753                           if(FtaParserparse()){
1754                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1755                           }else{
1756                                         if( fta_parse_result->tables != NULL){
1757                                                 int tl;
1758                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1759                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1760                                                 }
1761                                         }else{
1762                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1763                                         }
1764                                 }
1765                         }
1766                   }else{
1767                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1768                                 exit(1);
1769                   }
1770
1771                 }
1772           }
1773
1774
1775 //                              Analyze the query.
1776           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1777           if(qs == NULL){
1778                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1779                 exit(1);
1780           }
1781
1782           stream_query new_sq(qs, Schema);
1783           if(new_sq.error_code){
1784                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1785                         exit(1);
1786           }
1787
1788 //                      Add it to the Schema
1789           table_def *output_td = new_sq.get_output_tabledef();
1790           Schema->add_table(output_td);
1791
1792 //                      Create a query plan from the analyzed parse tree.
1793 //                      If it's a query referenced via FROM, add it to the stream query.
1794           if(rootq){
1795                 rootq->add_query(new_sq);
1796           }else{
1797                 rootq = new stream_query(new_sq);
1798 //                      have the stream query object inherit properties from the analyzed
1799 //                      hfta_node object.
1800                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1801                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1802           }
1803
1804
1805     }
1806
1807 //              This stream query has all its parts
1808 //              Build and optimize it.
1809 //printf("translate_fta: generating plan.\n");
1810         if(rootq->generate_plan(Schema)){
1811                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1812                 continue;
1813         }
1814
1815 //      We've found the query plan head, so now add the output operators
1816         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1817                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1818                 multimap<string, int>::iterator mmsi;
1819                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1820                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1821                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1822                 }
1823         }
1824
1825
1826
1827 //                              Perform query splitting if necessary.
1828         bool hfta_returned;
1829     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1830
1831         int l;
1832 //for(l=0;l<split_queries.size();++l){
1833 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1834 //}
1835
1836
1837
1838
1839     if(split_queries.size() > 0){       // should be at least one component.
1840
1841 //                              Compute the number of LFTAs.
1842           int n_lfta = split_queries.size();
1843           if(hfta_returned) n_lfta--;
1844
1845
1846 //                              Process the LFTA components.
1847           for(l=0;l<n_lfta;++l){
1848            if(lfta_names.count(split_queries[l]->query_name) == 0){
1849 //                              Grab the lfta for global optimization.
1850                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1851                 string liface = tvec[0]->get_interface();
1852                 string lmach = tvec[0]->get_machine();
1853                 if (lmach == "")
1854                         lmach = hostname;
1855                 interface_names.push_back(liface);
1856                 machine_names.push_back(lmach);
1857 //printf("Machine is %s\n",lmach.c_str());
1858
1859 //                      Set the ht size from the recommendation, if there is one in the rec file
1860                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
1861                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
1862                 }
1863
1864
1865                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
1866                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
1867                 lfta_list.push_back(split_queries[l]);
1868                 lfta_mach_lists[lmach].push_back(split_queries[l]);
1869
1870 //                      The following is a hack,
1871 //                      as I should be generating LFTA code through
1872 //                      the stream_query object.
1873                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
1874 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
1875
1876 /*
1877 //                              Create query description to embed in lfta.c
1878                 string lfta_schema_str = split_queries[l]->make_schema();
1879                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
1880
1881 //                              get NIC capabilities.
1882                 int erri;
1883                 nic_property *nicprop = NULL;
1884                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
1885                 if(iface_codegen_type.size()){
1886                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
1887                         if(!nicprop){
1888                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
1889                                         exit(1);
1890                         }
1891                 }
1892
1893                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
1894 */
1895
1896                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
1897                 query_names.push_back(split_queries[l]->query_name);
1898                 mach_query_names[lmach].push_back(query_names.size()-1);
1899 //                      NOTE: I will assume a 1-1 correspondence between
1900 //                      mach_query_names[lmach] and lfta_mach_lists[lmach]
1901 //                      where mach_query_names[lmach][i] contains the index into
1902 //                      query_names, which names the lfta, and
1903 //                      lfta_mach_lists[lmach][i] is the stream_query * of the
1904 //                      corresponding lfta.
1905 //                      Later, lfta_iface_qname_ix holds the query_names indices matching lfta_iface_lists
1906
1907
1908
1909                 // check if lfta is reusable
1910                 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
1911
1912                 bool lfta_reusable = false;
1913                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
1914                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
1915                         lfta_reusable = true;
1916                 }
1917                 lfta_reuse_options.push_back(lfta_reusable);
1918
1919                 // LFTA will inherit the liveness timeout specification from the containing query
1920                 // This is too conservative, as lftas are expected to spend less time per tuple
1921                 // than the full query
1922
1923                 // extract liveness timeout from query definition
1924                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
1925                 if (!liveness_timeout) {
1926 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
1927 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
1928                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
1929                 }
1930                 lfta_liveness_timeouts.push_back(liveness_timeout);
1931
1932 //                      Add it to the schema
1933                 table_def *td = split_queries[l]->get_output_tabledef();
1934                 Schema->append_table(td);
1935 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
1936
1937           }
1938          }
1939
1940 //                              If the output is lfta-only, dump out the query name.
1941       if(split_queries.size() == 1 && !hfta_returned){
1942         if(output_query_names ){
1943            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
1944                 }
1945 /*
1946 else{
1947            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
1948                 }
1949 */
1950
1951 /*
1952 //                              output schema summary
1953                 if(output_schema_summary){
1954                         dump_summary(split_queries[0]);
1955                 }
1956 */
1957       }
1958
1959
1960           if(hfta_returned){            // query also has an HFTA component
1961                 int hfta_nbr = split_queries.size()-1;
1962
1963                         hfta_list.push_back(split_queries[hfta_nbr]);
1964
1965 //                                      report on generated query names
1966         if(output_query_names){
1967                         string hfta_name =split_queries[hfta_nbr]->query_name;
1968                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
1969                         for(l=0;l<hfta_nbr;++l){
1970                                 string lfta_name =split_queries[l]->query_name;
1971                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
1972                         }
1973                 }
1974 //              else{
1975 //              fprintf(stderr,"query names are ");
1976 //                      for(l=0;l<hfta_nbr;++l){
1977 //                              if(l>0) fprintf(stderr,",");
1978 //                              string fta_name =split_queries[l]->query_name;
1979 //                      fprintf(stderr," %s",fta_name.c_str());
1980 //                      }
1981 //                      fprintf(stderr,"\n");
1982 //              }
1983           }
1984
1985   }else{
1986           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
1987           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
1988           exit(1);
1989   }
1990  }
1991 }
1992
1993
1994 //-----------------------------------------------------------------
1995 //              Compute and propagate the SE in PROTOCOL fields compute a field.
1996 //-----------------------------------------------------------------
1997
1998 for(i=0;i<lfta_list.size();i++){
1999         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2000         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2001 }
2002 for(i=0;i<hfta_list.size();i++){
2003         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2004         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2005 }
2006
2007
2008
2009 //------------------------------------------------------------------------
2010 //              Perform  individual FTA optimizations
2011 //-----------------------------------------------------------------------
2012
2013 if (partitioned_mode) {
2014
2015         // open partition definition file
2016         string part_fname = config_dir_path + "partition.txt";
2017
2018         FILE* partfd = fopen(part_fname.c_str(), "r");
2019         if (!partfd) {
2020                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2021                 exit(1);
2022         }
2023         PartnParser_setfileinput(partfd);
2024         if (PartnParserparse()) {
2025                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2026                 exit(1);
2027         }
2028         fclose(partfd);
2029 }
2030
2031
2032 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2033
2034 int num_hfta = hfta_list.size();
2035 for(i=0; i < hfta_list.size(); ++i){
2036         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2037 }
2038
2039 //                      Add all new hftas to schema
2040 for(i=num_hfta; i < hfta_list.size(); ++i){
2041                 table_def *td = hfta_list[i]->get_output_tabledef();
2042                 Schema->append_table(td);
2043 }
2044
2045 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2046
2047
2048
2049 //------------------------------------------------------------------------
2050 //              Do global (cross-fta) optimization
2051 //-----------------------------------------------------------------------
2052
2053
2054
2055
2056
2057
2058 set<string> extra_external_libs;
2059
2060 for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
2061
2062                 if(! debug_only){
2063 //                                      build hfta file name, create output
2064            if(numeric_hfta_flname){
2065             sprintf(tmpstr,"hfta_%d",hfta_count);
2066                         hfta_names.push_back(tmpstr);
2067             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2068          }else{
2069             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2070                         hfta_names.push_back(tmpstr);
2071             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2072           }
2073                   FILE *hfta_fl = fopen(tmpstr,"w");
2074                   if(hfta_fl == NULL){
2075                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2076                         exit(1);
2077                   }
2078                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2079
2080 //                      If there is a field verifier, warn about
2081 //                      lack of compatibility
2082 //                              NOTE : this code assumes that visible non-lfta queries
2083 //                              are those at the root of a stream query.
2084                   string hfta_comment;
2085                   string hfta_title;
2086                   string hfta_namespace;
2087                   if(hfta_list[i]->defines.count("comment")>0)
2088                         hfta_comment = hfta_list[i]->defines["comment"];
2089                   if(hfta_list[i]->defines.count("Comment")>0)
2090                         hfta_comment = hfta_list[i]->defines["Comment"];
2091                   if(hfta_list[i]->defines.count("COMMENT")>0)
2092                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2093                   if(hfta_list[i]->defines.count("title")>0)
2094                         hfta_title = hfta_list[i]->defines["title"];
2095                   if(hfta_list[i]->defines.count("Title")>0)
2096                         hfta_title = hfta_list[i]->defines["Title"];
2097                   if(hfta_list[i]->defines.count("TITLE")>0)
2098                         hfta_title = hfta_list[i]->defines["TITLE"];
2099                   if(hfta_list[i]->defines.count("namespace")>0)
2100                         hfta_namespace = hfta_list[i]->defines["namespace"];
2101                   if(hfta_list[i]->defines.count("Namespace")>0)
2102                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2103                   if(hfta_list[i]->defines.count("Namespace")>0)
2104                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2105
2106                   if(field_verifier != NULL){
2107                         string warning_str;
2108                         if(hfta_comment == "")
2109                                 warning_str += "\tcomment not found.\n";
2110                         if(hfta_title == "")
2111                                 warning_str += "\ttitle not found.\n";
2112                         if(hfta_namespace == "")
2113                                 warning_str += "\tnamespace not found.\n";
2114
2115                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2116                         int fi;
2117                         for(fi=0;fi<flds.size();fi++){
2118                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2119                         }
2120                         if(warning_str != "")
2121                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2122                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2123                   }
2124
2125                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2126                   if(hfta_comment != "")
2127                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2128                   if(hfta_title != "")
2129                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2130                   if(hfta_namespace != "")
2131                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2132                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2133                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2134
2135 //                              write info about fields to qtree.xml
2136                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2137                   int fi;
2138                   for(fi=0;fi<flds.size();fi++){
2139                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2140                         if(flds[fi]->get_modifier_list()->size()){
2141                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2142                         }
2143                         fprintf(qtree_output," />\n");
2144                   }
2145
2146                   // extract liveness timeout from query definition
2147                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2148                   if (!liveness_timeout) {
2149 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2150 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2151                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2152                   }
2153                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2154
2155                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2156                   int itv;
2157                   for(itv=0;itv<tmp_tv.size();++itv){
2158                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2159                   }
2160                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2161                   if(ifrs != ""){
2162                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2163                   }
2164                   fprintf(qtree_output,"\t</HFTA>\n");
2165
2166                   fclose(hfta_fl);
2167                 }else{
2168 //                                      debug only -- do code generation to catch generation-time errors.
2169                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2170                 }
2171
2172                 hfta_count++;   // for hfta file names with numeric suffixes
2173
2174                 hfta_list[i]->get_external_libs(extra_external_libs);
2175
2176           }
2177
2178 string ext_lib_string;
2179 set<string>::iterator ssi_el;
2180 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2181         ext_lib_string += (*ssi_el)+" ";
2182
2183
2184
2185 //                      Report on the set of operator views
2186   for(i=0;i<opviews.size();++i){
2187         opview_entry *opve = opviews.get_entry(i);
2188         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2189         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2190         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2191         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2192         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2193
2194         if (!opve->liveness_timeout) {
2195 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2196 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2197                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2198         }
2199         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2200     int j;
2201         for(j=0;j<opve->subq_names.size();j++)
2202                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2203         fprintf(qtree_output,"\t</UDOP>\n");
2204   }
2205
2206
2207 //-----------------------------------------------------------------
2208
2209 //                      Create interface-specific meta code files.
2210 //                              first, open and parse the interface resources file.
2211         ifaces_db = new ifq_t();
2212     ierr = "";
2213         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2214                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2215                                 ifx_fname.c_str(), ierr.c_str());
2216                 exit(1);
2217         }
2218
2219         map<string, vector<stream_query *> >::iterator svsi;
2220         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2221                 string lmach = (*svsi).first;
2222
2223         //              For this machine, create a set of lftas per interface.
2224                 vector<stream_query *> mach_lftas = (*svsi).second;
2225                 map<string, vector<stream_query *> > lfta_iface_lists;
2226                 int li;
2227                 for(li=0;li<mach_lftas.size();++li){
2228                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2229                         string lfta_iface = tvec[0]->get_interface();
2230                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2231                 }
2232
2233                 map<string, vector<stream_query *> >::iterator lsvsi;
2234                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2235                         int erri;
2236                         string liface = (*lsvsi).first;
2237                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2238                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2239                         if(iface_codegen_type.size()){
2240                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2241                                 if(!nicprop){
2242                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2243                                         exit(1);
2244                                 }
2245                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2246                                 string mcf_flnm;
2247                                 if(lmach != "")
2248                                   mcf_flnm = lmach + "_"+liface+".mcf";
2249                                 else
2250                                   mcf_flnm = hostname + "_"+liface+".mcf";
2251                                 FILE *mcf_fl ;
2252                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2253                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2254                                         exit(1);
2255                                 }
2256                                 fprintf(mcf_fl,"%s",mcs.c_str());
2257                                 fclose(mcf_fl);
2258 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2259 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2260                         }
2261                 }
2262
2263
2264         }
2265
2266
2267
2268 //-----------------------------------------------------------------
2269
2270
2271 //                      Find common filter predicates in the LFTAs.
2272 //                      in addition generate structs to store the temporal attributes unpacked by prefilter
2273         
2274         map<string, vector<stream_query *> >::iterator ssqi;
2275         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2276
2277                 string lmach = (*ssqi).first;
2278                 bool packed_return = false;
2279                 int li, erri;
2280
2281
2282 //      The LFTAs of this machine.
2283                 vector<stream_query *> mach_lftas = (*ssqi).second;
2284 //      break up on a per-interface basis.
2285                 map<string, vector<stream_query *> > lfta_iface_lists;
2286                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2287                                                         // for fta_init
2288                 for(li=0;li<mach_lftas.size();++li){
2289                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2290                         string lfta_iface = tvec[0]->get_interface();
2291                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2292                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2293                 }
2294
2295
2296 //      Are the return values "packed"?
2297 //      This should be done on a per-interface basis.
2298 //      But this is defunct code for gs-lite
2299                 for(li=0;li<mach_lftas.size();++li){
2300                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2301                   string liface = tvec[0]->get_interface();
2302                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2303                   if(iface_codegen_type.size()){
2304                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2305                           packed_return = true;
2306                         }
2307                   }
2308                 }
2309
2310
2311 // Separate lftas by interface, collect results on a per-interface basis.
2312
2313                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2314                 map<string, vector<cnf_set *> > prefilter_preds;
2315                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2316                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2317                         string liface = (*mvsi).first;
2318                         vector<cnf_set *> empty_list;
2319                         prefilter_preds[liface] = empty_list;
2320                         if(! packed_return){
2321                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2322                         }
2323
2324 //                              get NIC capabilities.  (Is this needed?)
2325                         nic_property *nicprop = NULL;
2326                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2327                         if(iface_codegen_type.size()){
2328                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2329                                 if(!nicprop){
2330                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2331                                         exit(1);
2332                                 }
2333                         }
2334                 }
2335
2336
2337 //              Now that we know the prefilter preds, generate the lfta code.
2338 //      Do this for all lftas in this machine.
2339                 for(li=0;li<mach_lftas.size();++li){
2340                         set<unsigned int> subsumed_preds;
2341                         set<unsigned int>::iterator sii;
2342 #ifdef PREFILTER_OK
2343                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2344                                 int pid = (*sii);
2345                                 if((pid>>16) == li){
2346                                         subsumed_preds.insert(pid & 0xffff);
2347                                 }
2348                         }
2349 #endif
2350                         string lfta_schema_str = mach_lftas[li]->make_schema();
2351                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2352                         nic_property *nicprop = NULL; // no NIC properties?
2353                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2354                 }
2355
2356
2357 //                      generate structs to store the temporal attributes
2358 //                      unpacked by prefilter
2359                 col_id_set temp_cids;
2360                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2361                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2362
2363 //                      Compute the lfta bit signatures and the lfta colrefs
2364 //      do this on a per-interface basis
2365 #ifdef PREFILTER_OK
2366                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2367 #endif
2368                 map<string, vector<long long int> > lfta_sigs; // used again later
2369                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2370                         string liface = (*mvsi).first;
2371                         vector<long long int> empty_list;
2372                         lfta_sigs[liface] = empty_list;
2373
2374                         vector<col_id_set> lfta_cols;
2375                         vector<int> lfta_snap_length;
2376                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2377                                 unsigned long long int mask=0, bpos=1;
2378                                 int f_pos;
2379                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2380                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2381                                                 mask |= bpos;
2382                                         bpos = bpos << 1;
2383                                 }
2384                                 lfta_sigs[liface].push_back(mask);
2385                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2386                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2387                         }
2388
2389 //for(li=0;li<mach_lftas.size();++li){
2390 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2391 //col_id_set::iterator tcisi;
2392 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2393 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2394 //}
2395 //}
2396
2397
2398 //                      generate the prefilter
2399 //      Do this on a per-interface basis, except for the #define
2400 #ifdef PREFILTER_OK
2401 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2402                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2403 #else
2404                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2405
2406 #endif
2407                 }
2408
2409 //                      Generate interface parameter lookup function
2410           lfta_val[lmach] += "// lookup interface properties by name\n";
2411           lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2412           lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2413           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2414
2415 //        collect a list of interface names used by queries running on this host
2416           set<std::string> iface_names;
2417           for(i=0;i<mach_query_names[lmach].size();i++){
2418                 int mi = mach_query_names[lmach][i];
2419                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2420
2421                 if(interface_names[mi]=="")
2422                         iface_names.insert("DEFAULTDEV");
2423                 else
2424                         iface_names.insert(interface_names[mi]);
2425           }
2426
2427 //        generate interface property lookup code for every interface
2428           set<std::string>::iterator sir;
2429           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2430                 if (sir == iface_names.begin())
2431                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2432                 else
2433                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2434
2435                 // iterate through interface properties
2436                 vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2437                 if (erri) {
2438                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2439                         exit(1);
2440                 }
2441                 if (iface_properties.empty())
2442                         lfta_val[lmach] += "\t\treturn NULL;\n";
2443                 else {
2444                         for (int i = 0; i < iface_properties.size(); ++i) {
2445                                 if (i == 0)
2446                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2447                                 else
2448                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2449
2450                                 // combine all values for the interface property using comma separator
2451                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2452                                 for (int j = 0; j < vals.size(); ++j) {
2453                                         lfta_val[lmach] += "\t\t\treturn \"" + vals[j];
2454                                         if (j != vals.size()-1)
2455                                                 lfta_val[lmach] += ",";
2456                                 }
2457                                 lfta_val[lmach] += "\";\n";
2458                         }
2459                         lfta_val[lmach] += "\t\t} else\n";
2460                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2461                 }
2462           }
2463           lfta_val[lmach] += "\t} else\n";
2464           lfta_val[lmach] += "\t\treturn NULL;\n";
2465           lfta_val[lmach] += "}\n\n";
2466
2467
2468 //                      Generate a full list of FTAs for clearinghouse reference
2469           lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2470           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2471
2472           for (i = 0; i < query_names.size(); ++i) {
2473                    if (i)
2474                           lfta_val[lmach] += ", ";
2475                    lfta_val[lmach] += "\"" + query_names[i] + "\"";
2476           }
2477           for (i = 0; i < hfta_list.size(); ++i) {
2478                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2479           }
2480           lfta_val[lmach] += ", NULL};\n\n";
2481
2482
2483 //                      Add the initialization function to lfta.c
2484 //      Change to accept the interface name, and 
2485 //      set the prefilter function accordingly.
2486 //      see the example in demo/err2
2487           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2488
2489 //        for(i=0;i<mach_query_names[lmach].size();i++)
2490 //              int mi = mach_query_names[lmach][i];
2491 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2492
2493           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2494                 string liface = (*mvsi).first;
2495                 vector<stream_query *> lfta_list = (*mvsi).second;
2496                 for(i=0;i<lfta_list.size();i++){
2497                         stream_query *lfta_sq = lfta_list[i];
2498                         int mi = lfta_iface_qname_ix[liface][i];
2499                 
2500                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2501
2502                 lfta_val[lmach] += "\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2503                 if(interface_names[mi]=="")
2504                                 lfta_val[lmach]+="DEFAULTDEV";
2505                 else
2506                                 lfta_val[lmach]+='"'+interface_names[mi]+'"';
2507
2508                 lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
2509                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
2510                         +"\n#endif\n";
2511                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2512                         lfta_val[lmach] += tmpstr;
2513
2514 //                      unsigned long long int mask=0, bpos=1;
2515 //                      int f_pos;
2516 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2517 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2518 //                                      mask |= bpos;
2519 //                              bpos = bpos << 1;
2520 //                      }
2521
2522 #ifdef PREFILTER_OK
2523 //                      sprintf(tmpstr,",%lluull",mask);
2524                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2525                         lfta_val[lmach]+=tmpstr;
2526 #else
2527                         lfta_val[lmach] += ",0ull";
2528 #endif
2529
2530                         lfta_val[lmach] += ");\n";
2531
2532
2533
2534 //    End of lfta prefilter stuff
2535 // --------------------------------------------------
2536
2537 //                      If there is a field verifier, warn about
2538 //                      lack of compatability
2539                   string lfta_comment;
2540                   string lfta_title;
2541                   string lfta_namespace;
2542                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2543                   if(ldefs.count("comment")>0)
2544                         lfta_comment = lfta_sq->defines["comment"];
2545                   if(ldefs.count("Comment")>0)
2546                         lfta_comment = lfta_sq->defines["Comment"];
2547                   if(ldefs.count("COMMENT")>0)
2548                         lfta_comment = lfta_sq->defines["COMMENT"];
2549                   if(ldefs.count("title")>0)
2550                         lfta_title = lfta_sq->defines["title"];
2551                   if(ldefs.count("Title")>0)
2552                         lfta_title = lfta_sq->defines["Title"];
2553                   if(ldefs.count("TITLE")>0)
2554                         lfta_title = lfta_sq->defines["TITLE"];
2555                   if(ldefs.count("NAMESPACE")>0)
2556                         lfta_namespace = lfta_sq->defines["NAMESPACE"];
2557                   if(ldefs.count("Namespace")>0)
2558                         lfta_namespace = lfta_sq->defines["Namespace"];
2559                   if(ldefs.count("namespace")>0)
2560                         lfta_namespace = lfta_sq->defines["namespace"];
2561
2562                   string lfta_ht_size;
2563                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2564                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2565                   if(ldefs.count("aggregate_slots")>0){
2566                         lfta_ht_size = ldefs["aggregate_slots"];
2567                   }
2568
2569 //                      NOTE : I'm assuming that visible lftas do not start with _fta.
2570 //                              -- will fail for non-visible simple selection queries.
2571                 if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
2572                         string warning_str;
2573                         if(lfta_comment == "")
2574                                 warning_str += "\tcomment not found.\n";
2575                         if(lfta_title == "")
2576                                 warning_str += "\ttitle not found.\n";
2577                         if(lfta_namespace == "")
2578                                 warning_str += "\tnamespace not found.\n";
2579
2580                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2581                         int fi;
2582                         for(fi=0;fi<flds.size();fi++){
2583                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2584                         }
2585                         if(warning_str != "")
2586                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2587                                         query_names[mi].c_str(),warning_str.c_str());
2588                 }
2589
2590
2591 //                      Create qtree output
2592                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
2593                   if(lfta_comment != "")
2594                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2595                   if(lfta_title != "")
2596                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2597                   if(lfta_namespace != "")
2598                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2599                   if(lfta_ht_size != "")
2600                         fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2601                 if(lmach != "")
2602                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2603                 else
2604                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2605                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
2606                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2607                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2608                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2609 //                              write info about fields to qtree.xml
2610                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2611                   int fi;
2612                   for(fi=0;fi<flds.size();fi++){
2613                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2614                         if(flds[fi]->get_modifier_list()->size()){
2615                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2616                         }
2617                         fprintf(qtree_output," />\n");
2618                   }
2619                 fprintf(qtree_output,"\t</LFTA>\n");
2620
2621
2622             }
2623           }
2624
2625           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2626                         string liface = (*mvsi).first;
2627                         lfta_val[lmach] += 
2628 "       if (!strcmp(device, \""+liface+"\")) \n"
2629 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2630 ;
2631                 }
2632                 lfta_val[lmach] += 
2633 "       if(lfta_prefilter == NULL){\n"
2634 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2635 "               exit(1);\n"
2636 "       }\n"
2637 ;
2638
2639
2640
2641           lfta_val[lmach] += "}\n\n";
2642
2643       if(!(debug_only || hfta_only) ){
2644                 string lfta_flnm;
2645                 if(lmach != "")
2646                   lfta_flnm = lmach + "_lfta.c";
2647                 else
2648                   lfta_flnm = hostname + "_lfta.c";
2649                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2650                         fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2651                         exit(1);
2652                 }
2653               fprintf(lfta_out,"%s",lfta_header.c_str());
2654               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2655               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2656                 fclose(lfta_out);
2657           }
2658         }
2659
2660 //              Say what are the operators which must execute
2661         if(opviews.size()>0)
2662                 fprintf(stderr,"The queries use the following external operators:\n");
2663         for(i=0;i<opviews.size();++i){
2664                 opview_entry *opv = opviews.get_entry(i);
2665                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2666         }
2667
2668         if(create_makefile)
2669                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2670                 machine_names, schema_file_name,
2671                 interface_names,
2672                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2673
2674
2675         fprintf(qtree_output,"</QueryNodes>\n");
2676
2677         return(0);
2678 }
2679
2680 ////////////////////////////////////////////////////////////
2681
2682 void generate_makefile(vector<string> &input_file_names, int nfiles,
2683                                            vector<string> &hfta_names, opview_set &opviews,
2684                                                 vector<string> &machine_names,
2685                                                 string schema_file_name,
2686                                                 vector<string> &interface_names,
2687                                                 ifq_t *ifdb, string &config_dir_path,
2688                                                 bool use_pads,
2689                                                 string extra_libs,
2690                                                 map<string, vector<int> > &rts_hload
2691                                          ){
2692         int i,j;
2693
2694         if(config_dir_path != ""){
2695                 config_dir_path = "-C "+config_dir_path;
2696         }
2697
2698         struct stat sb;
2699         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
2700         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
2701
2702 //      if(libz_exists && !libast_exists){
2703 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
2704 //              exit(1);
2705 //      }
2706
2707 //                      Get set of operator executable files to run
2708         set<string> op_fls;
2709         set<string>::iterator ssi;
2710         for(i=0;i<opviews.size();++i){
2711                 opview_entry *opv = opviews.get_entry(i);
2712                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
2713         }
2714
2715         FILE *outfl = fopen("Makefile", "w");
2716         if(outfl==NULL){
2717                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
2718                 exit(0);
2719         }
2720
2721         fputs(
2722 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
2723 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
2724 ).c_str(), outfl
2725 );
2726         if(generate_stats)
2727                 fprintf(outfl,"  -DLFTA_STATS");
2728
2729         fprintf(outfl,
2730 "\n"
2731 "\n"
2732 "all: rts");
2733         for(i=0;i<hfta_names.size();++i)
2734                 fprintf(outfl," %s",hfta_names[i].c_str());
2735         fputs(
2736 ("\n"
2737 "\n"
2738 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
2739 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
2740         if(use_pads)
2741                 fprintf(outfl,"-L. ");
2742         fputs(
2743 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
2744         if(use_pads)
2745                 fprintf(outfl,"-lgscppads -lpads ");
2746         fprintf(outfl,
2747 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
2748         if(use_pads)
2749                 fprintf(outfl, "-lpz -lz -lbz ");
2750         if(libz_exists && libast_exists)
2751                 fprintf(outfl,"-last ");
2752         if(use_pads)
2753                 fprintf(outfl, "-ldll -ldl ");
2754         fprintf(outfl," -lgscpaux");
2755 #ifdef GCOV
2756         fprintf(outfl," -fprofile-arcs");
2757 #endif
2758         fprintf(outfl,
2759 "\n"
2760 "\n"
2761 "lfta.o: %s_lfta.c\n"
2762 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
2763 "\n"
2764 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
2765         for(i=0;i<nfiles;++i)
2766                 fprintf(outfl," %s",input_file_names[i].c_str());
2767         if(hostname == ""){
2768                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
2769         }else{
2770                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
2771         }
2772         for(i=0;i<nfiles;++i)
2773                 fprintf(outfl," %s",input_file_names[i].c_str());
2774         fprintf(outfl,"\n");
2775
2776         for(i=0;i<hfta_names.size();++i)
2777                 fprintf(outfl,
2778 ("%s: %s.o\n"
2779 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
2780 "\n"
2781 "%s.o: %s.cc\n"
2782 "\t$(CPP) -o %s.o -c %s.cc\n"
2783 "\n"
2784 "\n").c_str(),
2785     hfta_names[i].c_str(), hfta_names[i].c_str(),
2786         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
2787         hfta_names[i].c_str(), hfta_names[i].c_str(),
2788         hfta_names[i].c_str(), hfta_names[i].c_str()
2789                 );
2790
2791         fprintf(outfl,
2792 ("\n"
2793 "packet_schema.txt:\n"
2794 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
2795 "\n"
2796 "external_fcns.def:\n"
2797 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
2798 "\n"
2799 "clean:\n"
2800 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
2801         for(i=0;i<hfta_names.size();++i)
2802                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
2803         fprintf(outfl,"\n");
2804
2805         fclose(outfl);
2806
2807
2808
2809 //              Gather the set of interfaces
2810 //              TODO : must update to hanndle machines
2811 //              TODO : lookup interface attributes and add them as a parameter to rts process
2812         outfl = fopen("runit", "w");
2813         if(outfl==NULL){
2814                 fprintf(stderr,"Can't open runit for write, exiting.\n");
2815                 exit(0);
2816         }
2817
2818 //              Gather the set of interfaces
2819 //              Also, gather "base interface names" for use in computing
2820 //              the hash splitting to virtual interfaces.
2821 //              TODO : must update to hanndle machines
2822         set<string> ifaces;
2823         set<string> base_vifaces;       // base interfaces of virtual interfaces
2824         map<string, string> ifmachines;
2825         map<string, string> ifattrs;
2826         for(i=0;i<interface_names.size();++i){
2827                 ifaces.insert(interface_names[i]);
2828                 ifmachines[interface_names[i]] = machine_names[i];
2829
2830                 size_t Xpos = interface_names[i].find_last_of("X");
2831                 if(Xpos!=string::npos){
2832                         string iface = interface_names[i].substr(0,Xpos);
2833                         base_vifaces.insert(iface);
2834                 }
2835                 // get interface attributes and add them to the list
2836         }
2837
2838         fputs(
2839 ("#!/bin/sh\n"
2840 "./stopit\n"
2841 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
2842 "sleep 5\n"
2843 "if [ ! -f gshub.log ]\n"
2844 "then\n"
2845 "\techo \"Failed to start bin/gshub.py\"\n"
2846 "\texit -1\n"
2847 "fi\n"
2848 "ADDR=`cat gshub.log`\n"
2849 "ps opgid= $! >> gs.pids\n"
2850 "./rts $ADDR default ").c_str(), outfl);
2851         int erri;
2852         string err_str;
2853         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
2854                 string ifnm = (*ssi);
2855                 fprintf(outfl, "%s ",ifnm.c_str());
2856                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
2857                 for(j=0;j<ifv.size();++j)
2858                         fprintf(outfl, "%s ",ifv[j].c_str());
2859         }
2860         fprintf(outfl, " &\n");
2861         fprintf(outfl, "echo $! >> gs.pids\n");
2862         for(i=0;i<hfta_names.size();++i)
2863                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
2864
2865         for(j=0;j<opviews.opview_list.size();++j){
2866                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
2867         }
2868
2869         fclose(outfl);
2870         system("chmod +x runit");
2871
2872         outfl = fopen("stopit", "w");
2873         if(outfl==NULL){
2874                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
2875                 exit(0);
2876         }
2877
2878         fprintf(outfl,"#!/bin/sh\n"
2879 "rm -f gshub.log\n"
2880 "if [ ! -f gs.pids ]\n"
2881 "then\n"
2882 "exit\n"
2883 "fi\n"
2884 "for pgid in `cat gs.pids`\n"
2885 "do\n"
2886 "kill -TERM -$pgid\n"
2887 "done\n"
2888 "sleep 1\n"
2889 "for pgid in `cat gs.pids`\n"
2890 "do\n"
2891 "kill -9 -$pgid\n"
2892 "done\n"
2893 "rm gs.pids\n");
2894
2895         fclose(outfl);
2896         system("chmod +x stopit");
2897
2898 //-----------------------------------------------
2899
2900 /* For now disable support for virtual interfaces
2901         outfl = fopen("set_vinterface_hash.bat", "w");
2902         if(outfl==NULL){
2903                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
2904                 exit(0);
2905         }
2906
2907 //              The format should be determined by an entry in the ifres.xml file,
2908 //              but for now hardcode the only example I have.
2909         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
2910                 if(rts_hload.count((*ssi))){
2911                         string iface_name = (*ssi);
2912                         string iface_number = "";
2913                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
2914                                 if(isdigit(iface_name[j])){
2915                                         iface_number = iface_name[j];
2916                                         if(j>0 && isdigit(iface_name[j-1]))
2917                                                 iface_number = iface_name[j-1] + iface_number;
2918                                 }
2919                         }
2920
2921                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
2922                         vector<int> halloc = rts_hload[iface_name];
2923                         int prev_limit = 0;
2924                         for(j=0;j<halloc.size();++j){
2925                                 if(j>0)
2926                                         fprintf(outfl,":");
2927                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
2928                                 prev_limit = halloc[j];
2929                         }
2930                         fprintf(outfl,"\n");
2931                 }
2932         }
2933         fclose(outfl);
2934         system("chmod +x set_vinterface_hash.bat");
2935 */
2936 }
2937
2938 //              Code for implementing a local schema
2939 /*
2940                 table_list qpSchema;
2941
2942 //                              Load the schemas of any LFTAs.
2943                 int l;
2944                 for(l=0;l<hfta_nbr;++l){
2945                         stream_query *sq0 = split_queries[l];
2946                         table_def *td = sq0->get_output_tabledef();
2947                         qpSchema.append_table(td);
2948                 }
2949 //                              load the schemas of any other ref'd tables.
2950 //                              (e.g., hftas)
2951                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
2952                 int ti;
2953                 for(ti=0;ti<input_tbl_names.size();++ti){
2954                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
2955                         if(tbl_ref < 0){
2956                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
2957                                 if(tbl_ref < 0){
2958                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
2959                                         exit(1);
2960                                 }
2961                                 qpSchema.append_table(Schema->get_table(tbl_ref));
2962                         }
2963                 }
2964 */
2965
2966 //              Functions related to parsing.
2967
2968 /*
2969 static int split_string(char *instr,char sep, char **words,int max_words){
2970    char *loc;
2971    char *str;
2972    int nwords = 0;
2973
2974    str = instr;
2975    words[nwords++] = str;
2976    while( (loc = strchr(str,sep)) != NULL){
2977         *loc = '\0';
2978         str = loc+1;
2979         if(nwords >= max_words){
2980                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
2981                 nwords = max_words-1;
2982         }
2983         words[nwords++] = str;
2984    }
2985
2986    return(nwords);
2987 }
2988
2989 */
2990