Add support for query key extraction
[com/gs-lite.git] / src / ftacmp / translate_fta.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33 #include<glob.h>
34 #include<string.h>
35
36 #include<list>
37
38 //              for the scandir
39      #include <sys/types.h>
40      #include <dirent.h>
41
42
43 #include<errno.h>
44
45 //              to verify that some files exist.
46      #include <sys/types.h>
47      #include <sys/stat.h>
48
49 #include "parse_partn.h"
50
51 #include "print_plan.h"
52
53 //              Interface to the xml parser
54
55 #include"xml_t.h"
56 #include"field_list.h"
57
58 extern int xmlParserparse(void);
59 extern FILE *xmlParserin;
60 extern int xmlParserdebug;
61
62 std::vector<std::string> xml_attr_vec;
63 std::vector<std::string> xml_val_vec;
64 std::string xml_a, xml_v;
65 xml_t *xml_leaves = NULL;
66
67 //      Interface to the field list verifier
68 field_list *field_verifier = NULL;
69
70 #define TMPSTRLEN 1000
71
72 #ifndef PATH_DELIM
73   #define PATH_DELIM '/'
74 #endif
75
76 char tmp_schema_str[10000];
77
78 // maximum delay between two heartbeats produced
79 // by a UDOP. Used when it is not explicitly
80 // provided in the UDOP definition
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
82
83 //              Default lfta hash table size, must be power of 2.
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
85
86 //              Interface to FTA definition lexer and parser ...
87
88 extern int FtaParserparse(void);
89 extern FILE *FtaParserin;
90 extern int FtaParserdebug;
91
92 fta_parse_t *fta_parse_result;
93 var_defs_t *fta_parse_defines;
94
95
96
97 //              Interface to external function lexer and parser ...
98
99 extern int Ext_fcnsParserparse(void);
100 extern FILE *Ext_fcnsParserin;
101 extern int Ext_fcnsParserdebug;
102
103 ext_fcn_list *Ext_fcns;
104
105
106 //              Interface to partition definition parser
107 extern int PartnParserparse();
108 partn_def_list_t *partn_parse_result = NULL;
109
110
111
112 using namespace std;
113 //extern int errno;
114
115
116 //              forward declaration of local utility function
117 void generate_makefile(vector<string> &input_file_names, int nfiles,
118                                            vector<string> &hfta_names, opview_set &opviews,
119                                                 vector<string> &machine_names,
120                                                 string schema_file_name,
121                                                 vector<string> &interface_names,
122                                                 ifq_t *ifdb, string &config_dir_path,
123                                                 bool use_pads,
124                                                 string extra_libs,
125                                                 map<string, vector<int> > &rts_hload
126                                         );
127
128 //static int split_string(char *instr,char sep, char **words,int max_words);
129 #define MAXFLDS 100
130
131   FILE *schema_summary_output = NULL;           // schema summary (query name, field names, field types)
132
133 //                      Dump schema summary
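//              The summary written for each query is three lines:
//                      <query name>
//                      <field name 1>|<field name 2>|...
//                      <field type 1>|<field type 2>|...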
134 void dump_summary(stream_query *str){
135         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
136
137         table_def *sch = str->get_output_tabledef();
138
139         vector<field_entry *> flds = sch->get_fields();
140         int f;
141         for(f=0;f<flds.size();++f){
142                 if(f>0) fprintf(schema_summary_output,"|");
143                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
144         }
145         fprintf(schema_summary_output,"\n");
146         for(f=0;f<flds.size();++f){
147                 if(f>0) fprintf(schema_summary_output,"|");
148                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
149         }
150         fprintf(schema_summary_output,"\n");
151 }
152
153 //              Globals
154 string hostname;                // name of current host.
155 int hostname_len;
156 bool generate_stats = false;
157 string root_path = "../..";
158
159
160 int main(int argc, char **argv){
161   char tmpstr[TMPSTRLEN];
162   string err_str;
163   int q,s,h,f;
164
165   set<int>::iterator si;
166
167   vector<string> query_names;                   // for lfta.c registration
168   map<string, vector<int> > mach_query_names;   // list queries of machine
169   vector<int> snap_lengths;                             // for lfta.c registration
170   vector<string> interface_names;                       // for lfta.c registration
171   vector<string> machine_names;                 // machine of interface
172   vector<bool> lfta_reuse_options;                      // for lfta.c registration
173   vector<int> lfta_liveness_timeouts;           // for qtree.xml generation
174   vector<string> hfta_names;                    // hfta source code names, for
175                                                                                 // creating the make file.
176   vector<string> qnames;                                // ensure unique names
177   map<string, int> lfta_names;                  // keep track of unique lftas.
178
179
180 //                              set these to 1 to debug the parser
181   FtaParserdebug = 0;
182   Ext_fcnsParserdebug = 0;
183
184   FILE *lfta_out;                               // lfta.c output.
185   FILE *fta_in;                                 // input file
186   FILE *table_schemas_in;               // source tables definition file
187   FILE *query_name_output;              // query names
188   FILE *qtree_output;                   // interconnections of query nodes
189
190   // -------------------------------
191   // Handling of Input Arguments
192   // -------------------------------
193     char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195                 "\t[-B] : debug only (don't create output files)\n"
196                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199                 "\t[-C] : use <config directory> for definition files\n"
200                 "\t[-l] : use <library directory> for library queries\n"
201                 "\t[-N] : output query names in query_names.txt\n"
202                 "\t[-H] : create HFTA only (no schema_file)\n"
203                 "\t[-Q] : use query name for hfta suffix\n"
204                 "\t[-M] : generate make file and runit, stopit scripts\n"
205                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206                 "\t[-f] : Output schema summary to schema_summary.txt\n"
207                 "\t[-P] : link with PADS\n"
208                 "\t[-h] : override host name.\n"
209                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210                 "\t[-R] : path to root of GS-lite\n"
211 ;
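//              Illustrative invocation (the schema and query file names below are hypothetical):
//                      translate_fta -M -N -C ./cfg packet_schema.txt queries.gsql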
212
213 //              parameters gathered from command line processing
214         string external_fcns_path;
215 //      string internal_fcn_path;
216         string config_dir_path;
217         string library_path = "./";
218         vector<string> input_file_names;
219         string schema_file_name;
220         bool debug_only = false;
221         bool hfta_only = false;
222         bool output_query_names = false;
223         bool output_schema_summary=false;
224         bool numeric_hfta_flname = true;
225         bool create_makefile = false;
226         bool distributed_mode = false;
227         bool partitioned_mode = false;
228         bool use_live_hosts_file = false;
229         bool use_pads = false;
230         bool clean_make = false;
231         int n_virtual_interfaces = 1;
232
233    int chopt;           // getopt() returns int; a char would break the != -1 test where char is unsigned
234    while((chopt = getopt(argc,argv,optstr)) != -1){
235                 switch(chopt){
236                 case 'B':
237                         debug_only = true;
238                         break;
239                 case 'D':
240                         distributed_mode = true;
241                         break;
242                 case 'p':
243                         partitioned_mode = true;
244                         break;
245                 case 'L':
246                         use_live_hosts_file = true;
247                         break;
248                 case 'C':
249                                 if(optarg != NULL)
250                                  config_dir_path = string(optarg) + string("/");
251                         break;
252                 case 'l':
253                                 if(optarg != NULL)
254                                  library_path = string(optarg) + string("/");
255                         break;
256                 case 'N':
257                         output_query_names = true;
258                         break;
259                 case 'Q':
260                         numeric_hfta_flname = false;
261                         break;
262                 case 'H':
263                         if(schema_file_name == ""){
264                                 hfta_only = true;
265                         }
266                         break;
267                 case 'f':
268                         output_schema_summary=true;
269                         break;
270                 case 'M':
271                         create_makefile=true;
272                         break;
273                 case 'S':
274                         generate_stats=true;
275                         break;
276                 case 'P':
277                         use_pads = true;
278                         break;
279                 case 'c':
280                         clean_make = true;
281                         break;
282                 case 'h':
283                         if(optarg != NULL)
284                                 hostname = optarg;
285                         break;
286                 case 'R':
287                         if(optarg != NULL)
288                                 root_path = optarg;
289                         break;
290                 case 'n':
291                         if(optarg != NULL){
292                                 n_virtual_interfaces = atoi(optarg);
293                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
295                                         n_virtual_interfaces = 1;
296                                 }
297                         }
298                         break;
299                 case '?':
300                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301                         fprintf(stderr,"%s\n", usage_str);
302                         exit(1);
303                 default:
304                         fprintf(stderr, "Argument was %c\n", optopt);
305                         fprintf(stderr,"Invalid arguments\n");
306                         fprintf(stderr,"%s\n", usage_str);
307                         exit(1);
308                 }
309         }
310         argc -= optind;
311         argv += optind;
312         for (int i = 0; i < argc; ++i) {
313                 if((schema_file_name == "") && !hfta_only){
314                         schema_file_name = argv[i];
315                 }else{
316                         input_file_names.push_back(argv[i]);
317                 }
318         }
319
320         if(input_file_names.size() == 0){
321                 fprintf(stderr,"%s\n", usage_str);
322                 exit(1);
323         }
324
325         if(clean_make){
326                 string clean_cmd = "rm Makefile hfta_*.cc";
327                 int clean_ret = system(clean_cmd.c_str());
328                 if(clean_ret){
329                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
330                 }
331         }
332
333
334         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
335
336 //                      Open globally used file names.
337
338         // prepend config directory to schema file
339         schema_file_name = config_dir_path + schema_file_name;
340         external_fcns_path = config_dir_path + string("external_fcns.def");
341     string ifx_fname = config_dir_path + string("ifres.xml");
342
343 //              Find interface query file(s).
344         if(hostname == ""){
345                 gethostname(tmpstr,TMPSTRLEN);
346                 hostname = tmpstr;
347         }
348         hostname_len = hostname.length();     // tmpstr is not set when -h overrides the host name
349     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
350         vector<string> ifq_fls;
351
352                 ifq_fls.push_back(ifq_fname);
353
354
355 //                      Get the field list, if it exists
356         string flist_fl = config_dir_path + "field_list.xml";
357         FILE *flf_in = NULL;
358         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360                 xml_leaves = new xml_t();
361                 xmlParser_setfileinput(flf_in);
362                 if(xmlParserparse()){
363                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
364                 }else{
365                         field_verifier = new field_list(xml_leaves);
366                 }
367         }
368
369         if(!hfta_only){
370           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
372                 exit(1);
373           }
374         }
375
376 /*
377         if(!(debug_only || hfta_only)){
378           if((lfta_out = fopen("lfta.c","w")) == NULL){
379                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
380                 exit(1);
381           }
382         }
383 */
384
385 //              Get the output specification file.
386 //              format is
387 //                      query, operator, operator_param, directory, bucketwidth, partitioning_fields, n_partitions
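//              e.g. a line might look like (operator type and values purely illustrative):
//                      my_query,file,param1,./output,300,srcIP,1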
388         string ospec_fl = "output_spec.cfg";
389         FILE *osp_in = NULL;
390         vector<ospec_str *> output_specs;
391         multimap<string, int> qname_to_ospec;
392         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
393                 char *flds[MAXFLDS];
394                 int o_lineno = 0;
395                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
396                         o_lineno++;
397                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
398                         if(nflds == 7){
399 //              make operator type lowercase
400                                 char *tmpc;
401                                 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402                                         *tmpc = tolower(*tmpc);
403
404                                 ospec_str *tmp_ospec = new ospec_str();
405                                 tmp_ospec->query = flds[0];
406                                 tmp_ospec->operator_type = flds[1];
407                                 tmp_ospec->operator_param = flds[2];
408                                 tmp_ospec->output_directory = flds[3];
409                                 tmp_ospec->bucketwidth = atoi(flds[4]);
410                                 tmp_ospec->partitioning_flds = flds[5];
411                                 tmp_ospec->n_partitions = atoi(flds[6]);
412                                 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413                                 output_specs.push_back(tmp_ospec);
414                         }else{
415                                 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
416                         }
417                 }
418                 fclose(osp_in);
419         }else{
420                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
421                 exit(1);
422         }
423
424 //              hfta parallelism
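//              Each line of hfta_parallelism.cfg is
//                      <hfta name>,<number of parallel copies>
//              The parallelism must lie between 1 and n_virtual_interfaces
//              and must evenly divide n_virtual_interfaces.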
425         string pspec_fl = "hfta_parallelism.cfg";
426         FILE *psp_in = NULL;
427         map<string, int> hfta_parallelism;
428         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
429                 char *flds[MAXFLDS];
430                 int o_lineno = 0;
431                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432                         bool good_entry = true;
433                         o_lineno++;
434                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
435                         if(nflds == 2){
436                                 string hname = flds[0];
437                                 int par = atoi(flds[1]);
438                                 if(par <= 0 || par > n_virtual_interfaces){
439                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
440                                         good_entry = false;
441                                 }
442                                 if(good_entry && n_virtual_interfaces % par != 0){
443                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
444                                         good_entry = false;
445                                 }
446                                 if(good_entry)
447                                         hfta_parallelism[hname] = par;
448                         }
449                 }
450         }else{
451                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
452         }
453
454
455 //              LFTA hash table sizes
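//              Each line of lfta_htsize.cfg is
//                      <lfta name>,<hash table size>
//              Entries with a non-positive size are rejected with a warning.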
456         string htspec_fl = "lfta_htsize.cfg";
457         FILE *htsp_in = NULL;
458         map<string, int> lfta_htsize;
459         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
460                 char *flds[MAXFLDS];
461                 int o_lineno = 0;
462                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463                         bool good_entry = true;
464                         o_lineno++;
465                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
466                         if(nflds == 2){
467                                 string lfta_name = flds[0];
468                                 int htsz = atoi(flds[1]);
469                                 if(htsz>0){
470                                         lfta_htsize[lfta_name] = htsz;
471                                 }else{
472                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
473                                 }
474                         }
475                 }
476         }else{
477                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
478         }
479
480 //              LFTA virtual interface hash split
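//              Each line of rts_load.cfg is
//                      <interface name>,<weight 1>,...,<weight k>
//              All weights must be positive; they are converted to a running
//              (cumulative) sum before being stored in rts_hload.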
481         string rtlspec_fl = "rts_load.cfg";
482         FILE *rtl_in = NULL;
483         map<string, vector<int> > rts_hload;
484         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
485                 char *flds[MAXFLDS];
486                 int r_lineno = 0;
487                 string iface_name;
488                 vector<int> hload;
489                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490                         bool good_entry = true;
491                         r_lineno++;
492                         iface_name = "";
493                         hload.clear();
494                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
495                         if(nflds >1){
496                                 iface_name = flds[0];
497                                 int cumm_h = 0;
498                                 int j;
499                                 for(j=1;j<nflds;++j){
500                                         int h = atoi(flds[j]);
501                                         if(h<=0)
502                                                 good_entry = false;
503                                         cumm_h += h;
504                                         hload.push_back(cumm_h);
505                                 }
506                         }else{
507                                 good_entry = false;
508                         }
509                         if(good_entry){
510                                 rts_hload[iface_name] = hload;
511                         }else{
512                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
513                         }
514                 }
515         }
516
517
518
519         if(output_query_names){
520           if((query_name_output = fopen("query_names.txt","w")) == NULL){
521                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
522                 exit(1);
523           }
524         }
525
526         if(output_schema_summary){
527           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
529                 exit(1);
530           }
531         }
532
533         if((qtree_output = fopen("qtree.xml","w")) == NULL){
534                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
535                 exit(1);
536         }
537         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539         fprintf(qtree_output,"<QueryNodes>\n");
540
541
542 //                      Get an initial Schema
543         table_list *Schema;
544         if(!hfta_only){
545 //                      Parse the table schema definitions.
546           fta_parse_result = new fta_parse_t();
547           FtaParser_setfileinput(table_schemas_in);
548           if(FtaParserparse()){
549                 fprintf(stderr,"Table schema parse failed.\n");
550                 exit(1);
551           }
552           if(fta_parse_result->parse_type != TABLE_PARSE){
553                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
554                 exit(1);
555           }
556           Schema = fta_parse_result->tables;
557
558 //                      Ensure that all schema_ids, if set, are distinct.
559 //  Obsolete? There is code elsewhere to ensure that schema IDs are
560 //  distinct on a per-interface basis.
561 /*
562           set<int> found_ids;
563           set<int> dup_ids;
564           for(int t=0;t<Schema->size();++t){
565                 int sch_id = Schema->get_table(t)->get_schema_id();
566                 if(sch_id> 0){
567                         if(found_ids.find(sch_id) != found_ids.end()){
568                                 dup_ids.insert(sch_id);
569                         }else{
570                                 found_ids.insert(sch_id);
571                         }
572                 }
573                 if(dup_ids.size()>0){
574                         fprintf(stderr, "Error, the schema has duplicate schema_ids:");
575                         for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
576                                 fprintf(stderr," %d",(*dit));
577                         fprintf(stderr,"\n");
578                         exit(1);
579                 }
580           }
581 */
582
583
584 //                      Process schema field inheritance
585           int retval;
586           retval = Schema->unroll_tables(err_str);
587           if(retval){
588                 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
589                 exit(1);
590           }
591         }else{
592 //                      hfta only => we will try to fetch schemas from the registry.
593 //                      therefore, start off with an empty schema.
594           Schema = new table_list();
595         }
596
597
598 //                      Open and parse the external functions file.
599         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
600         if(Ext_fcnsParserin == NULL){
601                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
602                 Ext_fcns = new ext_fcn_list();
603         }else{
604                 if(Ext_fcnsParserparse()){
605                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
606                         Ext_fcns = new ext_fcn_list();
607                 }
608         }
609         if(Ext_fcns->validate_fcns(err_str)){
610                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
611                 exit(1);
612         }
613
614 //              Open and parse the interface resources file.
615 //      ifq_t *ifaces_db = new ifq_t();
616 //   string ierr;
617 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
618 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
619 //                              ifx_fname.c_str(), ierr.c_str());
620 //              exit(1);
621 //      }
622 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
623 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
624 //                              ifq_fname.c_str(), ierr.c_str());
625 //              exit(1);
626 //      }
627
628
629 //                      The LFTA code string.
630 //                      Put the standard preamble here.
631 //                      NOTE: the hash macros, fcns should go into the run time
632   map<string, string> lfta_val;
633   map<string, string> lfta_prefilter_val;
634
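//              lfta_header accumulates the fixed preamble of the generated lfta.c:
//              the runtime includes, any locally defined parsing headers found by
//              the glob below, the lfta_prefilter hook, and the hash / bloom-filter
//              helper macros.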
635   string lfta_header =
636 "#include <limits.h>\n\n"
637 "#include \"rts.h\"\n"
638 "#include \"fta.h\"\n"
639 "#include \"lapp.h\"\n"
640 "#include \"rts_udaf.h\"\n\n"
641 ;
642 // Get any locally defined parsing headers
643     glob_t glob_result;
644     memset(&glob_result, 0, sizeof(glob_result));
645
646     // do the glob operation
647     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
648         if(return_value == 0){
649         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
650                         char *flds[1000];
651                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
652                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
653         }
654         }else{
655                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
656         }
657
658 /*
659 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
660 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
661 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
662 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
663 */
664
665         lfta_header += 
666 "\n"
667 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
668 "\n"
669 "#define SLOT_FILLED 0x04\n"
670 "#define SLOT_GEN_BITS 0x03\n"
671 "#define SLOT_HASH_BITS 0xfffffff8\n"
672 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
673 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
674 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
675 "\n\n"
676
677 "#define lfta_BOOL_to_hash(x) (x)\n"
678 "#define lfta_USHORT_to_hash(x) (x)\n"
679 "#define lfta_UINT_to_hash(x) (x)\n"
680 "#define lfta_IP_to_hash(x) (x)\n"
681 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
682 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
683 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
684 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
685 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
686 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
687 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
688 "       for(i=0;i<x.length;++i){\n"
689 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
690 "               if((i%4) == 3){\n"
691 "                       ret ^= tmp_sum;\n"
692 "                       tmp_sum = 0;\n"
693 "               }\n"
694 "       }\n"
695 "       if((i%4)!=0) ret ^=tmp_sum;\n"
696 "       return(ret);\n"
697 "}\n\n\n";
698
699
700
701 //////////////////////////////////////////////////////////////////
702 /////                   Get all of the query parse trees
703
704
705   int i,p;
706   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
707
708 //---------------------------
709 //              Global info needed for post processing.
710
711 //                      Set of operator views ref'd in the query set.
712         opview_set opviews;
713 //                      lftas on a per-machine basis.
714         map<string, vector<stream_query *> > lfta_mach_lists;
715         int nfiles = input_file_names.size();
716         vector<stream_query *> hfta_list;               // list of hftas.
717         map<string, stream_query *> sq_map;             // map from query name to stream query.
718
719
720 //////////////////////////////////////////
721
722 //              Open and parse the interface resources file.
723         ifq_t *ifaces_db = new ifq_t();
724     string ierr;
725         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
726                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
727                                 ifx_fname.c_str(), ierr.c_str());
728                 exit(1);
729         }
730         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
731                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
732                                 ifq_fls[0].c_str(), ierr.c_str());
733                 exit(1);
734         }
735
736   map<string, string> qname_to_flname;  // for detecting duplicate query names
737
738
739
740 //                      Parse the files to create a vector of parse trees.
741 //                      Load qnodes with information to perform a topo sort
742 //                      based on query dependencies.
743   vector<query_node *> qnodes;                          // for topo sort.
744   map<string,int> name_node_map;                        // map query name to qnodes entry
745   for(i=0;i<input_file_names.size();i++){
746
747           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
748                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
749                   continue;
750           }
751 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
752
753 //                      Parse the FTA query
754           fta_parse_result = new fta_parse_t();
755           FtaParser_setfileinput(fta_in);
756           if(FtaParserparse()){
757                 fprintf(stderr,"FTA parse failed.\n");
758                 exit(1);
759           }
760           if(fta_parse_result->parse_type != QUERY_PARSE){
761                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
762                 exit(1);
763           }
764
765 //                      returns a list of parse trees
766           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
767           for(p=0;p<qlist.size();++p){
768             table_exp_t *fta_parse_tree = qlist[p];
769 //              query_parse_trees.push_back(fta_parse_tree);
770
771 //                      compute the default name -- extract from the input file name
772                 strcpy(tmpstr,input_file_names[i].c_str());
773                 char *qname = strrchr(tmpstr,PATH_DELIM);
774                 if(qname == NULL)
775                         qname = tmpstr;
776                 else
777                         qname++;
778                 char *qname_end = strchr(qname,'.');
779                 if(qname_end != NULL) *qname_end = '\0';
780                 string qname_str = qname;
781                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
782
783 //                      Determine visibility.  Should I be attaching all of the output methods?
784                 if(qname_to_ospec.count(imputed_qname)>0)
785                         fta_parse_tree->set_visible(true);
786                 else
787                         fta_parse_tree->set_visible(false);
788
789
790 //                              Create a manipulable representation of the parse tree.
791 //                              the qnode inherits the visibility assigned to the parse tree.
792             int pos = qnodes.size();
793                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
794                 name_node_map[ qnodes[pos]->name ] = pos;
795 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
796 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
797 //              qfiles.push_back(i);
798
799 //                      Check for duplicate query names
800 //                                      NOTE : in hfta-only generation, I should
801 //                                      also check with the names of the registered queries.
802                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
803                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
804                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
805                         exit(1);
806                 }
807                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
808                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
809                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
810                         exit(1);
811                 }
812                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
813
814
815         }
816   }
817
818 //              Add the library queries
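//              A referenced table name containing a path separator (e.g. lib/foo)
//              is treated as a library query: <library_path><name>.gsql is parsed,
//              and the target query plus everything it reads from is pulled into
//              the query set, marked as not externally visible.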
819
820   int pos;
821   for(pos=0;pos<qnodes.size();++pos){
822         int fi;
823         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
824                 string src_tbl = qnodes[pos]->refd_tbls[fi];
825                 if(qname_to_flname.count(src_tbl) == 0){
826                         int last_sep = src_tbl.find_last_of('/');
827                         if(last_sep != string::npos){
828 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
829                                 string target_qname = src_tbl.substr(last_sep+1);
830                                 string qpathname = library_path + src_tbl + ".gsql";
831                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
832                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
833                                         exit(1);
834                                         fprintf(stderr,"After exit\n");
835                                 }
836 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
837 //                      Parse the FTA query
838                                 fta_parse_result = new fta_parse_t();
839                                 FtaParser_setfileinput(fta_in);
840                                 if(FtaParserparse()){
841                                         fprintf(stderr,"FTA parse failed.\n");
842                                         exit(1);
843                                 }
844                                 if(fta_parse_result->parse_type != QUERY_PARSE){
845                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
846                                         exit(1);
847                                 }
848
849                                 map<string, int> local_query_map;
850                                 vector<string> local_query_names;
851                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
852                                 for(p=0;p<qlist.size();++p){
853                                 table_exp_t *fta_parse_tree = qlist[p];
854                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
855                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
856                                         if(imputed_qname == target_qname)
857                                                 imputed_qname = src_tbl;
858                                         if(local_query_map.count(imputed_qname)>0){
859                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
860                                                 exit(1);
861                                         }
862                                         local_query_map[ imputed_qname ] = p;
863                                         local_query_names.push_back(imputed_qname);
864                                 }
865
866                                 if(local_query_map.count(src_tbl)==0){
867                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
868                                         exit(1);
869                                 }
870
871                                 vector<int> worklist;
872                                 set<int> added_queries;
873                                 vector<query_node *> new_qnodes;
874                                 worklist.push_back(local_query_map[target_qname]);
875                                 added_queries.insert(local_query_map[target_qname]);
876                                 int qq;
877                                 int qpos = qnodes.size();
878                                 for(qq=0;qq<worklist.size();++qq){
879                                         int q_id = worklist[qq];
880                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
881                                         new_qnodes.push_back( new_qnode);
882                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
883                                         int ff;
884                                         for(ff = 0;ff<refd_tbls.size();++ff){
885                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
886
887                                                         if(name_node_map.count(refd_tbls[ff])>0){
888                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
889                                                                 exit(1);
890                                                         }else{
891                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
892                                                         }
893                                                 }
894                                         }
895                                 }
896
897                                 for(qq=0;qq<new_qnodes.size();++qq){
898                                         int qpos = qnodes.size();
899                                         qnodes.push_back(new_qnodes[qq]);
900                                         name_node_map[qnodes[qpos]->name ] = qpos;
901                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
902                                 }
903                         }
904                 }
905         }
906   }
907
908
909
910
911
912
913
914
915 //---------------------------------------
916
917
918 //              Add the UDOPS.
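//              Any referenced table whose schema type is OPERATOR_VIEW_SCHEMA gets
//              its own query_node here, so that the UDOP's source queries can be
//              checked for existence (and later marked visible).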
919
920   string udop_missing_sources;
921   for(i=0;i<qnodes.size();++i){
922         int fi;
923         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
924                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
925                 if(sid >= 0){
926                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
927                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
928                                 int pos = qnodes.size();
929                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
930                                         name_node_map[ qnodes[pos]->name ] = pos;
931                                         qnodes[pos]->is_externally_visible = false;   // not externally visible (may be promoted later)
932         //                                      Need to mark the source queries as visible.
933                                         int si;
934                                         string missing_sources = "";
935                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
936                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
937                                                 if(name_node_map.count(src_tbl)==0){
938                                                         missing_sources += src_tbl + " ";
939                                                 }
940                                         }
941                                         if(missing_sources != ""){
942                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
943                                         }
944                                 }
945                         }
946                 }
947         }
948   }
949   if(udop_missing_sources != ""){
950         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
951         exit(1);
952   }
953
954
955
956 ////////////////////////////////////////////////////////////////////
957 ///                             Check parse trees to verify that some
958 ///                             global properties are met :
959 ///                             if q1 reads from q2, then
960 ///                               q2 is processed before q1
961 ///                               q1 can supply q2's parameters
962 ///                             Verify there is no cycle in the reads-from graph.
963
964 //                      Compute an order in which to process the
965 //                      queries.
966
967 //                      Start by building the reads-from lists.
968 //
969
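//              The reads-from edges drive a Kahn-style topological sort below:
//              count the consumers of each query, repeatedly peel off queries with
//              no remaining consumers (the roots), and decrement the consumer
//              counts of the queries they read from.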
970   for(i=0;i<qnodes.size();++i){
971         int qi, fi;
972         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
973         for(fi = 0;fi<refd_tbls.size();++fi){
974                 if(name_node_map.count(refd_tbls[fi])>0){
975 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
976                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
977                 }
978         }
979   }
980
981
982 //              If one query reads the result of another,
983 //              check for parameter compatibility.  Currently it must
984 //              be an exact match.  I will move to requiring
985 //              containment after re-ordering, but will require
986 //              some analysis for code generation which is not
987 //              yet in place.
988 //printf("There are %d query nodes.\n",qnodes.size());
989
990
991   for(i=0;i<qnodes.size();++i){
992         vector<var_pair_t *> target_params  = qnodes[i]->params;
993         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
994                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
995                 if(target_params.size() != source_params.size()){
996                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
997                         exit(1);
998                 }
999                 int p;
1000                 for(p=0;p<target_params.size();++p){
1001                         if(! (target_params[p]->name == source_params[p]->name &&
1002                               target_params[p]->val == source_params[p]->val ) ){
1003                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1004                                 exit(1);
1005                         }
1006                 }
1007         }
1008   }
1009
1010
1011 //              Find the roots.
1012 //              Start by counting inedges.
1013   for(i=0;i<qnodes.size();++i){
1014         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1015                 qnodes[(*si)]->n_consumers++;
1016         }
1017   }
1018
1019 //              The roots are the nodes with indegree zero.
1020   set<int> roots;
1021   for(i=0;i<qnodes.size();++i){
1022         if(qnodes[i]->n_consumers == 0){
1023                 if(qnodes[i]->is_externally_visible == false){
1024                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1025                 }
1026                 roots.insert(i);
1027         }
1028   }
1029
1030 //              Remove the parts of the subtree that produce no output.
1031   set<int> valid_roots;
1032   set<int> discarded_nodes;
1033   set<int> candidates;
1034   while(roots.size() >0){
1035         for(si=roots.begin();si!=roots.end();++si){
1036                 if(qnodes[(*si)]->is_externally_visible){
1037                         valid_roots.insert((*si));
1038                 }else{
1039                         discarded_nodes.insert((*si));
1040                         set<int>::iterator sir;
1041                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1042                                 qnodes[(*sir)]->n_consumers--;
1043                                 if(qnodes[(*sir)]->n_consumers == 0)
1044                                         candidates.insert( (*sir));
1045                         }
1046                 }
1047         }
1048         roots = candidates;
1049         candidates.clear();
1050   }
1051   roots = valid_roots;
1052   if(discarded_nodes.size()>0){
1053         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1054         int di = 0;
1055         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1056                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1057                 di++;
1058                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1059         }
1060         fprintf(stderr,"\n");
1061   }
1062
1063 //              Compute the sources_to set, ignoring discarded nodes.
1064   for(i=0;i<qnodes.size();++i){
1065         if(discarded_nodes.count(i)==0)
1066                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1067                         qnodes[(*si)]->sources_to.insert(i);
1068         }
1069   }
1070
1071
1072 //              Find the nodes that are shared by multiple visible subtrees.
1073 //             The roots become inferred visible nodes.
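//              A node that lies in more than one visible subtree is promoted to an
//              inferred visible node, since hfta output sharing is done only at
//              roots, never at interior nodes.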
1074
1075 //              Find the visible nodes.
1076         vector<int> visible_nodes;
1077         for(i=0;i<qnodes.size();i++){
1078                 if(qnodes[i]->is_externally_visible){
1079                         visible_nodes.push_back(i);
1080                 }
1081         }
1082
1083 //              Find UDOPs referenced by visible nodes.
1084   list<int> workq;
1085   for(i=0;i<visible_nodes.size();++i){
1086         workq.push_back(visible_nodes[i]);
1087   }
1088   while(!workq.empty()){
1089         int node = workq.front();
1090         workq.pop_front();
1091         set<int>::iterator children;
1092         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1093                 qnodes[node]->is_externally_visible = true;
1094                 visible_nodes.push_back(node);
1095                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1096                         if(qnodes[(*children)]->is_externally_visible == false){
1097                                 qnodes[(*children)]->is_externally_visible = true;
1098                                 visible_nodes.push_back((*children));
1099                         }
1100                 }
1101         }
1102         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1103                 workq.push_back((*children));
1104         }
1105   }
1106
1107         bool done = false;
1108         while(!done){
1109 //      reset the nodes
1110                 for(i=0;i<qnodes.size();i++){
1111                         qnodes[i]->subtree_roots.clear();
1112                 }
1113
1114 //              Walk the tree defined by a visible node, not descending into
1115 //              subtrees rooted by a visible node.  Mark the node visited with
1116 //              the visible node ID.
1117                 for(i=0;i<visible_nodes.size();++i){
1118                         set<int> vroots;
1119                         vroots.insert(visible_nodes[i]);
1120                         while(vroots.size()>0){
1121                                 for(si=vroots.begin();si!=vroots.end();++si){
1122                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1123
1124                                         set<int>::iterator sir;
1125                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1126                                                 if(! qnodes[(*sir)]->is_externally_visible){
1127                                                         candidates.insert( (*sir));
1128                                                 }
1129                                         }
1130                                 }
1131                                 vroots = candidates;
1132                                 candidates.clear();
1133                         }
1134                 }
1135 //             Find the nodes that are in multiple visible node subtrees, but with no parent
1136 //             that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
1137                 done = true;    // until proven otherwise
1138                 for(i=0;i<qnodes.size();i++){
1139                         if(qnodes[i]->subtree_roots.size()>1){
1140                                 bool is_new_root = true;
1141                                 set<int>::iterator sir;
1142                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1143                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1144                                                 is_new_root = false;
1145                                 }
1146                                 if(is_new_root){
1147                                         qnodes[i]->is_externally_visible = true;
1148                                         qnodes[i]->inferred_visible_node = true;
1149                                         visible_nodes.push_back(i);
1150                                         done = false;
1151                                 }
1152                         }
1153                 }
1154         }
1155
1156
1157
1158
1159
1160 //             get the (non-discarded) nodes in topological ordering, consumers first.
1161 //  for(i=0;i<qnodes.size();i++){
1162 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1163 //  }
1164   vector<int> process_order;
1165   while(roots.size() >0){
1166         for(si=roots.begin();si!=roots.end();++si){
1167                 if(discarded_nodes.count((*si))==0){
1168                         process_order.push_back( (*si) );
1169                 }
1170                 set<int>::iterator sir;
1171                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1172                         qnodes[(*sir)]->n_consumers--;
1173                         if(qnodes[(*sir)]->n_consumers == 0)
1174                                 candidates.insert( (*sir));
1175                 }
1176         }
1177         roots = candidates;
1178         candidates.clear();
1179   }
1180
1181
1182 //printf("process_order.size() =%d\n",process_order.size());
1183
1184 //              Search for cyclic dependencies
1185   string found_dep;
1186   for(i=0;i<qnodes.size();++i){
1187         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1188                 if(found_dep.size() != 0) found_dep += ", ";
1189                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1190         }
1191   }
1192   if(found_dep.size()>0){
1193         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1194         exit(1);
1195   }
1196
1197 //              Get a list of query sets, in the order to be processed.
1198 //              Start at visible root and do bfs.
1199 //              The query set includes queries referenced indirectly,
1200 //              as sources for user-defined operators.  These are needed
1201 //              to ensure that they are added to the schema, but are not part
1202 //              of the query tree.
1203
1204 //              stream_node_sets contains queries reachable only through the
1205 //              FROM clause, so I can tell which queries to add to the stream
1206 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1207
1208 //                      NOTE: this code works because in order for data to be
1209 //                      read by multiple hftas, the node must be externally visible.
1210 //                      But visible nodes define roots of process sets.
1211 //                      internally visible nodes can feed data only
1212 //                      to other nodes in the same query file.
1213 //                      Therefore, any access can be restricted to a file,
1214 //                      hfta output sharing is done only on roots
1215 //                      never on interior nodes.
1216
1217
1218
1219
1220 //              Compute the base collection of hftas.
1221   vector<hfta_node *> hfta_sets;
1222   map<string, int> hfta_name_map;
1223 //  vector< vector<int> > process_sets;
1224 //  vector< set<int> > stream_node_sets;
1225   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.
1226                                                                                                                 // i.e. process leaves 1st.
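//              Each externally visible query becomes the root of one hfta.  The BFS below walks
//              reads_from edges from that root and folds every reachable non-visible query into
//              the hfta; proc_list is reversed at the end so sources precede their readers.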
1227   for(i=0;i<process_order.size();++i){
1228         if(qnodes[process_order[i]]->is_externally_visible == true){
1229 //printf("Visible.\n");
1230                 int root = process_order[i];
1231                 hfta_node *hnode = new hfta_node();
1232                 hnode->name = qnodes[root]-> name;
1233                 hnode->source_name = qnodes[root]-> name;
1234                 hnode->is_udop = qnodes[root]->is_udop;
1235                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1236
1237                 vector<int> proc_list;  proc_list.push_back(root);
1238 //                      Ensure that nodes are added only once.
1239                 set<int> proc_set;      proc_set.insert(root);
1240                 roots.clear();                  roots.insert(root);
1241                 candidates.clear();
1242                 while(roots.size()>0){
1243                         for(si=roots.begin();si!=roots.end();++si){
1244 //printf("Processing root %d\n",(*si));
1245                                 set<int>::iterator sir;
1246                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1247 //printf("reads fom %d\n",(*sir));
1248                                         if(qnodes[(*sir)]->is_externally_visible==false){
1249                                                 candidates.insert( (*sir) );
1250                                                 if(proc_set.count( (*sir) )==0){
1251                                                         proc_set.insert( (*sir) );
1252                                                         proc_list.push_back( (*sir) );
1253                                                 }
1254                                         }
1255                                 }
1256                         }
1257                         roots = candidates;
1258                         candidates.clear();
1259                 }
1260
1261                 reverse(proc_list.begin(), proc_list.end());
1262                 hnode->query_node_indices = proc_list;
1263                 hfta_name_map[hnode->name] = hfta_sets.size();
1264                 hfta_sets.push_back(hnode);
1265         }
1266   }
1267
1268 //              Compute the reads_from / sources_to graphs for the hftas.
1269
1270   for(i=0;i<hfta_sets.size();++i){
1271         hfta_node *hnode = hfta_sets[i];
1272         for(q=0;q<hnode->query_node_indices.size();q++){
1273                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1274                 for(s=0;s<qnode->refd_tbls.size();++s){
1275                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1276                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1277                                 hnode->reads_from.insert(other_hfta);
1278                                 hfta_sets[other_hfta]->sources_to.insert(i);
1279                         }
1280                 }
1281         }
1282   }
1283
1284 //              Compute a topological sort of the hfta_sets.
1285
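//              Kahn-style topological sort over the hfta graph: seed the work queue with hftas
//              that feed nothing (empty sources_to), and enqueue a source hfta once every hfta
//              it feeds has been appended to hfta_topsort.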
1286   vector<int> hfta_topsort;
1287   workq.clear();
1288   int hnode_srcs[hfta_sets.size()];
1289   for(i=0;i<hfta_sets.size();++i){
1290         hnode_srcs[i] = 0;
1291         if(hfta_sets[i]->sources_to.size() == 0)
1292                 workq.push_back(i);
1293   }
1294
1295   while(! workq.empty()){
1296         int     node = workq.front();
1297         workq.pop_front();
1298         hfta_topsort.push_back(node);
1299         set<int>::iterator stsi;
1300         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1301                 int parent = (*stsi);
1302                 hnode_srcs[parent]++;
1303                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1304                         workq.push_back(parent);
1305                 }
1306         }
1307   }
1308
1309 //              Decorate hfta nodes with the level of parallelism given as input.
1310
1311   map<string, int>::iterator msii;
1312   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1313         string hfta_name = (*msii).first;
1314         int par = (*msii).second;
1315         if(hfta_name_map.count(hfta_name) > 0){
1316                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1317         }else{
1318                 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but it's not an hfta.\n",hfta_name.c_str());
1319         }
1320   }
1321
1322 //              Propagate levels of parallelism: children should have a level of parallelism
1323 //              as large as any of their parents.  Adjust children upwards to compensate.
1324 //              Start at parents and adjust children, auto-propagation will occur.
1325
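//              Example: if an hfta with n_parallel 4 reads from an hfta left at n_parallel 1,
//              the source hfta is raised to 4.  hfta_sets lists sources before their readers,
//              so walking it from the end lets the adjustment cascade down chains of sources.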
1326   for(i=hfta_sets.size()-1;i>=0;i--){
1327         set<int>::iterator stsi;
1328         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1329                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1330                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1331                 }
1332         }
1333   }
1334
1335 //              Before all the name mangling, check if there are any output_spec.cfg
1336 //              or hfta_parallelism.cfg entries that do not have a matching query.
1337
1338         string dangling_ospecs = "";
1339         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1340                 string oq = (*msii).first;
1341                 if(hfta_name_map.count(oq) == 0){
1342                         dangling_ospecs += " "+(*msii).first;
1343                 }
1344         }
1345         if(dangling_ospecs!=""){
1346                 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1347         }
1348
1349         string dangling_par = "";
1350         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1351                 string oq = (*msii).first;
1352                 if(hfta_name_map.count(oq) == 0){
1353                         dangling_par += " "+(*msii).first;
1354                 }
1355         }
1356         if(dangling_par!=""){
1357                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1358         }
1359
1360
1361
1362 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1363 //              FROM clauses: retarget any name which is an internal node, and
1364 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes
1365 //              when the source hfta has more parallelism than the target node.
1366 //              Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1367
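//              Example: with n_parallel = 3, an hfta named Q is replaced by Q__copy0, Q__copy1
//              and Q__copy2; each query node inside it gets the same __copyX suffix, and FROM
//              clauses are retargeted to the mangled names (or to a new merge node when the
//              source hfta has a different level of parallelism).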
1368
1369   int n_original_hfta_sets = hfta_sets.size();
1370   for(i=0;i<n_original_hfta_sets;++i){
1371         if(hfta_sets[i]->n_parallel > 1){
1372                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1373                 set<string> local_nodes;                // names of query nodes in the hfta.
1374                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1375                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1376                 }
1377
1378                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1379                         string mangler = "__copy"+int_to_string(p);
1380                         hfta_node *par_hfta  = new hfta_node();
1381                         par_hfta->name = hfta_sets[i]->name + mangler;
1382                         par_hfta->source_name = hfta_sets[i]->name;
1383                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1384                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1385                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1386                         par_hfta->parallel_idx = p;
1387
1388                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1389
1390 //      Is it a UDOP?
1391                         if(hfta_sets[i]->is_udop){
1392                                 int root = hfta_sets[i]->query_node_indices[0];
1393
1394                                 string unequal_par_sources;
1395                                 set<int>::iterator rfsii;
1396                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1397                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1398                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1399                                         }
1400                                 }
1401                                 if(unequal_par_sources != ""){
1402                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1403                                         exit(1);
1404                                 }
1405
1406                                 int rti;
1407                                 vector<string> new_sources;
1408                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1409                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1410                                 }
1411
1412                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1413                                 new_qn->name += mangler;
1414                                 new_qn->mangler = mangler;
1415                                 new_qn->refd_tbls = new_sources;
1416                                 par_hfta->query_node_indices.push_back(qnodes.size());
1417                                 par_qnode_map[new_qn->name] = qnodes.size();
1418                                 name_node_map[ new_qn->name ] = qnodes.size();
1419                                 qnodes.push_back(new_qn);
1420                         }else{
1421 //              regular query node
1422                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1423                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1424                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1425 //                                      rehome the from clause on mangled names.
1426 //                                      create merge nodes as needed for external sources.
1427                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1428                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1429                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1430                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1431 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1432                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1433                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1434                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1435                                                 }else{
1436                                                         vector<string> src_tbls;
1437                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1438                                                         if(stride == 0){
1439                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1440                                                                 exit(1);
1441                                                         }
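//                                                      Each parallel copy of this hfta merges 'stride' copies of the faster source:
//                                                      copy p reads source copies stride*p .. stride*p + stride - 1 through the
//                                                      deferred-merge node created below.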
1442                                                         for(s=0;s<stride;++s){
1443                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1444                                                                 src_tbls.push_back(ext_src_name);
1445                                                         }
1446                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1447                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1448                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1449 //                                      Make a qnode to represent the new merge node
1450                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1451                                                         qn_pt->refd_tbls = src_tbls;
1452                                                         qn_pt->is_udop  = false;
1453                                                         qn_pt->is_externally_visible = false;
1454                                                         qn_pt->inferred_visible_node  = false;
1455                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1456                                                         par_qnode_map[merge_node_name] = qnodes.size();
1457                                                         name_node_map[ merge_node_name ] = qnodes.size();
1458                                                         qnodes.push_back(qn_pt);
1459                                                 }
1460                                         }
1461                                 }
1462                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1463                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1464                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1465                                 }
1466                                 new_qn->params = qnodes[hqn_idx]->params;
1467                                 new_qn->is_udop = false;
1468                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1469                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1470                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1471                                 par_qnode_map[new_qn->name] = qnodes.size();
1472                                 name_node_map[ new_qn->name ] = qnodes.size();
1473                                 qnodes.push_back(new_qn);
1474                           }
1475                         }
1476                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1477                         hfta_sets.push_back(par_hfta);
1478                 }
1479         }else{
1480 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1481 //              hfta sources.
1482                 if(!hfta_sets[i]->is_udop){
1483                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1484                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1485                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1486                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1487 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1488                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1489                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1490                                                 vector<string> src_tbls;
1491                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1492                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1493                                                         src_tbls.push_back(ext_src_name);
1494                                                 }
1495                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1496                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1497                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1498 //                                      Make a qnode to represent the new merge node
1499                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1500                                                 qn_pt->refd_tbls = src_tbls;
1501                                                 qn_pt->is_udop  = false;
1502                                                 qn_pt->is_externally_visible = false;
1503                                                 qn_pt->inferred_visible_node  = false;
1504                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1505                                                 name_node_map[ merge_node_name ] = qnodes.size();
1506                                                 qnodes.push_back(qn_pt);
1507                                         }
1508                                 }
1509                         }
1510                 }
1511           }
1512         }
1513   }
1514
1515 //                      Rebuild the reads_from / sources_to lists in the qnodes
1516   for(q=0;q<qnodes.size();++q){
1517         qnodes[q]->reads_from.clear();
1518         qnodes[q]->sources_to.clear();
1519   }
1520   for(q=0;q<qnodes.size();++q){
1521         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1522                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1523                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1524                         qnodes[q]->reads_from.insert(rf);
1525                         qnodes[rf]->sources_to.insert(q);
1526                 }
1527         }
1528   }
1529
1530 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1531   for(q=0;q<hfta_sets.size();++q){
1532         hfta_sets[q]->reads_from.clear();
1533         hfta_sets[q]->sources_to.clear();
1534   }
1535   for(q=0;q<hfta_sets.size();++q){
1536         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1537                 int node = hfta_sets[q]->query_node_indices[s];
1538                 set<int>::iterator rfsii;
1539                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1540                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1541                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1542                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1543                         }
1544                 }
1545         }
1546   }
1547
1548 /*
1549 for(q=0;q<qnodes.size();++q){
1550  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1551  set<int>::iterator rsii;
1552  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1553   printf(" %d",(*rsii));
1554   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1555  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1556   printf(" %d",(*rsii));
1557  printf("\n");
1558 }
1559
1560 for(q=0;q<hfta_sets.size();++q){
1561  if(hfta_sets[q]->do_generation==false)
1562         continue;
1563  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1564  set<int>::iterator rsii;
1565  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1566   printf(" %d",(*rsii));
1567   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1568  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1569   printf(" %d",(*rsii));
1570  printf("\n");
1571 }
1572 */
1573
1574
1575
1576 //              Re-topo sort the hftas
1577   hfta_topsort.clear();
1578   workq.clear();
1579   int hnode_srcs_2[hfta_sets.size()];
1580   for(i=0;i<hfta_sets.size();++i){
1581         hnode_srcs_2[i] = 0;
1582         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1583                 workq.push_back(i);
1584         }
1585   }
1586
1587   while(workq.empty() == false){
1588         int     node = workq.front();
1589         workq.pop_front();
1590         hfta_topsort.push_back(node);
1591         set<int>::iterator stsii;
1592         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1593                 int child = (*stsii);
1594                 hnode_srcs_2[child]++;
1595                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1596                         workq.push_back(child);
1597                 }
1598         }
1599   }
1600
1601 //              Ensure that all of the query_node_indices in hfta_sets are topologically
1602 //              sorted, don't rely on assumptions that all transforms maintain some kind of order.
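//              n_accounted[v] starts as the number of v's sources that lie outside this hfta;
//              during the sweep it also counts in-hfta sources already emitted.  A node is
//              enqueued once the total reaches reads_from.size(), giving a per-hfta topo order.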
1603   for(i=0;i<hfta_sets.size();++i){
1604         if(hfta_sets[i]->do_generation){
1605                 map<int,int> n_accounted;
1606                 vector<int> new_order;
1607                 workq.clear();
1608                 vector<int>::iterator vii;
1609                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1610                         n_accounted[(*vii)]= 0;
1611                 }
1612                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1613                         set<int>::iterator rfsii;
1614                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1615                                 if(n_accounted.count((*rfsii)) == 0){
1616                                         n_accounted[(*vii)]++;
1617                                 }
1618                         }
1619                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1620                                 workq.push_back((*vii));
1621                         }
1622                 }
1623
1624                 while(workq.empty() == false){
1625                         int node = workq.front();
1626                         workq.pop_front();
1627                         new_order.push_back(node);
1628                         set<int>::iterator stsii;
1629                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1630                                 if(n_accounted.count((*stsii))){
1631                                         n_accounted[(*stsii)]++;
1632                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1633                                                 workq.push_back((*stsii));
1634                                         }
1635                                 }
1636                         }
1637                 }
1638                 hfta_sets[i]->query_node_indices = new_order;
1639         }
1640   }
1641
1642
1643
1644
1645
1646 ///                     Global checking is done, start the analysis and translation
1647 ///                     of the query parse tree in the order specified by process_order
1648
1649
1650 //                      Get a list of the LFTAs for global lfta optimization
1651 //                              TODO: separate building operators from splitting lftas,
1652 //                                      that will make optimizations such as predicate pushing easier.
1653         vector<stream_query *> lfta_list;
1654         stream_query *rootq;
1655     int qi,qj;
1656
1657         map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1658
1659         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1660
1661         int hfta_id = hfta_topsort[qi];
1662     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1663
1664
1665
1666 //              Two possibilities: either it's a UDOP, or it's a collection of queries.
1667 //      if(qnodes[curr_list.back()]->is_udop)
1668         if(hfta_sets[hfta_id]->is_udop){
1669                 int node_id = curr_list.back();
1670                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1671                 opview_entry *opv = new opview_entry();
1672
1673 //                      Many of the UDOP properties aren't currently used.
1674                 opv->parent_qname = "no_parent";
1675                 opv->root_name = qnodes[node_id]->name;
1676                 opv->view_name = qnodes[node_id]->file;
1677                 opv->pos = qi;
1678                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1679                 opv->udop_alias = tmpstr;
1680                 opv->mangler = qnodes[node_id]->mangler;
1681
1682                 if(opv->mangler != ""){
1683                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1684                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1685                 }
1686
1687 //                      This piece of code makes each hfta which refers to the same udop
1688 //                      reference a distinct running udop.  Do this at query optimization time?
1689 //              fmtbl->set_udop_alias(opv->udop_alias);
1690
1691                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1692                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1693
1694                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1695                 int s,f,q;
1696                 for(s=0;s<subq.size();++s){
1697 //                              Validate that the fields match.
1698                         subquery_spec *sqs = subq[s];
1699                         string subq_name = sqs->name + opv->mangler;
1700                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1701                         if(flds.size() == 0){
1702                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1703                                 return(1);
1704                         }
1705                         if(flds.size() < sqs->types.size()){
1706                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1707                                 return(1);
1708                         }
1709                         bool failed = false;
1710                         for(f=0;f<sqs->types.size();++f){
1711                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1712                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1713                                 if(! dte.subsumes_type(&dtf) ){
1714                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1715                                         failed = true;
1716                                 }
1717 /*
1718                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1719                                         string pstr = dte.get_temporal_string();
1720                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1721                                         failed = true;
1722                                 }
1723 */
1724                         }
1725                         if(failed)
1726                                 return(1);
1727 ///                             Validation done, find the subquery, make a copy of the
1728 ///                             parse tree, and add it to the return list.
1729                         for(q=0;q<qnodes.size();++q)
1730                                 if(qnodes[q]->name == subq_name)
1731                                         break;
1732                         if(q==qnodes.size()){
1733                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1734                                 return(1);
1735                         }
1736
1737                 }
1738
1739 //                      Cross-link to the FROM-clause entry(s) in all queries this UDOP sources to.
1740                 set<int>::iterator sii;
1741                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1742 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1743                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1744                         int ii;
1745                         for(ii=0;ii<tblvars.size();++ii){
1746                                 if(tblvars[ii]->schema_name == opv->root_name){
1747                                         tblvars[ii]->set_opview_idx(opviews.size());
1748                                 }
1749
1750                         }
1751                 }
1752
1753                 opviews.append(opv);
1754         }else{
1755
1756 //                      Analyze the parse trees in this query,
1757 //                      put them in rootq
1758 //      vector<int> curr_list = process_sets[qi];
1759
1760
1761 ////////////////////////////////////////
1762
1763           rootq = NULL;
1764 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1765           for(qj=0;qj<curr_list.size();++qj){
1766                 i = curr_list[qj];
1767         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1768
1769 //                      Select the current query parse tree
1770                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1771
1772 //                      if hfta only, try to fetch any missing schemas
1773 //                      from the registry (using the print_schema program).
1774 //                      Here I use a hack to avoid analyzing the query -- all referenced
1775 //                      tables must be in the from clause
1776 //                      If there is a problem loading any table, just issue a warning,
1777 //
1778                 tablevar_list_t *fm = fta_parse_tree->get_from();
1779                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1780 //                      iterate over all referenced tables
1781                 int t;
1782                 for(t=0;t<refd_tbls.size();++t){
1783                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1784
1785                   if(tbl_ref < 0){      // if this table is not in the Schema
1786
1787                         if(hfta_only){
1788                                 string cmd="print_schema "+refd_tbls[t];
1789                                 FILE *schema_in = popen(cmd.c_str(), "r");
1790                                 if(schema_in == NULL){
1791                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1792                                 }else{
1793                                   string schema_instr;
1794                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1795                                         schema_instr += tmpstr;
1796                                   }
1797                           fta_parse_result = new fta_parse_t();
1798                                   strcpy(tmp_schema_str,schema_instr.c_str());
1799                                   FtaParser_setstringinput(tmp_schema_str);
1800                           if(FtaParserparse()){
1801                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1802                           }else{
1803                                         if( fta_parse_result->tables != NULL){
1804                                                 int tl;
1805                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1806                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1807                                                 }
1808                                         }else{
1809                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1810                                         }
1811                                 }
1812                         }
1813                   }else{
1814                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1815                                 exit(1);
1816                   }
1817
1818                 }
1819           }
1820
1821
1822 //                              Analyze the query.
1823           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1824           if(qs == NULL){
1825                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1826                 exit(1);
1827           }
1828
1829           stream_query new_sq(qs, Schema);
1830           if(new_sq.error_code){
1831                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1832                         exit(1);
1833           }
1834
1835 //                      Add it to the Schema
1836           table_def *output_td = new_sq.get_output_tabledef();
1837           Schema->add_table(output_td);
1838
1839 //                      Create a query plan from the analyzed parse tree.
1840 //                      If it's a query referenced via FROM, add it to the stream query.
1841           if(rootq){
1842                 rootq->add_query(new_sq);
1843           }else{
1844                 rootq = new stream_query(new_sq);
1845 //                      have the stream query object inherit properties from the analyzed
1846 //                      hfta_node object.
1847                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1848                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1849           }
1850
1851
1852     }
1853
1854 //              This stream query has all its parts
1855 //              Build and optimize it.
1856 //printf("translate_fta: generating plan.\n");
1857         if(rootq->generate_plan(Schema)){
1858                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1859                 continue;
1860         }
1861
1862 //      We've found the query plan head, so now add the output operators
1863         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1864                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1865                 multimap<string, int>::iterator mmsi;
1866                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1867                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1868                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1869                 }
1870         }
1871
1872
1873
1874 //                              Perform query splitting if necessary.
1875         bool hfta_returned;
1876     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1877
1878         int l;
1879 //for(l=0;l<split_queries.size();++l){
1880 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1881 //}
1882
1883
1884
1885
1886     if(split_queries.size() > 0){       // should be at least one component.
1887
1888 //                              Compute the number of LFTAs.
1889           int n_lfta = split_queries.size();
1890           if(hfta_returned) n_lfta--;
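//                              As used below, split_query returns the lfta components first; when an hfta
//                              component is produced it is the last element, so only the first n_lfta
//                              entries are treated as lftas.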
1891 //                              Check if a schemaId constraint needs to be inserted.
1892
1893 //                              Process the LFTA components.
1894           for(l=0;l<n_lfta;++l){
1895            if(lfta_names.count(split_queries[l]->query_name) == 0){
1896 //                              Grab the lfta for global optimization.
1897                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1898                 string liface = tvec[0]->get_interface();       // iface queries have been resolved
1899                 string lmach = tvec[0]->get_machine();
1900                 string schema_name = tvec[0]->get_schema_name();
1901                 int schema_ref = tvec[0]->get_schema_ref();
1902                 if (lmach == "")
1903                         lmach = hostname;
1904                 interface_names.push_back(liface);
1905                 machine_names.push_back(lmach);
1906 //printf("Machine is %s\n",lmach.c_str());
1907
1908 //                              Check if a schemaId constraint needs to be inserted.
1909                 if(schema_ref<0){ // can result from some kinds of splits
1910                         schema_ref = Schema->get_table_ref(schema_name);
1911                 }
1912                 int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
1913                 int errnum = 0;
1914                 string if_error;
1915                 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1916                 if(iface==NULL){
1917                         fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1918                         exit(1);
1919                 }       
1920                 if(iface->has_multiple_schemas()){
1921                         if(schema_id<0){        // invalid schema_id
1922                                 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1923                                 exit(1);
1924                         }
1925                         vector<string> iface_schemas = iface->get_property("Schemas");
1926                         if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1927                                 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1928                                 exit(1);
1929                         }
1930 // Ensure that in liface, schema_id is used for only one schema
1931                         if(schema_of_schemaid.count(liface)==0){
1932                                 map<int, string> empty_map;
1933                                 schema_of_schemaid[liface] = empty_map;
1934                         }
1935                         if(schema_of_schemaid[liface].count(schema_id)==0){
1936                                 schema_of_schemaid[liface][schema_id] = schema_name;
1937                         }else{
1938                                 if(schema_of_schemaid[liface][schema_id] != schema_name){
1939                                         fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1940                                         exit(1);
1941                                 }
1942                         }
1943                 }else{  // single-schema interface
1944                         schema_id = -1; // don't generate schema_id predicate
1945                         vector<string> iface_schemas = iface->get_property("Schemas");
1946                         if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1947                                 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1948                                 exit(1);
1949                         }
1950                         if(iface_schemas.size()>1){
1951                                 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1952                                 exit(1);
1953                         }
1954                 }                       
1955
1956 // If we need to check the schema_id, insert a predicate into the lfta.
1957 //       TODO not just schema_id, the full all_schema_ids set.
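//              Build the predicate schemaId = <schema_id> (a column reference compared to an
//              integer literal), convert it to CNF, mark it as cheap, and append the clause to
//              the lfta's WHERE clause so only tuples of the intended schema are accepted.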
1958                 if(schema_id>=0){
1959                         colref_t *schid_cr = new colref_t("schemaId");
1960                         schid_cr->schema_ref = schema_ref;
1961                         schid_cr->tablevar_ref = 0;
1962                         scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1963                         data_type *schid_dt = new data_type("uint");
1964                         schid_se->dt = schid_dt;
1965
1966                         string schid_str = int_to_string(schema_id);
1967                         literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
1968                         scalarexp_t *lit_se = new scalarexp_t(schid_lit);
1969                         lit_se->dt = schid_dt;
1970
1971                         predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
1972                         vector<cnf_elem *> clist;
1973                         make_cnf_from_pr(schid_pr, clist);
1974                         analyze_cnf(clist[0]);
1975                         clist[0]->cost = 1;     // cheap one comparison
1976 // cnf built, now insert it.
1977                         split_queries[l]->query_plan[0]->append_to_where(clist[0]);
1978                 }
1979                         
1980
1981
1982
1983
1984
1985
1986 //                      Set the ht size from the recommendation, if there is one in the rec file
1987                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
1988                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
1989                 }
1990
1991
1992                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
1993                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
1994                 lfta_list.push_back(split_queries[l]);
1995                 lfta_mach_lists[lmach].push_back(split_queries[l]);
1996
1997 //                      The following is a hack,
1998 //                      as I should be generating LFTA code through
1999 //                      the stream_query object.
2000                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2001 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2002
2003 /*
2004 //                              Create query description to embed in lfta.c
2005                 string lfta_schema_str = split_queries[l]->make_schema();
2006                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2007
2008 //                              get NIC capabilities.
2009                 int erri;
2010                 nic_property *nicprop = NULL;
2011                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2012                 if(iface_codegen_type.size()){
2013                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2014                         if(!nicprop){
2015                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2016                                         exit(1);
2017                         }
2018                 }
2019
2020                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2021 */
2022
2023                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2024                 query_names.push_back(split_queries[l]->query_name);
2025                 mach_query_names[lmach].push_back(query_names.size()-1);
2026 //                      NOTE: I will assume a 1-1 correspondence between
2027 //                      mach_query_names[lmach] and lfta_mach_lists[lmach],
2028 //                      where mach_query_names[lmach][i] contains the index into
2029 //                      query_names, which names the lfta, and
2030 //                      lfta_mach_lists[lmach][i] is the stream_query * of the
2031 //                      corresponding lfta.
2032 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2033
2034
2035
2036                 // check if lfta is reusable
2037                 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2038
2039                 bool lfta_reusable = false;
2040                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2041                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2042                         lfta_reusable = true;
2043                 }
2044                 lfta_reuse_options.push_back(lfta_reusable);
2045
2046                 // LFTA will inherit the liveness timeout specification from the containing query
2047                 // this is too conservative, as lftas are expected to spend less time per tuple
2048                 // than the full query
2049
2050                 // extract liveness timeout from query definition
2051                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2052                 if (!liveness_timeout) {
2053 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2054 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2055                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2056                 }
2057                 lfta_liveness_timeouts.push_back(liveness_timeout);
2058
2059 //                      Add it to the schema
2060                 table_def *td = split_queries[l]->get_output_tabledef();
2061                 Schema->append_table(td);
2062 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2063
2064           }
2065          }
2066
2067 //                              If the output is lfta-only, dump out the query name.
2068       if(split_queries.size() == 1 && !hfta_returned){
2069         if(output_query_names ){
2070            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2071                 }
2072 /*
2073 else{
2074            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2075                 }
2076 */
2077
2078 /*
2079 //                              output schema summary
2080                 if(output_schema_summary){
2081                         dump_summary(split_queries[0]);
2082                 }
2083 */
2084       }
2085
2086
2087           if(hfta_returned){            // query also has an HFTA component
2088                 int hfta_nbr = split_queries.size()-1;
2089
2090                         hfta_list.push_back(split_queries[hfta_nbr]);
2091
2092 //                                      report on generated query names
2093         if(output_query_names){
2094                         string hfta_name =split_queries[hfta_nbr]->query_name;
2095                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2096                         for(l=0;l<hfta_nbr;++l){
2097                                 string lfta_name =split_queries[l]->query_name;
2098                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2099                         }
2100                 }
2101 //              else{
2102 //              fprintf(stderr,"query names are ");
2103 //                      for(l=0;l<hfta_nbr;++l){
2104 //                              if(l>0) fprintf(stderr,",");
2105 //                              string fta_name =split_queries[l]->query_name;
2106 //                      fprintf(stderr," %s",fta_name.c_str());
2107 //                      }
2108 //                      fprintf(stderr,"\n");
2109 //              }
2110           }
2111
2112   }else{
2113           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2114           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2115           exit(1);
2116   }
2117  }
2118 }
2119
2120
2121 //-----------------------------------------------------------------
2122 //              Compute and propagate the SEs that compute PROTOCOL fields.
2123 //-----------------------------------------------------------------
2124
2125 for(i=0;i<lfta_list.size();i++){
2126         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2127         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2128 }
2129 for(i=0;i<hfta_list.size();i++){
2130         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2131         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2132 }
2133
2134
2135
2136 //------------------------------------------------------------------------
2137 //              Perform  individual FTA optimizations
2138 //-----------------------------------------------------------------------
2139
2140 if (partitioned_mode) {
2141
2142         // open partition definition file
2143         string part_fname = config_dir_path + "partition.txt";
2144
2145         FILE* partfd = fopen(part_fname.c_str(), "r");
2146         if (!partfd) {
2147                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2148                 exit(1);
2149         }
2150         PartnParser_setfileinput(partfd);
2151         if (PartnParserparse()) {
2152                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2153                 exit(1);
2154         }
2155         fclose(partfd);
2156 }
2157
2158
2159 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2160
2161 int num_hfta = hfta_list.size();
2162 for(i=0; i < hfta_list.size(); ++i){
2163         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2164 }
2165
2166 //                      Add all new hftas to schema
2167 for(i=num_hfta; i < hfta_list.size(); ++i){
2168                 table_def *td = hfta_list[i]->get_output_tabledef();
2169                 Schema->append_table(td);
2170 }
2171
2172 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2173
2174
2175
2176 //------------------------------------------------------------------------
2177 //              Do global (cross-fta) optimization
2178 //-----------------------------------------------------------------------
2179
2180
2181
2182
2183
2184
2185 set<string> extra_external_libs;
2186
2187 for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
2188
2189                 if(! debug_only){
2190 //                                      build hfta file name, create output
2191            if(numeric_hfta_flname){
2192             sprintf(tmpstr,"hfta_%d",hfta_count);
2193                         hfta_names.push_back(tmpstr);
2194             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2195          }else{
2196             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2197                         hfta_names.push_back(tmpstr);
2198             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2199           }
2200                   FILE *hfta_fl = fopen(tmpstr,"w");
2201                   if(hfta_fl == NULL){
2202                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2203                         exit(1);
2204                   }
2205                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2206
2207 //                      If there is a field verifier, warn about
2208 //                      lack of compatibility
2209 //                              NOTE : this code assumes that visible non-lfta queries
2210 //                              are those at the root of a stream query.
2211                   string hfta_comment;
2212                   string hfta_title;
2213                   string hfta_namespace;
2214                   if(hfta_list[i]->defines.count("comment")>0)
2215                         hfta_comment = hfta_list[i]->defines["comment"];
2216                   if(hfta_list[i]->defines.count("Comment")>0)
2217                         hfta_comment = hfta_list[i]->defines["Comment"];
2218                   if(hfta_list[i]->defines.count("COMMENT")>0)
2219                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2220                   if(hfta_list[i]->defines.count("title")>0)
2221                         hfta_title = hfta_list[i]->defines["title"];
2222                   if(hfta_list[i]->defines.count("Title")>0)
2223                         hfta_title = hfta_list[i]->defines["Title"];
2224                   if(hfta_list[i]->defines.count("TITLE")>0)
2225                         hfta_title = hfta_list[i]->defines["TITLE"];
2226                   if(hfta_list[i]->defines.count("namespace")>0)
2227                         hfta_namespace = hfta_list[i]->defines["namespace"];
2228                   if(hfta_list[i]->defines.count("Namespace")>0)
2229                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2230                   if(hfta_list[i]->defines.count("NAMESPACE")>0)
2231                         hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2232
2233                   if(field_verifier != NULL){
2234                         string warning_str;
2235                         if(hfta_comment == "")
2236                                 warning_str += "\tcomment not found.\n";
2237
2238 // Obsolete stuff that Carsten wanted
2239 //                      if(hfta_title == "")
2240 //                              warning_str += "\ttitle not found.\n";
2241 //                      if(hfta_namespace == "")
2242 //                              warning_str += "\tnamespace not found.\n";
2243
2244 // STOPPED HERE
2245 //      There is a get_tbl_keys method implemented for qp_nodes,
2246 //      integrate it into stream_query, then call it to find keys,
2247 //      and annotate fields with their key-ness.
2248 //      If there is a "keys" property in the defines block, override anything returned
2249 //      from the automated analysis
2250
2251                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2252                         int fi;
2253                         for(fi=0;fi<flds.size();fi++){
2254                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2255                         }
2256                         if(warning_str != "")
2257                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2258                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2259                   }
2260
2261 // Get the fields in this query
2262                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2263
2264 // do key processing
2265                   string hfta_keys_s = "";
2266                   if(hfta_list[i]->defines.count("keys")>0)
2267                         hfta_keys_s = hfta_list[i]->defines["keys"];
2268                   if(hfta_list[i]->defines.count("Keys")>0)
2269                         hfta_keys_s = hfta_list[i]->defines["Keys"];
2270                   if(hfta_list[i]->defines.count("KEYS")>0)
2271                         hfta_keys_s = hfta_list[i]->defines["KEYS"];
2272                   string xtra_keys_s = "";
2273                   if(hfta_list[i]->defines.count("extra_keys")>0)
2274                         xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2275                   if(hfta_list[i]->defines.count("Extra_Keys")>0)
2276                         xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2277                   if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2278                         xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2279 // get the keys
2280                   vector<string> hfta_keys;
2281                   vector<string> partial_keys;
2282                   vector<string> xtra_keys;
2283                   if(hfta_keys_s==""){
2284                                 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2285                                 if(xtra_keys_s.size()>0){
2286                                         xtra_keys = split_string(xtra_keys_s, ',');
2287                                 }
2288                                 for(int xi=0;xi<xtra_keys.size();++xi){
2289                                         if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2290                                                 hfta_keys.push_back(xtra_keys[xi]);
2291                                         }
2292                                 }
2293                   }else{
2294                                 hfta_keys = split_string(hfta_keys_s, ',');
2295                   }
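//                Illustrative summary of the merge above (the field names are
//                hypothetical, added for this note only):
//                      - no "keys" define: keys = the get_tbl_keys() result, plus any
//                        names listed in "extra_keys" that are not already present;
//                      - "keys" define present, e.g. 'srcIP,dstIP': that comma-separated
//                        list is used as-is and "extra_keys" is not consulted.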
2296 // validate that all of the keys exist in the output.
2297 //      (exit on error, as it's a bad specification)
2298                   vector<string> missing_keys;
2299                   for(int ki=0;ki<hfta_keys.size(); ++ki){
2300                         int fi;
2301                         for(fi=0;fi<flds.size();++fi){
2302                                 if(hfta_keys[ki] == flds[fi]->get_name())
2303                                         break;
2304                         }
2305                         if(fi==flds.size())
2306                                 missing_keys.push_back(hfta_keys[ki]);
2307                   }
2308                   if(missing_keys.size()>0){
2309                   fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2310                         for(int hi=0; hi<missing_keys.size(); ++hi){
2311                                 fprintf(stderr," %s", missing_keys[hi].c_str());
2312                         }
2313                         fprintf(stderr,"\n");
2314                         exit(1);
2315                   }
2316
2317                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2318                   if(hfta_comment != "")
2319                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2320                   if(hfta_title != "")
2321                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2322                   if(hfta_namespace != "")
2323                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2324                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2325                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2326
2327 //                              write info about fields to qtree.xml
2328                   int fi;
2329                   for(fi=0;fi<flds.size();fi++){
2330                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2331                         if(flds[fi]->get_modifier_list()->size()){
2332                                 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2333                         }
2334                         fprintf(qtree_output," />\n");
2335                   }
2336 // info about keys
2337                   for(int hi=0;hi<hfta_keys.size();++hi){
2338                         fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2339                   }
2340                   for(int hi=0;hi<partial_keys.size();++hi){
2341                         fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2342                   }
2343                   for(int hi=0;hi<xtra_keys.size();++hi){
2344                         fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2345                   }
2346
2347
2348                   // extract liveness timeout from query definition
2349                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2350                   if (!liveness_timeout) {
2351 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2352 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2353                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2354                   }
2355                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2356
2357                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2358                   int itv;
2359                   for(itv=0;itv<tmp_tv.size();++itv){
2360                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2361                   }
2362                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2363                   if(ifrs != ""){
2364                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2365                   }
2366                   fprintf(qtree_output,"\t</HFTA>\n");
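//                Sketch of the qtree.xml entry written above (all values are
//                hypothetical placeholders):
//                      <HFTA name='example_hfta'>
//                              <FileName value='hfta_0.cc' />
//                              <Rate value='100' />
//                              <Field name='srcIP' pos='0' type='UINT' />
//                              <Key value='srcIP'/>
//                              <LivenessTimeout value='5' />
//                              <ReadsFrom value='example_query' />
//                      </HFTA>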
2367
2368                   fclose(hfta_fl);
2369                 }else{
2370 //                                      debug only -- do code generation to catch generation-time errors.
2371                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2372                 }
2373
2374                 hfta_count++;   // for hfta file names with numeric suffixes
2375
2376                 hfta_list[i]->get_external_libs(extra_external_libs);
2377
2378           }
2379
2380 string ext_lib_string;
2381 set<string>::iterator ssi_el;
2382 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2383         ext_lib_string += (*ssi_el)+" ";
2384
2385
2386
2387 //                      Report on the set of operator views
2388   for(i=0;i<opviews.size();++i){
2389         opview_entry *opve = opviews.get_entry(i);
2390         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2391         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2392         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2393         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2394         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2395
2396         if (!opve->liveness_timeout) {
2397 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2398 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2399                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2400         }
2401         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2402     int j;
2403         for(j=0;j<opve->subq_names.size();j++)
2404                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2405         fprintf(qtree_output,"\t</UDOP>\n");
2406   }
2407
2408
2409 //-----------------------------------------------------------------
2410
2411 //                      Create interface-specific meta code files.
2412 //                              first, open and parse the interface resources file.
2413         ifaces_db = new ifq_t();
2414     ierr = "";
2415         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2416                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2417                                 ifx_fname.c_str(), ierr.c_str());
2418                 exit(1);
2419         }
2420
2421         map<string, vector<stream_query *> >::iterator svsi;
2422         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2423                 string lmach = (*svsi).first;
2424
2425         //              For this machine, create a set of lftas per interface.
2426                 vector<stream_query *> mach_lftas = (*svsi).second;
2427                 map<string, vector<stream_query *> > lfta_iface_lists;
2428                 int li;
2429                 for(li=0;li<mach_lftas.size();++li){
2430                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2431                         string lfta_iface = tvec[0]->get_interface();
2432                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2433                 }
2434
2435                 map<string, vector<stream_query *> >::iterator lsvsi;
2436                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2437                         int erri;
2438                         string liface = (*lsvsi).first;
2439                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2440                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2441                         if(iface_codegen_type.size()){
2442                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2443                                 if(!nicprop){
2444                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2445                                         exit(1);
2446                                 }
2447                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2448                                 string mcf_flnm;
2449                                 if(lmach != "")
2450                                   mcf_flnm = lmach + "_"+liface+".mcf";
2451                                 else
2452                                   mcf_flnm = hostname + "_"+liface+".mcf";
2453                                 FILE *mcf_fl ;
2454                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2455                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2456                                         exit(1);
2457                                 }
2458                                 fprintf(mcf_fl,"%s",mcs.c_str());
2459                                 fclose(mcf_fl);
2460 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2461 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2462                         }
2463                 }
2464
2465
2466         }
2467
2468
2469
2470 //-----------------------------------------------------------------
2471
2472
2473 //                      Find common filter predicates in the LFTAs.
2474 //                      in addition generate structs to store the temporal attributes unpacked by prefilter
2475         
2476         map<string, vector<stream_query *> >::iterator ssqi;
2477         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2478
2479                 string lmach = (*ssqi).first;
2480                 bool packed_return = false;
2481                 int li, erri;
2482
2483
2484 //      The LFTAs of this machine.
2485                 vector<stream_query *> mach_lftas = (*ssqi).second;
2486 //      break up on a per-interface basis.
2487                 map<string, vector<stream_query *> > lfta_iface_lists;
2488                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2489                                                         // for fta_init
2490                 for(li=0;li<mach_lftas.size();++li){
2491                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2492                         string lfta_iface = tvec[0]->get_interface();
2493                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2494                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2495                 }
2496
2497
2498 //      Are the return values "packed"?
2499 //      This should be done on a per-interface basis.
2500 //      But this is defunct code for gs-lite
2501                 for(li=0;li<mach_lftas.size();++li){
2502                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2503                   string liface = tvec[0]->get_interface();
2504                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2505                   if(iface_codegen_type.size()){
2506                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2507                           packed_return = true;
2508                         }
2509                   }
2510                 }
2511
2512
2513 // Separate lftas by interface, collect results on a per-interface basis.
2514
2515                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2516                 map<string, vector<cnf_set *> > prefilter_preds;
2517                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2518                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2519                         string liface = (*mvsi).first;
2520                         vector<cnf_set *> empty_list;
2521                         prefilter_preds[liface] = empty_list;
2522                         if(! packed_return){
2523                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2524                         }
2525
2526 //                              get NIC capabilities.  (Is this needed?)
2527                         nic_property *nicprop = NULL;
2528                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2529                         if(iface_codegen_type.size()){
2530                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2531                                 if(!nicprop){
2532                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2533                                         exit(1);
2534                                 }
2535                         }
2536                 }
2537
2538
2539 //              Now that we know the prefilter preds, generate the lfta code.
2540 //      Do this for all lftas in this machine.
2541                 for(li=0;li<mach_lftas.size();++li){
2542                         set<unsigned int> subsumed_preds;
2543                         set<unsigned int>::iterator sii;
2544 #ifdef PREFILTER_OK
2545                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2546                                 int pid = (*sii);
2547                                 if((pid>>16) == li){
2548                                         subsumed_preds.insert(pid & 0xffff);
2549                                 }
2550                         }
2551 #endif
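//                      Each pred_id collected above packs (lfta index << 16) | predicate
//                      position, so the test selects the prefilter predicates subsumed by
//                      lfta li; e.g. 0x00020003 denotes predicate 3 of lfta 2.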
2552                         string lfta_schema_str = mach_lftas[li]->make_schema();
2553                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2554                         nic_property *nicprop = NULL; // no NIC properties?
2555                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2556                 }
2557
2558
2559 //                      generate structs to store the temporal attributes
2560 //                      unpacked by prefilter
2561                 col_id_set temp_cids;
2562                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2563                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2564
2565 //                      Compute the lfta bit signatures and the lfta colrefs
2566 //      do this on a per-interface basis
2567 #ifdef PREFILTER_OK
2568                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2569 #endif
2570                 map<string, vector<long long int> > lfta_sigs; // used again later
2571                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2572                         string liface = (*mvsi).first;
2573                         vector<long long int> empty_list;
2574                         lfta_sigs[liface] = empty_list;
2575
2576                         vector<col_id_set> lfta_cols;
2577                         vector<int> lfta_snap_length;
2578                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2579                                 unsigned long long int mask=0, bpos=1;
2580                                 int f_pos;
2581                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2582                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2583                                                 mask |= bpos;
2584                                         bpos = bpos << 1;
2585                                 }
2586                                 lfta_sigs[liface].push_back(mask);
2587                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2588                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2589                         }
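//                              Worked example of the signature computed above: if prefilter
//                              predicates 0 and 2 of this interface apply to lfta li, its
//                              mask is 0b101, i.e. 5ull.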
2590
2591 //for(li=0;li<mach_lftas.size();++li){
2592 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2593 //col_id_set::iterator tcisi;
2594 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2595 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2596 //}
2597 //}
2598
2599
2600 //                      generate the prefilter
2601 //      Do this on a per-interface basis, except for the #define
2602 #ifdef PREFILTER_OK
2603 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2604                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2605 #else
2606                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2607
2608 #endif
2609                 }
2610
2611 //                      Generate interface parameter lookup function
2612           lfta_val[lmach] += "// lookup interface properties by name\n";
2613           lfta_val[lmach] += "// multiple values of a given property are returned in the same string separated by commas\n";
2614           lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2615           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2616
2617 //        collect a list of interface names used by queries running on this host
2618           set<std::string> iface_names;
2619           for(i=0;i<mach_query_names[lmach].size();i++){
2620                 int mi = mach_query_names[lmach][i];
2621                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2622
2623                 if(interface_names[mi]=="")
2624                         iface_names.insert("DEFAULTDEV");
2625                 else
2626                         iface_names.insert(interface_names[mi]);
2627           }
2628
2629 //        generate interface property lookup code for every interface
2630           set<std::string>::iterator sir;
2631           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2632                 if (sir == iface_names.begin())
2633                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2634                 else
2635                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2636
2637                 // iterate through interface properties
2638                 vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2639                 if (erri) {
2640                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2641                         exit(1);
2642                 }
2643                 if (iface_properties.empty())
2644                         lfta_val[lmach] += "\t\treturn NULL;\n";
2645                 else {
2646                         for (int i = 0; i < iface_properties.size(); ++i) {
2647                                 if (i == 0)
2648                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2649                                 else
2650                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2651
2652                                 // combine all values for the interface property using comma separator
2653                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2654                                 lfta_val[lmach] += "\t\t\treturn \"";
2655                                 for (int j = 0; j < vals.size(); ++j) {
2656                                         lfta_val[lmach] +=  vals[j];
2657                                         if (j != vals.size()-1)
2658                                                 lfta_val[lmach] += ",";
2659                                 }
2660                                 lfta_val[lmach] += "\";\n";
2661                         }
2662                         lfta_val[lmach] += "\t\t} else\n";
2663                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2664                 }
2665           }
2666           lfta_val[lmach] += "\t} else\n";
2667           lfta_val[lmach] += "\t\treturn NULL;\n";
2668           lfta_val[lmach] += "}\n\n";
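//        Sketch of the generated lookup function (the interface name, property
//        name, and returned value are illustrative placeholders):
//              gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {
//                      if (!strcmp(iface_name, "eth0")) {
//                              if (!strcmp(property_name, "InterfaceType")) {
//                                      return "PROTO";
//                              } else
//                                      return NULL;
//                      } else
//                              return NULL;
//              }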
2669
2670
2671 //                      Generate a full list of FTAs for clearinghouse reference
2672           lfta_val[lmach] += "// list of FTAs the clearinghouse expects to register themselves\n";
2673           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2674
2675           for (i = 0; i < query_names.size(); ++i) {
2676                    if (i)
2677                           lfta_val[lmach] += ", ";
2678                    lfta_val[lmach] += "\"" + query_names[i] + "\"";
2679           }
2680           for (i = 0; i < hfta_list.size(); ++i) {
2681                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2682           }
2683           lfta_val[lmach] += ", NULL};\n\n";
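//        e.g., with one lfta query and one hfta (names are hypothetical):
//              const gs_sp_t fta_names[] = {"example_query", "hfta_0", NULL};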
2684
2685
2686 //                      Add the initialization function to lfta.c
2687 //      Change to accept the interface name, and 
2688 //      set the prefilter function accordingly.
2689 //      see the example in demo/err2
2690           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2691
2692 //        for(i=0;i<mach_query_names[lmach].size();i++)
2693 //              int mi = mach_query_names[lmach][i];
2694 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2695
2696           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2697                 string liface = (*mvsi).first;
2698                 vector<stream_query *> lfta_list = (*mvsi).second;
2699                 for(i=0;i<lfta_list.size();i++){
2700                         stream_query *lfta_sq = lfta_list[i];
2701                         int mi = lfta_iface_qname_ix[liface][i];
2702                 
2703                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2704
2705                         string this_iface = "DEFAULTDEV";
2706                         if(interface_names[mi]!="")
2707                                 this_iface = '"'+interface_names[mi]+'"';
2708                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2709                 lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2710 //              if(interface_names[mi]=="")
2711 //                              lfta_val[lmach]+="DEFAULTDEV";
2712 //              else
2713 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';
2714                         lfta_val[lmach] += this_iface;
2715
2716
2717                 lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
2718                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
2719                         +"\n#endif\n";
2720                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2721                         lfta_val[lmach] += tmpstr;
2722
2723 //                      unsigned long long int mask=0, bpos=1;
2724 //                      int f_pos;
2725 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2726 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2727 //                                      mask |= bpos;
2728 //                              bpos = bpos << 1;
2729 //                      }
2730
2731 #ifdef PREFILTER_OK
2732 //                      sprintf(tmpstr,",%lluull",mask);
2733                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2734                         lfta_val[lmach]+=tmpstr;
2735 #else
2736                         lfta_val[lmach] += ",0ull";
2737 #endif
2738
2739                         lfta_val[lmach] += ");\n";
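//                      Sketch of one registration emitted above (the query name, interface,
//                      snap length, and generated alloc/schema identifiers are hypothetical
//                      placeholders):
//                              if(!strcmp(device,"eth0"))
//                                      fta_register("example_query", 0, "eth0", <alloc name>
//                              #ifndef LFTA_IN_NIC
//                                      ,<schema string name>
//                              #endif
//                                      ,1500,0ull);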
2740
2741
2742
2743 //    End of lfta prefilter stuff
2744 // --------------------------------------------------
2745
2746 //                      If there is a field verifier, warn about
2747 //                      lack of compatibility
2748                   string lfta_comment;
2749                   string lfta_title;
2750                   string lfta_namespace;
2751                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2752                   if(ldefs.count("comment")>0)
2753                         lfta_comment = lfta_sq->defines["comment"];
2754                   if(ldefs.count("Comment")>0)
2755                         lfta_comment = lfta_sq->defines["Comment"];
2756                   if(ldefs.count("COMMENT")>0)
2757                         lfta_comment = lfta_sq->defines["COMMENT"];
2758                   if(ldefs.count("title")>0)
2759                         lfta_title = lfta_sq->defines["title"];
2760                   if(ldefs.count("Title")>0)
2761                         lfta_title = lfta_sq->defines["Title"];
2762                   if(ldefs.count("TITLE")>0)
2763                         lfta_title = lfta_sq->defines["TITLE"];
2764                   if(ldefs.count("NAMESPACE")>0)
2765                         lfta_namespace = lfta_sq->defines["NAMESPACE"];
2766                   if(ldefs.count("Namespace")>0)
2767                         lfta_namespace = lfta_sq->defines["Namespace"];
2768                   if(ldefs.count("namespace")>0)
2769                         lfta_namespace = lfta_sq->defines["namespace"];
2770
2771                   string lfta_ht_size;
2772                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2773                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2774                   if(ldefs.count("aggregate_slots")>0){
2775                         lfta_ht_size = ldefs["aggregate_slots"];
2776                   }
2777
2778 //                      NOTE : I'm assuming that visible lftas do not start with _fta.
2779 //                              -- will fail for non-visible simple selection queries.
2780                 if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
2781                         string warning_str;
2782                         if(lfta_comment == "")
2783                                 warning_str += "\tcomment not found.\n";
2784 // Obsolete stuff that Carsten wanted
2785 //                      if(lfta_title == "")
2786 //                              warning_str += "\ttitle not found.\n";
2787 //                      if(lfta_namespace == "")
2788 //                              warning_str += "\tnamespace not found.\n";
2789
2790                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2791                         int fi;
2792                         for(fi=0;fi<flds.size();fi++){
2793                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2794                         }
2795                         if(warning_str != "")
2796                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2797                                         query_names[mi].c_str(),warning_str.c_str());
2798                 }
2799
2800
2801 //                      Create qtree output
2802                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
2803                   if(lfta_comment != "")
2804                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2805                   if(lfta_title != "")
2806                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2807                   if(lfta_namespace != "")
2808                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2809                   if(lfta_ht_size != "")
2810                         fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2811                 if(lmach != "")
2812                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2813                 else
2814                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2815                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
2816                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2817                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2818                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2819 //                              write info about fields to qtree.xml
2820                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2821                   int fi;
2822                   for(fi=0;fi<flds.size();fi++){
2823                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2824                         if(flds[fi]->get_modifier_list()->size()){
2825                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2826                         }
2827                         fprintf(qtree_output," />\n");
2828                   }
2829                 fprintf(qtree_output,"\t</LFTA>\n");
2830
2831
2832             }
2833           }
2834
2835           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2836                         string liface = (*mvsi).first;
2837                         lfta_val[lmach] += 
2838 "       if (!strcmp(device, \""+liface+"\")) \n"
2839 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2840 ;
2841                 }
2842                 lfta_val[lmach] += 
2843 "       if(lfta_prefilter == NULL){\n"
2844 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2845 "               exit(1);\n"
2846 "       }\n"
2847 ;
2848
2849
2850
2851           lfta_val[lmach] += "}\n\n";
2852
2853       if(!(debug_only || hfta_only) ){
2854                 string lfta_flnm;
2855                 if(lmach != "")
2856                   lfta_flnm = lmach + "_lfta.c";
2857                 else
2858                   lfta_flnm = hostname + "_lfta.c";
2859                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2860                         fprintf(stderr,"Can't open output file %s\n%s\n",lfta_flnm.c_str(),strerror(errno));
2861                         exit(1);
2862                 }
2863               fprintf(lfta_out,"%s",lfta_header.c_str());
2864               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2865               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2866                 fclose(lfta_out);
2867           }
2868         }
2869
2870 //              List the external operators which must execute
2871         if(opviews.size()>0)
2872                 fprintf(stderr,"The queries use the following external operators:\n");
2873         for(i=0;i<opviews.size();++i){
2874                 opview_entry *opv = opviews.get_entry(i);
2875                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2876         }
2877
2878         if(create_makefile)
2879                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2880                 machine_names, schema_file_name,
2881                 interface_names,
2882                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2883
2884
2885         fprintf(qtree_output,"</QueryNodes>\n");
2886
2887         return(0);
2888 }
2889
2890 ////////////////////////////////////////////////////////////
2891
2892 void generate_makefile(vector<string> &input_file_names, int nfiles,
2893                                            vector<string> &hfta_names, opview_set &opviews,
2894                                                 vector<string> &machine_names,
2895                                                 string schema_file_name,
2896                                                 vector<string> &interface_names,
2897                                                 ifq_t *ifdb, string &config_dir_path,
2898                                                 bool use_pads,
2899                                                 string extra_libs,
2900                                                 map<string, vector<int> > &rts_hload
2901                                          ){
2902         int i,j;
2903
2904         if(config_dir_path != ""){
2905                 config_dir_path = "-C "+config_dir_path;
2906         }
2907
2908         struct stat sb;
2909         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
2910         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
2911
2912 //      if(libz_exists && !libast_exists){
2913 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");
2914 //              exit(1);
2915 //      }
2916
2917 //                      Get set of operator executable files to run
2918         set<string> op_fls;
2919         set<string>::iterator ssi;
2920         for(i=0;i<opviews.size();++i){
2921                 opview_entry *opv = opviews.get_entry(i);
2922                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
2923         }
2924
2925         FILE *outfl = fopen("Makefile", "w");
2926         if(outfl==NULL){
2927                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
2928                 exit(0);
2929         }
2930
2931         fputs(
2932 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
2933 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
2934 ).c_str(), outfl
2935 );
2936         if(generate_stats)
2937                 fprintf(outfl,"  -DLFTA_STATS");
2938
2939 //              Gather the set of interfaces
2940 //              Also, gather "base interface names" for use in computing
2941 //              the hash splitting to virtual interfaces.
2942 //              TODO : must update to handle machines
2943         set<string> ifaces;
2944         set<string> base_vifaces;       // base interfaces of virtual interfaces
2945         map<string, string> ifmachines;
2946         map<string, string> ifattrs;
2947         for(i=0;i<interface_names.size();++i){
2948                 ifaces.insert(interface_names[i]);
2949                 ifmachines[interface_names[i]] = machine_names[i];
2950
2951                 size_t Xpos = interface_names[i].find_last_of("X");
2952                 if(Xpos!=string::npos){
2953                         string iface = interface_names[i].substr(0,Xpos);
2954                         base_vifaces.insert(iface);
2955                 }
2956                 // get interface attributes and add them to the list
2957         }
2958
2959 //              Do we need to include protobuf libraries?
2960 //              TODO Move to the interface library: get the libraries to include 
2961 //              for an interface type
2962
2963         bool use_proto = false;
2964         int erri;
2965         string err_str;
2966         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
2967                 string ifnm = (*ssi);
2968                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
2969                 for(int ift_i=0;ift_i<ift.size();ift_i++){
2970                         if(ift[ift_i]=="PROTO"){
2971                                 use_proto = true;
2972                         }
2973                 }
2974         }
2975
2976         fprintf(outfl,
2977 "\n"
2978 "\n"
2979 "all: rts");
2980         for(i=0;i<hfta_names.size();++i)
2981                 fprintf(outfl," %s",hfta_names[i].c_str());
2982         fputs(
2983 ("\n"
2984 "\n"
2985 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
2986 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
2987         if(use_pads)
2988                 fprintf(outfl,"-L. ");
2989         fputs(
2990 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
2991         if(use_pads)
2992                 fprintf(outfl,"-lgscppads -lpads ");
2993         fprintf(outfl,
2994 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
2995         if(use_pads)
2996                 fprintf(outfl, " -lpz -lz -lbz ");
2997         if(libz_exists && libast_exists)
2998                 fprintf(outfl," -last ");
2999         if(use_pads)
3000                 fprintf(outfl, " -ldll -ldl ");
3001         if(use_proto)
3002                 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3003         fprintf(outfl," -lgscpaux");
3004 #ifdef GCOV
3005         fprintf(outfl," -fprofile-arcs");
3006 #endif
3007         fprintf(outfl,
3008 "\n"
3009 "\n"
3010 "lfta.o: %s_lfta.c\n"
3011 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3012 "\n"
3013 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3014         for(i=0;i<nfiles;++i)
3015                 fprintf(outfl," %s",input_file_names[i].c_str());
3016         if(hostname == ""){
3017                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3018         }else{
3019                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3020         }
3021         for(i=0;i<nfiles;++i)
3022                 fprintf(outfl," %s",input_file_names[i].c_str());
3023         fprintf(outfl,"\n");
3024
3025         for(i=0;i<hfta_names.size();++i)
3026                 fprintf(outfl,
3027 ("%s: %s.o\n"
3028 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3029 "\n"
3030 "%s.o: %s.cc\n"
3031 "\t$(CPP) -o %s.o -c %s.cc\n"
3032 "\n"
3033 "\n").c_str(),
3034     hfta_names[i].c_str(), hfta_names[i].c_str(),
3035         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3036         hfta_names[i].c_str(), hfta_names[i].c_str(),
3037         hfta_names[i].c_str(), hfta_names[i].c_str()
3038                 );
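//              Sketch of one generated hfta rule (hfta_0 stands in for an actual hfta
//              name, ROOT for root_path, and the library list is elided):
//                      hfta_0: hfta_0.o
//                              $(CPP) -o hfta_0 hfta_0.o -LROOT/lib -lgscpapp ... <extra_libs>
//
//                      hfta_0.o: hfta_0.cc
//                              $(CPP) -o hfta_0.o -c hfta_0.cc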
3039
3040         fprintf(outfl,
3041 ("\n"
3042 "packet_schema.txt:\n"
3043 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3044 "\n"
3045 "external_fcns.def:\n"
3046 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3047 "\n"
3048 "clean:\n"
3049 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3050         for(i=0;i<hfta_names.size();++i)
3051                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3052         fprintf(outfl,"\n");
3053
3054         fclose(outfl);
3055
3056
3057
3058 //              Gather the set of interfaces
3059 //              TODO : must update to handle machines
3060 //              TODO : lookup interface attributes and add them as a parameter to the rts process
3061         outfl = fopen("runit", "w");
3062         if(outfl==NULL){
3063                 fprintf(stderr,"Can't open runit for write, exiting.\n");
3064                 exit(0);
3065         }
3066
3067
3068         fputs(
3069 ("#!/bin/sh\n"
3070 "./stopit\n"
3071 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3072 "sleep 5\n"
3073 "if [ ! -f gshub.log ]\n"
3074 "then\n"
3075 "\techo \"Failed to start bin/gshub.py\"\n"
3076 "\texit -1\n"
3077 "fi\n"
3078 "ADDR=`cat gshub.log`\n"
3079 "ps opgid= $! >> gs.pids\n"
3080 "./rts $ADDR default ").c_str(), outfl);
3081 //      int erri;
3082 //      string err_str;
3083         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3084                 string ifnm = (*ssi);
3085                 fprintf(outfl, "%s ",ifnm.c_str());
3086                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3087                 for(j=0;j<ifv.size();++j)
3088                         fprintf(outfl, "%s ",ifv[j].c_str());
3089         }
3090         fprintf(outfl, " &\n");
3091         fprintf(outfl, "echo $! >> gs.pids\n");
3092         for(i=0;i<hfta_names.size();++i)
3093                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3094
3095         for(j=0;j<opviews.opview_list.size();++j){
3096                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3097         }
3098
3099         fclose(outfl);
3100         system("chmod +x runit");
3101
3102         outfl = fopen("stopit", "w");
3103         if(outfl==NULL){
3104                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3105                 exit(0);
3106         }
3107
3108         fprintf(outfl,"#!/bin/sh\n"
3109 "rm -f gshub.log\n"
3110 "if [ ! -f gs.pids ]\n"
3111 "then\n"
3112 "exit\n"
3113 "fi\n"
3114 "for pgid in `cat gs.pids`\n"
3115 "do\n"
3116 "kill -TERM -$pgid\n"
3117 "done\n"
3118 "sleep 1\n"
3119 "for pgid in `cat gs.pids`\n"
3120 "do\n"
3121 "kill -9 -$pgid\n"
3122 "done\n"
3123 "rm gs.pids\n");
3124
3125         fclose(outfl);
3126         system("chmod +x stopit");
3127
3128 //-----------------------------------------------
3129
3130 /* For now disable support for virtual interfaces
3131         outfl = fopen("set_vinterface_hash.bat", "w");
3132         if(outfl==NULL){
3133                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3134                 exit(0);
3135         }
3136
3137 //              The format should be determined by an entry in the ifres.xml file,
3138 //              but for now hardcode the only example I have.
3139         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3140                 if(rts_hload.count((*ssi))){
3141                         string iface_name = (*ssi);
3142                         string iface_number = "";
3143                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3144                                 if(isdigit(iface_name[j])){
3145                                         iface_number = iface_name[j];
3146                                         if(j>0 && isdigit(iface_name[j-1]))
3147                                                 iface_number = iface_name[j-1] + iface_number;
3148                                 }
3149                         }
3150
3151                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3152                         vector<int> halloc = rts_hload[iface_name];
3153                         int prev_limit = 0;
3154                         for(j=0;j<halloc.size();++j){
3155                                 if(j>0)
3156                                         fprintf(outfl,":");
3157                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3158                                 prev_limit = halloc[j];
3159                         }
3160                         fprintf(outfl,"\n");
3161                 }
3162         }
3163         fclose(outfl);
3164         system("chmod +x set_vinterface_hash.bat");
3165 */
3166 }
3167
3168 //              Code for implementing a local schema
3169 /*
3170                 table_list qpSchema;
3171
3172 //                              Load the schemas of any LFTAs.
3173                 int l;
3174                 for(l=0;l<hfta_nbr;++l){
3175                         stream_query *sq0 = split_queries[l];
3176                         table_def *td = sq0->get_output_tabledef();
3177                         qpSchema.append_table(td);
3178                 }
3179 //                              load the schemas of any other ref'd tables.
3180 //                              (e.g., hftas)
3181                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3182                 int ti;
3183                 for(ti=0;ti<input_tbl_names.size();++ti){
3184                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3185                         if(tbl_ref < 0){
3186                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3187                                 if(tbl_ref < 0){
3188                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3189                                         exit(1);
3190                                 }
3191                                 qpSchema.append_table(Schema->get_table(tbl_ref));
3192                         }
3193                 }
3194 */
3195
3196 //              Functions related to parsing.
3197
3198 /*
3199 static int split_string(char *instr,char sep, char **words,int max_words){
3200    char *loc;
3201    char *str;
3202    int nwords = 0;
3203
3204    str = instr;
3205    words[nwords++] = str;
3206    while( (loc = strchr(str,sep)) != NULL){
3207         *loc = '\0';
3208         str = loc+1;
3209         if(nwords >= max_words){
3210                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3211                 nwords = max_words-1;
3212         }
3213         words[nwords++] = str;
3214    }
3215
3216    return(nwords);
3217 }
3218
3219 */
3220