src/ftacmp/translate_fta.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33 #include<glob.h>
34 #include<string.h>
35
36 #include<list>
37
38 //              for the scandir
39      #include <sys/types.h>
40      #include <dirent.h>
41
42
43 #include<errno.h>
44
45 //              to verify that some files exist.
46      #include <sys/types.h>
47      #include <sys/stat.h>
48
49 #include "parse_partn.h"
50
51 #include "print_plan.h"
52
53 //              Interface to the xml parser
54
55 #include"xml_t.h"
56 #include"field_list.h"
57
58 extern int xmlParserparse(void);
59 extern FILE *xmlParserin;
60 extern int xmlParserdebug;
61
62 std::vector<std::string> xml_attr_vec;
63 std::vector<std::string> xml_val_vec;
64 std::string xml_a, xml_v;
65 xml_t *xml_leaves = NULL;
66
67 //      Interface to the field list verifier
68 field_list *field_verifier = NULL;
69
70 #define TMPSTRLEN 1000
71
72 #ifndef PATH_DELIM
73   #define PATH_DELIM '/'
74 #endif
75
76 char tmp_schema_str[10000];
77
78 // maximum delay between two heartbeats produced
79 // by a UDOP. Used when it is not explicitly
80 // provided in the UDOP definition
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
82
83 //              Default lfta hash table size, must be power of 2.
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
85
86 //              Interface to FTA definition lexer and parser ...
87
88 extern int FtaParserparse(void);
89 extern FILE *FtaParserin;
90 extern int FtaParserdebug;
91
92 fta_parse_t *fta_parse_result;
93 var_defs_t *fta_parse_defines;
94
95
96
97 //              Interface to external function lexer and parser ...
98
99 extern int Ext_fcnsParserparse(void);
100 extern FILE *Ext_fcnsParserin;
101 extern int Ext_fcnsParserdebug;
102
103 ext_fcn_list *Ext_fcns;
104
105
106 //              Interface to partition definition parser
107 extern int PartnParserparse();
108 partn_def_list_t *partn_parse_result = NULL;
109
110
111
112 using namespace std;
113 //extern int errno;
114
115
116 //              forward declaration of local utility function
117 void generate_makefile(vector<string> &input_file_names, int nfiles,
118                                            vector<string> &hfta_names, opview_set &opviews,
119                                                 vector<string> &machine_names,
120                                                 string schema_file_name,
121                                                 vector<string> &interface_names,
122                                                 ifq_t *ifdb, string &config_dir_path,
123                                                 bool use_pads,
124                                                 string extra_libs,
125                                                 map<string, vector<int> > &rts_hload
126                                         );
127
128 //static int split_string(char *instr,char sep, char **words,int max_words);
129 #define MAXFLDS 100
130
131   FILE *schema_summary_output = NULL;           // schema summary output
132
133 //                      Dump schema summary
134 void dump_summary(stream_query *str){
135         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
136
137         table_def *sch = str->get_output_tabledef();
138
139         vector<field_entry *> flds = sch->get_fields();
140         int f;
141         for(f=0;f<flds.size();++f){
142                 if(f>0) fprintf(schema_summary_output,"|");
143                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
144         }
145         fprintf(schema_summary_output,"\n");
146         for(f=0;f<flds.size();++f){
147                 if(f>0) fprintf(schema_summary_output,"|");
148                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
149         }
150         fprintf(schema_summary_output,"\n");
151 }
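//              Illustrative output of dump_summary() (hypothetical query/field names):
//                      my_query
//                      time|srcIP
//                      UINT|IP
//              i.e. the query name, a '|'-separated list of field names, and a
//              '|'-separated list of whatever get_type() returns for each field.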
152
153 //              Globals
154 string hostname;                // name of current host.
155 int hostname_len;
156 bool generate_stats = false;
157 string root_path = "../..";
158
159
160 int main(int argc, char **argv){
161   char tmpstr[TMPSTRLEN];
162   string err_str;
163   int q,s,h,f;
164
165   set<int>::iterator si;
166
167   vector<string> query_names;                   // for lfta.c registration
168   map<string, vector<int> > mach_query_names;   // list queries of machine
169   vector<int> snap_lengths;                             // for lfta.c registration
170   vector<string> interface_names;                       // for lfta.c registration
171   vector<string> machine_names;                 // machine of interface
172   vector<bool> lfta_reuse_options;                      // for lfta.c registration
173   vector<int> lfta_liveness_timeouts;           // for qtree.xml generation
174   vector<string> hfta_names;                    // hfta source code names, for
175                                                                                 // creating the Makefile.
176   vector<string> qnames;                                // ensure unique names
177   map<string, int> lfta_names;                  // keep track of unique lftas.
178
179
180 //                              set these to 1 to debug the parser
181   FtaParserdebug = 0;
182   Ext_fcnsParserdebug = 0;
183
184   FILE *lfta_out;                               // lfta.c output.
185   FILE *fta_in;                                 // input file
186   FILE *table_schemas_in;               // source tables definition file
187   FILE *query_name_output;              // query names
188   FILE *qtree_output;                   // interconnections of query nodes
189
190   // -------------------------------
191   // Handling of Input Arguments
192   // -------------------------------
193     char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195                 "\t[-B] : debug only (don't create output files)\n"
196                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199                 "\t[-C] : use <config directory> for definition files\n"
200                 "\t[-l] : use <library directory> for library queries\n"
201                 "\t[-N] : output query names in query_names.txt\n"
202                 "\t[-H] : create HFTA only (no schema_file)\n"
203                 "\t[-Q] : use query name for hfta suffix\n"
204                 "\t[-M] : generate make file and runit, stopit scripts\n"
205                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206                 "\t[-f] : Output schema summary to schema_summary.txt\n"
207                 "\t[-P] : link with PADS\n"
208                 "\t[-h] : override host name.\n"
209                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210                 "\t[-R] : path to root of GS-lite\n"
211 ;
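//              Illustrative invocation (hypothetical file names, assuming the built
//              binary is installed as translate_fta):
//                      translate_fta -M -C ./cfg -h myhost packet_schema.txt tcp_flows.gsql udp_flows.gsql
//              packet_schema.txt is taken as the schema_file, the .gsql files as query
//              files; -M additionally requests Makefile/runit/stopit generation.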
212
213 //              parameters gathered from command line processing
214         string external_fcns_path;
215 //      string internal_fcn_path;
216         string config_dir_path;
217         string library_path = "./";
218         vector<string> input_file_names;
219         string schema_file_name;
220         bool debug_only = false;
221         bool hfta_only = false;
222         bool output_query_names = false;
223         bool output_schema_summary=false;
224         bool numeric_hfta_flname = true;
225         bool create_makefile = false;
226         bool distributed_mode = false;
227         bool partitioned_mode = false;
228         bool use_live_hosts_file = false;
229         bool use_pads = false;
230         bool clean_make = false;
231         int n_virtual_interfaces = 1;
232
233    int chopt;           // getopt() returns int; char would break the -1 test where char is unsigned
234    while((chopt = getopt(argc,argv,optstr)) != -1){
235                 switch(chopt){
236                 case 'B':
237                         debug_only = true;
238                         break;
239                 case 'D':
240                         distributed_mode = true;
241                         break;
242                 case 'p':
243                         partitioned_mode = true;
244                         break;
245                 case 'L':
246                         use_live_hosts_file = true;
247                         break;
248                 case 'C':
249                                 if(optarg != NULL)
250                                  config_dir_path = string(optarg) + string("/");
251                         break;
252                 case 'l':
253                                 if(optarg != NULL)
254                                  library_path = string(optarg) + string("/");
255                         break;
256                 case 'N':
257                         output_query_names = true;
258                         break;
259                 case 'Q':
260                         numeric_hfta_flname = false;
261                         break;
262                 case 'H':
263                         if(schema_file_name == ""){
264                                 hfta_only = true;
265                         }
266                         break;
267                 case 'f':
268                         output_schema_summary=true;
269                         break;
270                 case 'M':
271                         create_makefile=true;
272                         break;
273                 case 'S':
274                         generate_stats=true;
275                         break;
276                 case 'P':
277                         use_pads = true;
278                         break;
279                 case 'c':
280                         clean_make = true;
281                         break;
282                 case 'h':
283                         if(optarg != NULL)
284                                 hostname = optarg;
285                         break;
286                 case 'R':
287                         if(optarg != NULL)
288                                 root_path = optarg;
289                         break;
290                 case 'n':
291                         if(optarg != NULL){
292                                 n_virtual_interfaces = atoi(optarg);
293                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive; using 1.\n",n_virtual_interfaces);
295                                         n_virtual_interfaces = 1;
296                                 }
297                         }
298                         break;
299                 case '?':
300                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301                         fprintf(stderr,"%s\n", usage_str);
302                         exit(1);
303                 default:
304                         fprintf(stderr, "Argument was %c\n", optopt);
305                         fprintf(stderr,"Invalid arguments\n");
306                         fprintf(stderr,"%s\n", usage_str);
307                         exit(1);
308                 }
309         }
310         argc -= optind;
311         argv += optind;
312         for (int i = 0; i < argc; ++i) {
313                 if((schema_file_name == "") && !hfta_only){
314                         schema_file_name = argv[i];
315                 }else{
316                         input_file_names.push_back(argv[i]);
317                 }
318         }
319
320         if(input_file_names.size() == 0){
321                 fprintf(stderr,"%s\n", usage_str);
322                 exit(1);
323         }
324
325         if(clean_make){
326                 string clean_cmd = "rm Makefile hfta_*.cc";
327                 int clean_ret = system(clean_cmd.c_str());
328                 if(clean_ret){
329                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
330                 }
331         }
332
333
334         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
335
336 //                      Open globally used file names.
337
338         // prepend config directory to schema file
339         schema_file_name = config_dir_path + schema_file_name;
340         external_fcns_path = config_dir_path + string("external_fcns.def");
341     string ifx_fname = config_dir_path + string("ifres.xml");
342
343 //              Find interface query file(s).
344         if(hostname == ""){
345                 gethostname(tmpstr,TMPSTRLEN);
346                 hostname = tmpstr;
347         }
348         hostname_len = hostname.length();       // tmpstr is unset when -h overrides the host name
349     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
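//              e.g. with host name "myhost" (illustrative) this resolves to
//              <config_dir>/myhost.ifq, or to <config_dir>/cluster.ifq under -D.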
350         vector<string> ifq_fls;
351
352                 ifq_fls.push_back(ifq_fname);
353
354
355 //                      Get the field list, if it exists
356         string flist_fl = config_dir_path + "field_list.xml";
357         FILE *flf_in = NULL;
358         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360                 xml_leaves = new xml_t();
361                 xmlParser_setfileinput(flf_in);
362                 if(xmlParserparse()){
363                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
364                 }else{
365                         field_verifier = new field_list(xml_leaves);
366                 }
367         }
368
369         if(!hfta_only){
370           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
372                 exit(1);
373           }
374         }
375
376 /*
377         if(!(debug_only || hfta_only)){
378           if((lfta_out = fopen("lfta.c","w")) == NULL){
379                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
380                 exit(1);
381           }
382         }
383 */
384
385 //              Get the output specification file.
386 //              format is
387 //                      query, operator, operator_param, directory, bucketwidth, partitioning_flds, n_partitions
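//              An illustrative line (hypothetical query name, operator and parameters):
//                      my_query,file,out_param,./output,300,srcIP,4
//              i.e. query name, operator type (lowercased below), operator parameter,
//              output directory, bucket width, partitioning fields, and partition count.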
388         string ospec_fl = "output_spec.cfg";
389         FILE *osp_in = NULL;
390         vector<ospec_str *> output_specs;
391         multimap<string, int> qname_to_ospec;
392         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
393                 char *flds[MAXFLDS];
394                 int o_lineno = 0;
395                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
396                         o_lineno++;
397                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
398                         if(nflds == 7){
399 //              make operator type lowercase
400                                 char *tmpc;
401                                 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402                                         *tmpc = tolower(*tmpc);
403
404                                 ospec_str *tmp_ospec = new ospec_str();
405                                 tmp_ospec->query = flds[0];
406                                 tmp_ospec->operator_type = flds[1];
407                                 tmp_ospec->operator_param = flds[2];
408                                 tmp_ospec->output_directory = flds[3];
409                                 tmp_ospec->bucketwidth = atoi(flds[4]);
410                                 tmp_ospec->partitioning_flds = flds[5];
411                                 tmp_ospec->n_partitions = atoi(flds[6]);
412                                 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413                                 output_specs.push_back(tmp_ospec);
414                         }else{
415                                 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
416                         }
417                 }
418                 fclose(osp_in);
419         }else{
420                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
421                 exit(1);
422         }
423
424 //              hfta parallelism
425         string pspec_fl = "hfta_parallelism.cfg";
426         FILE *psp_in = NULL;
427         map<string, int> hfta_parallelism;
428         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
429                 char *flds[MAXFLDS];
430                 int o_lineno = 0;
431                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432                         bool good_entry = true;
433                         o_lineno++;
434                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
435                         if(nflds == 2){
436                                 string hname = flds[0];
437                                 int par = atoi(flds[1]);
438                                 if(par <= 0 || par > n_virtual_interfaces){
439                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
440                                         good_entry = false;
441                                 }
442                                 if(good_entry && n_virtual_interfaces % par != 0){
443                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
444                                         good_entry = false;
445                                 }
446                                 if(good_entry)
447                                         hfta_parallelism[hname] = par;
448                         }
449                 }
450         }else{
451                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
452         }
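//              Each hfta_parallelism.cfg line is "<hfta name>,<parallelism>", e.g.
//              (hypothetical name) "my_hfta,2" -- accepted only if the parallelism is
//              between 1 and n_virtual_interfaces and divides n_virtual_interfaces.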
453
454
455 //              LFTA hash table sizes
456         string htspec_fl = "lfta_htsize.cfg";
457         FILE *htsp_in = NULL;
458         map<string, int> lfta_htsize;
459         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
460                 char *flds[MAXFLDS];
461                 int o_lineno = 0;
462                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463                         bool good_entry = true;
464                         o_lineno++;
465                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
466                         if(nflds == 2){
467                                 string lfta_name = flds[0];
468                                 int htsz = atoi(flds[1]);
469                                 if(htsz>0){
470                                         lfta_htsize[lfta_name] = htsz;
471                                 }else{
472                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
473                                 }
474                         }
475                 }
476         }else{
477                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
478         }
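//              Each lfta_htsize.cfg line is "<lfta name>,<hash table size>", e.g.
//              (hypothetical name) "my_lfta,8192".  Sizes must be positive; like the
//              default above, they are presumably expected to be powers of 2.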
479
480 //              LFTA virtual interface hash split
481         string rtlspec_fl = "rts_load.cfg";
482         FILE *rtl_in = NULL;
483         map<string, vector<int> > rts_hload;
484         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
485                 char *flds[MAXFLDS];
486                 int r_lineno = 0;
487                 string iface_name;
488                 vector<int> hload;
489                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490                         bool good_entry = true;
491                         r_lineno++;
492                         iface_name = "";
493                         hload.clear();
494                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
495                         if(nflds >1){
496                                 iface_name = flds[0];
497                                 int cumm_h = 0;
498                                 int j;
499                                 for(j=1;j<nflds;++j){
500                                         int h = atoi(flds[j]);
501                                         if(h<=0)
502                                                 good_entry = false;
503                                         cumm_h += h;
504                                         hload.push_back(cumm_h);
505                                 }
506                         }else{
507                                 good_entry = false;
508                         }
509                         if(good_entry){
510                                 rts_hload[iface_name] = hload;
511                         }else{
512                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
513                         }
514                 }
515         }
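//              Each rts_load.cfg line is "<interface>,<w1>,<w2>,...": positive per-copy
//              weights stored above as a cumulative sum, e.g. (hypothetical interface)
//              "eth0,1,1,2" is recorded as rts_hload["eth0"] = {1,2,4}.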
516
517
518
519         if(output_query_names){
520           if((query_name_output = fopen("query_names.txt","w")) == NULL){
521                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
522                 exit(1);
523           }
524         }
525
526         if(output_schema_summary){
527           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
529                 exit(1);
530           }
531         }
532
533         if((qtree_output = fopen("qtree.xml","w")) == NULL){
534                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
535                 exit(1);
536         }
537         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539         fprintf(qtree_output,"<QueryNodes>\n");
540
541
542 //                      Get an initial Schema
543         table_list *Schema;
544         if(!hfta_only){
545 //                      Parse the table schema definitions.
546           fta_parse_result = new fta_parse_t();
547           FtaParser_setfileinput(table_schemas_in);
548           if(FtaParserparse()){
549                 fprintf(stderr,"Table schema parse failed.\n");
550                 exit(1);
551           }
552           if(fta_parse_result->parse_type != TABLE_PARSE){
553                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
554                 exit(1);
555           }
556           Schema = fta_parse_result->tables;
557
558 //                      Ensure that all schema_ids, if set, are distinct.
559 //  Obsolete? There is code elsewhere to ensure that schema IDs are
560 //  distinct on a per-interface basis.
561 /*
562           set<int> found_ids;
563           set<int> dup_ids;
564           for(int t=0;t<Schema->size();++t){
565                 int sch_id = Schema->get_table(t)->get_schema_id();
566                 if(sch_id> 0){
567                         if(found_ids.find(sch_id) != found_ids.end()){
568                                 dup_ids.insert(sch_id);
569                         }else{
570                                 found_ids.insert(sch_id);
571                         }
572                 }
573                 if(dup_ids.size()>0){
574                         fprintf(stderr, "Error, the schema has duplicate schema_ids:");
575                         for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
576                                 fprintf(stderr," %d",(*dit));
577                         fprintf(stderr,"\n");
578                         exit(1);
579                 }
580           }
581 */
582
583
584 //                      Process schema field inheritance
585           int retval;
586           retval = Schema->unroll_tables(err_str);
587           if(retval){
588                 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
589                 exit(1);
590           }
591         }else{
592 //                      hfta only => we will try to fetch schemas from the registry.
593 //                      therefore, start off with an empty schema.
594           Schema = new table_list();
595         }
596
597
598 //                      Open and parse the external functions file.
599         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
600         if(Ext_fcnsParserin == NULL){
601                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
602                 Ext_fcns = new ext_fcn_list();
603         }else{
604                 if(Ext_fcnsParserparse()){
605                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
606                         Ext_fcns = new ext_fcn_list();
607                 }
608         }
609         if(Ext_fcns->validate_fcns(err_str)){
610                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
611                 exit(1);
612         }
613
614 //              Open and parse the interface resources file.
615 //      ifq_t *ifaces_db = new ifq_t();
616 //   string ierr;
617 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
618 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
619 //                              ifx_fname.c_str(), ierr.c_str());
620 //              exit(1);
621 //      }
622 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
623 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
624 //                              ifq_fname.c_str(), ierr.c_str());
625 //              exit(1);
626 //      }
627
628
629 //                      The LFTA code string.
630 //                      Put the standard preamble here.
631 //                      NOTE: the hash macros, fcns should go into the run time
632   map<string, string> lfta_val;
633   map<string, string> lfta_prefilter_val;
634
635   string lfta_header =
636 "#include <limits.h>\n\n"
637 "#include \"rts.h\"\n"
638 "#include \"fta.h\"\n"
639 "#include \"lapp.h\"\n"
640 "#include \"rts_udaf.h\"\n\n"
641 ;
642 // Get any locally defined parsing headers
643     glob_t glob_result;
644     memset(&glob_result, 0, sizeof(glob_result));
645
646     // do the glob operation
647     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
648         if(return_value == 0){
649         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
650                         char *flds[1000];
651                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
652                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
653         }
654         }else{
655                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
656         }
657
658 /*
659 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
660 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
661 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
662 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
663 */
664
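//              The preamble appended below is emitted into every generated lfta.c:
//              the lfta_prefilter hook (set at fta_init), the hash-slot flag bits,
//              the Bloom-filter bit macros, and the per-type *_to_hash helpers
//              (presumably used by the generated group-by/aggregation code).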
665         lfta_header += 
666 "\n"
667 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
668 "\n"
669 "#define SLOT_FILLED 0x04\n"
670 "#define SLOT_GEN_BITS 0x03\n"
671 "#define SLOT_HASH_BITS 0xfffffff8\n"
672 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
673 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
674 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
675 "\n\n"
676
677 "#define lfta_BOOL_to_hash(x) (x)\n"
678 "#define lfta_USHORT_to_hash(x) (x)\n"
679 "#define lfta_UINT_to_hash(x) (x)\n"
680 "#define lfta_IP_to_hash(x) (x)\n"
681 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
682 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
683 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
684 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
685 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
686 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
687 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
688 "       for(i=0;i<x.length;++i){\n"
689 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
690 "               if((i%4) == 3){\n"
691 "                       ret ^= tmp_sum;\n"
692 "                       tmp_sum = 0;\n"
693 "               }\n"
694 "       }\n"
695 "       if((i%4)!=0) ret ^=tmp_sum;\n"
696 "       return(ret);\n"
697 "}\n\n\n";
698
699
700
701 //////////////////////////////////////////////////////////////////
702 /////                   Get all of the query parse trees
703
704
705   int i,p;
706   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
707
708 //---------------------------
709 //              Global info needed for post processing.
710
711 //                      Set of operator views ref'd in the query set.
712         opview_set opviews;
713 //                      lftas on a per-machine basis.
714         map<string, vector<stream_query *> > lfta_mach_lists;
715         int nfiles = input_file_names.size();
716         vector<stream_query *> hfta_list;               // list of hftas.
717         map<string, stream_query *> sq_map;             // map from query name to stream query.
718
719
720 //////////////////////////////////////////
721
722 //              Open and parse the interface resources file.
723         ifq_t *ifaces_db = new ifq_t();
724     string ierr;
725         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
726                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
727                                 ifx_fname.c_str(), ierr.c_str());
728                 exit(1);
729         }
730         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
731                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
732                                 ifq_fls[0].c_str(), ierr.c_str());
733                 exit(1);
734         }
735
736   map<string, string> qname_to_flname;  // for detecting duplicate query names
737
738
739
740 //                      Parse the files to create a vector of parse trees.
741 //                      Load qnodes with information to perform a topo sort
742 //                      based on query dependencies.
743   vector<query_node *> qnodes;                          // for topo sort.
744   map<string,int> name_node_map;                        // map query name to qnodes entry
745   for(i=0;i<input_file_names.size();i++){
746
747           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
748                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
749                   continue;
750           }
751 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
752
753 //                      Parse the FTA query
754           fta_parse_result = new fta_parse_t();
755           FtaParser_setfileinput(fta_in);
756           if(FtaParserparse()){
757                 fprintf(stderr,"FTA parse failed.\n");
758                 exit(1);
759           }
760           if(fta_parse_result->parse_type != QUERY_PARSE){
761                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
762                 exit(1);
763           }
764
765 //                      returns a list of parse trees
766           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
767           for(p=0;p<qlist.size();++p){
768             table_exp_t *fta_parse_tree = qlist[p];
769 //              query_parse_trees.push_back(fta_parse_tree);
770
771 //                      compute the default name -- extract from query name
772                 strcpy(tmpstr,input_file_names[i].c_str());
773                 char *qname = strrchr(tmpstr,PATH_DELIM);
774                 if(qname == NULL)
775                         qname = tmpstr;
776                 else
777                         qname++;
778                 char *qname_end = strchr(qname,'.');
779                 if(qname_end != NULL) *qname_end = '\0';
780                 string qname_str = qname;
781                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
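//                      e.g. (hypothetical file name) for "cfg/tcp_flows.gsql" the default
//                      name is "tcp_flows"; impute_query_name presumably replaces it with
//                      any name declared in the query text itself.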
782
783 //                      Determine visibility.  Should I be attaching all of the output methods?
784                 if(qname_to_ospec.count(imputed_qname)>0)
785                         fta_parse_tree->set_visible(true);
786                 else
787                         fta_parse_tree->set_visible(false);
788
789
790 //                              Create a manipulable representation of the parse tree.
791 //                              the qnode inherits the visibility assigned to the parse tree.
792             int pos = qnodes.size();
793                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
794                 name_node_map[ qnodes[pos]->name ] = pos;
795 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
796 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
797 //              qfiles.push_back(i);
798
799 //                      Check for duplicate query names
800 //                                      NOTE : in hfta-only generation, I should
801 //                                      also check with the names of the registered queries.
802                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
803                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
804                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
805                         exit(1);
806                 }
807                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
808                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
809                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
810                         exit(1);
811                 }
812                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
813
814
815         }
816   }
817
818 //              Add the library queries
819
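//              A referenced table whose name contains a path separator and is not defined
//              in the regular query set is treated as a library query: e.g. (hypothetical)
//              a reference to "lib/flows" is loaded from <library_path>lib/flows.gsql and
//              registered under the name "lib/flows", along with any library queries it
//              in turn depends on.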
820   int pos;
821   for(pos=0;pos<qnodes.size();++pos){
822         int fi;
823         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
824                 string src_tbl = qnodes[pos]->refd_tbls[fi];
825                 if(qname_to_flname.count(src_tbl) == 0){
826                         int last_sep = src_tbl.find_last_of('/');
827                         if(last_sep != string::npos){
828 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
829                                 string target_qname = src_tbl.substr(last_sep+1);
830                                 string qpathname = library_path + src_tbl + ".gsql";
831                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
832                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
833                                         exit(1);
834                                         fprintf(stderr,"After exit\n");
835                                 }
836 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
837 //                      Parse the FTA query
838                                 fta_parse_result = new fta_parse_t();
839                                 FtaParser_setfileinput(fta_in);
840                                 if(FtaParserparse()){
841                                         fprintf(stderr,"FTA parse failed.\n");
842                                         exit(1);
843                                 }
844                                 if(fta_parse_result->parse_type != QUERY_PARSE){
845                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",qpathname.c_str());
846                                         exit(1);
847                                 }
848
849                                 map<string, int> local_query_map;
850                                 vector<string> local_query_names;
851                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
852                                 for(p=0;p<qlist.size();++p){
853                                 table_exp_t *fta_parse_tree = qlist[p];
854                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
855                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
856                                         if(imputed_qname == target_qname)
857                                                 imputed_qname = src_tbl;
858                                         if(local_query_map.count(imputed_qname)>0){
859                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
860                                                 exit(1);
861                                         }
862                                         local_query_map[ imputed_qname ] = p;
863                                         local_query_names.push_back(imputed_qname);
864                                 }
865
866                                 if(local_query_map.count(src_tbl)==0){
867                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
868                                         exit(1);
869                                 }
870
871                                 vector<int> worklist;
872                                 set<int> added_queries;
873                                 vector<query_node *> new_qnodes;
874                                 worklist.push_back(local_query_map[target_qname]);
875                                 added_queries.insert(local_query_map[target_qname]);
876                                 int qq;
877                                 int qpos = qnodes.size();
878                                 for(qq=0;qq<worklist.size();++qq){
879                                         int q_id = worklist[qq];
880                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
881                                         new_qnodes.push_back( new_qnode);
882                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
883                                         int ff;
884                                         for(ff = 0;ff<refd_tbls.size();++ff){
885                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
886
887                                                         if(name_node_map.count(refd_tbls[ff])>0){
888                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
889                                                                 exit(1);
890                                                         }else{
891                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
892                                                         }
893                                                 }
894                                         }
895                                 }
896
897                                 for(qq=0;qq<new_qnodes.size();++qq){
898                                         int qpos = qnodes.size();
899                                         qnodes.push_back(new_qnodes[qq]);
900                                         name_node_map[qnodes[qpos]->name ] = qpos;
901                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
902                                 }
903                         }
904                 }
905         }
906   }
907
908
909
910
911
912
913
914
915 //---------------------------------------
916
917
918 //              Add the UDOPS.
919
920   string udop_missing_sources;
921   for(i=0;i<qnodes.size();++i){
922         int fi;
923         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
924                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
925                 if(sid >= 0){
926                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
927                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
928                                 int pos = qnodes.size();
929                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
930                                         name_node_map[ qnodes[pos]->name ] = pos;
931                                         qnodes[pos]->is_externally_visible = false;   // not externally visible
932         //                                      Need to mark the source queries as visible.
933                                         int si;
934                                         string missing_sources = "";
935                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
936                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
937                                                 if(name_node_map.count(src_tbl)==0){
938                                                         missing_sources += src_tbl + " ";
939                                                 }
940                                         }
941                                         if(missing_sources != ""){
942                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
943                                         }
944                                 }
945                         }
946                 }
947         }
948   }
949   if(udop_missing_sources != ""){
950         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
951         exit(1);
952   }
953
954
955
956 ////////////////////////////////////////////////////////////////////
957 ///                             Check parse trees to verify that some
958 ///                             global properties are met :
959 ///                             if q1 reads from q2, then
960 ///                               q2 is processed before q1
961 ///                               q1 can supply q2's parameters
962 ///                             Verify there is no cycle in the reads-from graph.
963
964 //                      Compute an order in which to process the
965 //                      queries.
966
967 //                      Start by building the reads-from lists.
968 //
969
970   for(i=0;i<qnodes.size();++i){
971         int qi, fi;
972         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
973         for(fi = 0;fi<refd_tbls.size();++fi){
974                 if(name_node_map.count(refd_tbls[fi])>0){
975 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
976                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
977                 }
978         }
979   }
980
981
982 //              If one query reads the result of another,
983 //              check for parameter compatibility.  Currently it must
984 //              be an exact match.  I will move to requiring
985 //              containment after re-ordering, but will require
986 //              some analysis for code generation which is not
987 //              yet in place.
988 //printf("There are %d query nodes.\n",qnodes.size());
989
990
991   for(i=0;i<qnodes.size();++i){
992         vector<var_pair_t *> target_params  = qnodes[i]->params;
993         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
994                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
995                 if(target_params.size() != source_params.size()){
996                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
997                         exit(1);
998                 }
999                 int p;
1000                 for(p=0;p<target_params.size();++p){
1001                         if(! (target_params[p]->name == source_params[p]->name &&
1002                               target_params[p]->val == source_params[p]->val ) ){
1003                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1004                                 exit(1);
1005                         }
1006                 }
1007         }
1008   }
1009
1010
1011 //              Find the roots.
1012 //              Start by counting inedges.
1013   for(i=0;i<qnodes.size();++i){
1014         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1015                 qnodes[(*si)]->n_consumers++;
1016         }
1017   }
1018
1019 //              The roots are the nodes with indegree zero.
1020   set<int> roots;
1021   for(i=0;i<qnodes.size();++i){
1022         if(qnodes[i]->n_consumers == 0){
1023                 if(qnodes[i]->is_externally_visible == false){
1024                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1025                 }
1026                 roots.insert(i);
1027         }
1028   }
1029
1030 //              Remove the parts of the subtree that produce no output.
1031   set<int> valid_roots;
1032   set<int> discarded_nodes;
1033   set<int> candidates;
1034   while(roots.size() >0){
1035         for(si=roots.begin();si!=roots.end();++si){
1036                 if(qnodes[(*si)]->is_externally_visible){
1037                         valid_roots.insert((*si));
1038                 }else{
1039                         discarded_nodes.insert((*si));
1040                         set<int>::iterator sir;
1041                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1042                                 qnodes[(*sir)]->n_consumers--;
1043                                 if(qnodes[(*sir)]->n_consumers == 0)
1044                                         candidates.insert( (*sir));
1045                         }
1046                 }
1047         }
1048         roots = candidates;
1049         candidates.clear();
1050   }
1051   roots = valid_roots;
1052   if(discarded_nodes.size()>0){
1053         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1054         int di = 0;
1055         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1056                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1057                 di++;
1058                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1059         }
1060         fprintf(stderr,"\n");
1061   }
1062
1063 //              Compute the sources_to set, ignoring discarded nodes.
1064   for(i=0;i<qnodes.size();++i){
1065         if(discarded_nodes.count(i)==0)
1066                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1067                         qnodes[(*si)]->sources_to.insert(i);
1068         }
1069   }
1070
1071
1072 //              Find the nodes that are shared by multiple visible subtrees.
1073 //              The roots become inferred visible nodes.
1074
1075 //              Find the visible nodes.
1076         vector<int> visible_nodes;
1077         for(i=0;i<qnodes.size();i++){
1078                 if(qnodes[i]->is_externally_visible){
1079                         visible_nodes.push_back(i);
1080                 }
1081         }
1082
1083 //              Find UDOPs referenced by visible nodes.
1084   list<int> workq;
1085   for(i=0;i<visible_nodes.size();++i){
1086         workq.push_back(visible_nodes[i]);
1087   }
1088   while(!workq.empty()){
1089         int node = workq.front();
1090         workq.pop_front();
1091         set<int>::iterator children;
1092         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1093                 qnodes[node]->is_externally_visible = true;
1094                 visible_nodes.push_back(node);
1095                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1096                         if(qnodes[(*children)]->is_externally_visible == false){
1097                                 qnodes[(*children)]->is_externally_visible = true;
1098                                 visible_nodes.push_back((*children));
1099                         }
1100                 }
1101         }
1102         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1103                 workq.push_back((*children));
1104         }
1105   }
1106
1107         bool done = false;
1108         while(!done){
1109 //      reset the nodes
1110                 for(i=0;i<qnodes.size();i++){
1111                         qnodes[i]->subtree_roots.clear();
1112                 }
1113
1114 //              Walk the tree defined by a visible node, not descending into
1115 //              subtrees rooted by a visible node.  Mark the node visited with
1116 //              the visible node ID.
1117                 for(i=0;i<visible_nodes.size();++i){
1118                         set<int> vroots;
1119                         vroots.insert(visible_nodes[i]);
1120                         while(vroots.size()>0){
1121                                 for(si=vroots.begin();si!=vroots.end();++si){
1122                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1123
1124                                         set<int>::iterator sir;
1125                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1126                                                 if(! qnodes[(*sir)]->is_externally_visible){
1127                                                         candidates.insert( (*sir));
1128                                                 }
1129                                         }
1130                                 }
1131                                 vroots = candidates;
1132                                 candidates.clear();
1133                         }
1134                 }
1135 //              Find the nodes in multiple visible node subtrees, but with no parent
1136 //              that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
1137                 done = true;    // until proven otherwise
1138                 for(i=0;i<qnodes.size();i++){
1139                         if(qnodes[i]->subtree_roots.size()>1){
1140                                 bool is_new_root = true;
1141                                 set<int>::iterator sir;
1142                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1143                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1144                                                 is_new_root = false;
1145                                 }
1146                                 if(is_new_root){
1147                                         qnodes[i]->is_externally_visible = true;
1148                                         qnodes[i]->inferred_visible_node = true;
1149                                         visible_nodes.push_back(i);
1150                                         done = false;
1151                                 }
1152                         }
1153                 }
1154         }
1155
1156
1157
1158
1159
1160 //              get visible nodes in topo ordering.
1161 //  for(i=0;i<qnodes.size();i++){
1162 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1163 //  }
1164   vector<int> process_order;
1165   while(roots.size() >0){
1166         for(si=roots.begin();si!=roots.end();++si){
1167                 if(discarded_nodes.count((*si))==0){
1168                         process_order.push_back( (*si) );
1169                 }
1170                 set<int>::iterator sir;
1171                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1172                         qnodes[(*sir)]->n_consumers--;
1173                         if(qnodes[(*sir)]->n_consumers == 0)
1174                                 candidates.insert( (*sir));
1175                 }
1176         }
1177         roots = candidates;
1178         candidates.clear();
1179   }
1180
1181
1182 //printf("process_order.size() =%d\n",process_order.size());
1183
1184 //              Search for cyclic dependencies
1185   string found_dep;
1186   for(i=0;i<qnodes.size();++i){
1187         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1188                 if(found_dep.size() != 0) found_dep += ", ";
1189                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1190         }
1191   }
1192   if(found_dep.size()>0){
1193         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1194         exit(1);
1195   }
1196
1197 //              Get a list of query sets, in the order to be processed.
1198 //              Start at visible root and do bfs.
1199 //              The query set includes queries referenced indirectly,
1200 //              as sources for user-defined operators.  These are needed
1201 //              to ensure that they are added to the schema, but are not part
1202 //              of the query tree.
1203
1204 //              stream_node_sets contains queries reachable only through the
1205 //              FROM clause, so I can tell which queries to add to the stream
1206 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1207
1208 //                      NOTE: this code works because, in order for data to be
1209 //                      read by multiple hftas, the node must be externally visible.
1210 //                      But visible nodes define the roots of process sets, and
1211 //                      internally visible nodes can feed data only
1212 //                      to other nodes in the same query file.
1213 //                      Therefore, any access can be restricted to a single file, and
1214 //                      hfta output sharing is done only on roots,
1215 //                      never on interior nodes.
1216
1217
1218
1219
1220 //              Compute the base collection of hftas.
1221   vector<hfta_node *> hfta_sets;
1222   map<string, int> hfta_name_map;
1223 //  vector< vector<int> > process_sets;
1224 //  vector< set<int> > stream_node_sets;
1225   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.
1226                                                                                                                 // i.e. process leaves 1st.
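//              Each externally visible query node becomes the root of an hfta; its query
//              set is the closure of non-visible nodes reachable from it via reads_from.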
1227   for(i=0;i<process_order.size();++i){
1228         if(qnodes[process_order[i]]->is_externally_visible == true){
1229 //printf("Visible.\n");
1230                 int root = process_order[i];
1231                 hfta_node *hnode = new hfta_node();
1232                 hnode->name = qnodes[root]-> name;
1233                 hnode->source_name = qnodes[root]-> name;
1234                 hnode->is_udop = qnodes[root]->is_udop;
1235                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1236
1237                 vector<int> proc_list;  proc_list.push_back(root);
1238 //                      Ensure that nodes are added only once.
1239                 set<int> proc_set;      proc_set.insert(root);
1240                 roots.clear();                  roots.insert(root);
1241                 candidates.clear();
1242                 while(roots.size()>0){
1243                         for(si=roots.begin();si!=roots.end();++si){
1244 //printf("Processing root %d\n",(*si));
1245                                 set<int>::iterator sir;
1246                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1247 //printf("reads fom %d\n",(*sir));
1248                                         if(qnodes[(*sir)]->is_externally_visible==false){
1249                                                 candidates.insert( (*sir) );
1250                                                 if(proc_set.count( (*sir) )==0){
1251                                                         proc_set.insert( (*sir) );
1252                                                         proc_list.push_back( (*sir) );
1253                                                 }
1254                                         }
1255                                 }
1256                         }
1257                         roots = candidates;
1258                         candidates.clear();
1259                 }
1260
1261                 reverse(proc_list.begin(), proc_list.end());
1262                 hnode->query_node_indices = proc_list;
1263                 hfta_name_map[hnode->name] = hfta_sets.size();
1264                 hfta_sets.push_back(hnode);
1265         }
1266   }
1267
1268 //              Compute the reads_from / sources_to graphs for the hftas.
1269
1270   for(i=0;i<hfta_sets.size();++i){
1271         hfta_node *hnode = hfta_sets[i];
1272         for(q=0;q<hnode->query_node_indices.size();q++){
1273                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1274                 for(s=0;s<qnode->refd_tbls.size();++s){
1275                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1276                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1277                                 hnode->reads_from.insert(other_hfta);
1278                                 hfta_sets[other_hfta]->sources_to.insert(i);
1279                         }
1280                 }
1281         }
1282   }
1283
1284 //              Compute a topological sort of the hfta_sets.
1285
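//              Kahn-style traversal: seed the work queue with hftas that have no consumers
//              (sources_to is empty), then release each source hfta once all of its
//              consumers have been emitted, so the result lists consumers before sources.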
1286   vector<int> hfta_topsort;
1287   workq.clear();
1288   int hnode_srcs[hfta_sets.size()];
1289   for(i=0;i<hfta_sets.size();++i){
1290         hnode_srcs[i] = 0;
1291         if(hfta_sets[i]->sources_to.size() == 0)
1292                 workq.push_back(i);
1293   }
1294
1295   while(! workq.empty()){
1296         int     node = workq.front();
1297         workq.pop_front();
1298         hfta_topsort.push_back(node);
1299         set<int>::iterator stsi;
1300         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1301                 int parent = (*stsi);
1302                 hnode_srcs[parent]++;
1303                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1304                         workq.push_back(parent);
1305                 }
1306         }
1307   }
1308
1309 //              Decorate hfta nodes with the level of parallelism given as input.
1310
1311   map<string, int>::iterator msii;
1312   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1313         string hfta_name = (*msii).first;
1314         int par = (*msii).second;
1315         if(hfta_name_map.count(hfta_name) > 0){
1316                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1317         }else{
1318                 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but it's not an hfta.\n",hfta_name.c_str());
1319         }
1320   }
1321
1322 //              Propagate levels of parallelism: a child should have a level of parallelism
1323 //              at least as large as that of any of its parents.  Adjust children upwards to compensate.
1324 //              Start at the parents and adjust the children; propagation downward happens automatically.
1325
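//              For example, if an hfta with n_parallel = 4 reads from a source hfta
//              with n_parallel = 1, the source is bumped up to n_parallel = 4 as well.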
1326   for(i=hfta_sets.size()-1;i>=0;i--){
1327         set<int>::iterator stsi;
1328         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1329                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1330                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1331                 }
1332         }
1333   }
1334
1335 //              Before all the name mangling, check if there are any output_spec.cfg
1336 //              or hfta_parallelism.cfg entries that do not have a matching query.
1337
1338         string dangling_ospecs = "";
1339         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1340                 string oq = (*msii).first;
1341                 if(hfta_name_map.count(oq) == 0){
1342                         dangling_ospecs += " "+(*msii).first;
1343                 }
1344         }
1345         if(dangling_ospecs!=""){
1346                 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1347         }
1348
1349         string dangling_par = "";
1350         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1351                 string oq = (*msii).first;
1352                 if(hfta_name_map.count(oq) == 0){
1353                         dangling_par += " "+(*msii).first;
1354                 }
1355         }
1356         if(dangling_par!=""){
1357                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1358         }
1359
1360
1361
1362 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1363 //              FROM clauses: retarget any name which is an internal node, and
1364 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes
1365 //              when the source hfta has more parallelism than the target node.
1366 //              Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1367
1368
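//              For example, an hfta "foo" with n_parallel = 2 is replaced by "foo__copy0"
//              and "foo__copy1"; the query nodes inside each copy get the same __copyX
//              suffix, and their FROM clauses are retargeted to the mangled names.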
1369   int n_original_hfta_sets = hfta_sets.size();
1370   for(i=0;i<n_original_hfta_sets;++i){
1371         if(hfta_sets[i]->n_parallel > 1){
1372                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1373                 set<string> local_nodes;                // names of query nodes in the hfta.
1374                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1375                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1376                 }
1377
1378                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1379                         string mangler = "__copy"+int_to_string(p);
1380                         hfta_node *par_hfta  = new hfta_node();
1381                         par_hfta->name = hfta_sets[i]->name + mangler;
1382                         par_hfta->source_name = hfta_sets[i]->name;
1383                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1384                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1385                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1386                         par_hfta->parallel_idx = p;
1387
1388                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1389
1390 //      Is it a UDOP?
1391                         if(hfta_sets[i]->is_udop){
1392                                 int root = hfta_sets[i]->query_node_indices[0];
1393
1394                                 string unequal_par_sources;
1395                                 set<int>::iterator rfsii;
1396                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1397                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1398                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1399                                         }
1400                                 }
1401                                 if(unequal_par_sources != ""){
1402                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1403                                         exit(1);
1404                                 }
1405
1406                                 int rti;
1407                                 vector<string> new_sources;
1408                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1409                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1410                                 }
1411
1412                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1413                                 new_qn->name += mangler;
1414                                 new_qn->mangler = mangler;
1415                                 new_qn->refd_tbls = new_sources;
1416                                 par_hfta->query_node_indices.push_back(qnodes.size());
1417                                 par_qnode_map[new_qn->name] = qnodes.size();
1418                                 name_node_map[ new_qn->name ] = qnodes.size();
1419                                 qnodes.push_back(new_qn);
1420                         }else{
1421 //              regular query node
1422                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1423                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1424                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1425 //                                      rehome the from clause on mangled names.
1426 //                                      create merge nodes as needed for external sources.
1427                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1428                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1429                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1430                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1431 //                      References an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1432                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1433                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1434                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1435                                                 }else{
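//                                                      The source hfta has more copies than this one, so each copy of
//                                                      this hfta merges a contiguous block of "stride" source copies:
//                                                      copy p reads source copies p*stride .. (p+1)*stride - 1.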
1436                                                         vector<string> src_tbls;
1437                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1438                                                         if(stride == 0){
1439                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1440                                                                 exit(1);
1441                                                         }
1442                                                         for(s=0;s<stride;++s){
1443                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1444                                                                 src_tbls.push_back(ext_src_name);
1445                                                         }
1446                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1447                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1448                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1449 //                                      Make a qnode to represent the new merge node
1450                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1451                                                         qn_pt->refd_tbls = src_tbls;
1452                                                         qn_pt->is_udop  = false;
1453                                                         qn_pt->is_externally_visible = false;
1454                                                         qn_pt->inferred_visible_node  = false;
1455                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1456                                                         par_qnode_map[merge_node_name] = qnodes.size();
1457                                                         name_node_map[ merge_node_name ] = qnodes.size();
1458                                                         qnodes.push_back(qn_pt);
1459                                                 }
1460                                         }
1461                                 }
1462                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1463                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1464                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1465                                 }
1466                                 new_qn->params = qnodes[hqn_idx]->params;
1467                                 new_qn->is_udop = false;
1468                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1469                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1470                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1471                                 par_qnode_map[new_qn->name] = qnodes.size();
1472                                 name_node_map[ new_qn->name ] = qnodes.size();
1473                                 qnodes.push_back(new_qn);
1474                           }
1475                         }
1476                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1477                         hfta_sets.push_back(par_hfta);
1478                 }
1479         }else{
1480 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1481 //              hfta sources.
1482                 if(!hfta_sets[i]->is_udop){
1483                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1484                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1485                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1486                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1487 //                      References an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1488                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1489                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1490                                                 vector<string> src_tbls;
1491                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1492                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1493                                                         src_tbls.push_back(ext_src_name);
1494                                                 }
1495                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1496                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1497                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1498 //                                      Make a qnode to represent the new merge node
1499                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1500                                                 qn_pt->refd_tbls = src_tbls;
1501                                                 qn_pt->is_udop  = false;
1502                                                 qn_pt->is_externally_visible = false;
1503                                                 qn_pt->inferred_visible_node  = false;
1504                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1505                                                 name_node_map[ merge_node_name ] = qnodes.size();
1506                                                 qnodes.push_back(qn_pt);
1507                                         }
1508                                 }
1509                         }
1510                 }
1511           }
1512         }
1513   }
1514
1515 //                      Rebuild the reads_from / sources_to lists in the qnodes
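//                      (replication and merge-node insertion changed the dependency graph,
//                      so it is rebuilt here from each node's refd_tbls list)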
1516   for(q=0;q<qnodes.size();++q){
1517         qnodes[q]->reads_from.clear();
1518         qnodes[q]->sources_to.clear();
1519   }
1520   for(q=0;q<qnodes.size();++q){
1521         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1522                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1523                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1524                         qnodes[q]->reads_from.insert(rf);
1525                         qnodes[rf]->sources_to.insert(q);
1526                 }
1527         }
1528   }
1529
1530 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1531   for(q=0;q<hfta_sets.size();++q){
1532         hfta_sets[q]->reads_from.clear();
1533         hfta_sets[q]->sources_to.clear();
1534   }
1535   for(q=0;q<hfta_sets.size();++q){
1536         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1537                 int node = hfta_sets[q]->query_node_indices[s];
1538                 set<int>::iterator rfsii;
1539                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1540                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1541                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1542                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1543                         }
1544                 }
1545         }
1546   }
1547
1548 /*
1549 for(q=0;q<qnodes.size();++q){
1550  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1551  set<int>::iterator rsii;
1552  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1553   printf(" %d",(*rsii));
1554   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1555  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1556   printf(" %d",(*rsii));
1557  printf("\n");
1558 }
1559
1560 for(q=0;q<hfta_sets.size();++q){
1561  if(hfta_sets[q]->do_generation==false)
1562         continue;
1563  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1564  set<int>::iterator rsii;
1565  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1566   printf(" %d",(*rsii));
1567   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1568  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1569   printf(" %d",(*rsii));
1570  printf("\n");
1571 }
1572 */
1573
1574
1575
1576 //              Re-topo sort the hftas
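//              (replication added new hftas and invalidated the earlier sort)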
1577   hfta_topsort.clear();
1578   workq.clear();
1579   int hnode_srcs_2[hfta_sets.size()];
1580   for(i=0;i<hfta_sets.size();++i){
1581         hnode_srcs_2[i] = 0;
1582         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1583                 workq.push_back(i);
1584         }
1585   }
1586
1587   while(workq.empty() == false){
1588         int     node = workq.front();
1589         workq.pop_front();
1590         hfta_topsort.push_back(node);
1591         set<int>::iterator stsii;
1592         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1593                 int child = (*stsii);
1594                 hnode_srcs_2[child]++;
1595                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1596                         workq.push_back(child);
1597                 }
1598         }
1599   }
1600
1601 //              Ensure that all of the query_node_indices in hfta_sets are topologically
1602 //              sorted; don't rely on the assumption that all transforms maintain some kind of order.
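//              Only reads_from edges internal to the hfta are counted; sources outside the
//              hfta are treated as already satisfied, so nodes that read only external
//              streams seed the work queue.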
1603   for(i=0;i<hfta_sets.size();++i){
1604         if(hfta_sets[i]->do_generation){
1605                 map<int,int> n_accounted;
1606                 vector<int> new_order;
1607                 workq.clear();
1608                 vector<int>::iterator vii;
1609                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1610                         n_accounted[(*vii)]= 0;
1611                 }
1612                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1613                         set<int>::iterator rfsii;
1614                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1615                                 if(n_accounted.count((*rfsii)) == 0){
1616                                         n_accounted[(*vii)]++;
1617                                 }
1618                         }
1619                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1620                                 workq.push_back((*vii));
1621                         }
1622                 }
1623
1624                 while(workq.empty() == false){
1625                         int node = workq.front();
1626                         workq.pop_front();
1627                         new_order.push_back(node);
1628                         set<int>::iterator stsii;
1629                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1630                                 if(n_accounted.count((*stsii))){
1631                                         n_accounted[(*stsii)]++;
1632                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1633                                                 workq.push_back((*stsii));
1634                                         }
1635                                 }
1636                         }
1637                 }
1638                 hfta_sets[i]->query_node_indices = new_order;
1639         }
1640   }
1641
1642
1643
1644
1645
1646 ///                     Global checking is done; start the analysis and translation
1647 ///                     of the query parse trees in the order specified by process_order.
1648
1649
1650 //                      Get a list of the LFTAs for global lfta optimization
1651 //                              TODO: separate building operators from splitting lftas,
1652 //                                      that will make optimizations such as predicate pushing easier.
1653         vector<stream_query *> lfta_list;
1654         stream_query *rootq;
1655     int qi,qj;
1656
1657         map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1658
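//              Process the hftas leaf-first: hfta_topsort lists consumers before their
//              sources, so iterate over it in reverse.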
1659         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1660
1661         int hfta_id = hfta_topsort[qi];
1662     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1663
1664
1665
1666 //              Two possibilities: either it's a UDOP, or it's a collection of queries.
1667 //      if(qnodes[curr_list.back()]->is_udop)
1668         if(hfta_sets[hfta_id]->is_udop){
1669                 int node_id = curr_list.back();
1670                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1671                 opview_entry *opv = new opview_entry();
1672
1673 //                      Many of the UDOP properties aren't currently used.
1674                 opv->parent_qname = "no_parent";
1675                 opv->root_name = qnodes[node_id]->name;
1676                 opv->view_name = qnodes[node_id]->file;
1677                 opv->pos = qi;
1678                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1679                 opv->udop_alias = tmpstr;
1680                 opv->mangler = qnodes[node_id]->mangler;
1681
1682                 if(opv->mangler != ""){
1683                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1684                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1685                 }
1686
1687 //                      This piece of code makes each hfta which refers to the same udop
1688 //                      reference a distinct running udop.  Do this at query optimization time?
1689 //              fmtbl->set_udop_alias(opv->udop_alias);
1690
1691                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1692                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1693
1694                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1695                 int s,f,q;
1696                 for(s=0;s<subq.size();++s){
1697 //                              Validate that the fields match.
1698                         subquery_spec *sqs = subq[s];
1699                         string subq_name = sqs->name + opv->mangler;
1700                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1701                         if(flds.size() == 0){
1702                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1703                                 return(1);
1704                         }
1705                         if(flds.size() < sqs->types.size()){
1706                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1707                                 return(1);
1708                         }
1709                         bool failed = false;
1710                         for(f=0;f<sqs->types.size();++f){
1711                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1712                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1713                                 if(! dte.subsumes_type(&dtf) ){
1714                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1715                                         failed = true;
1716                                 }
1717 /*
1718                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1719                                         string pstr = dte.get_temporal_string();
1720                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1721                                         failed = true;
1722                                 }
1723 */
1724                         }
1725                         if(failed)
1726                                 return(1);
1727 ///                             Validation done, find the subquery, make a copy of the
1728 ///                             parse tree, and add it to the return list.
1729                         for(q=0;q<qnodes.size();++q)
1730                                 if(qnodes[q]->name == subq_name)
1731                                         break;
1732                         if(q==qnodes.size()){
1733                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1734                                 return(1);
1735                         }
1736
1737                 }
1738
1739 //                      Cross-link to from entry(s) in all sourced-to tables.
1740                 set<int>::iterator sii;
1741                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1742 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1743                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1744                         int ii;
1745                         for(ii=0;ii<tblvars.size();++ii){
1746                                 if(tblvars[ii]->schema_name == opv->root_name){
1747                                         tblvars[ii]->set_opview_idx(opviews.size());
1748                                 }
1749
1750                         }
1751                 }
1752
1753                 opviews.append(opv);
1754         }else{
1755
1756 //                      Analyze the parse trees in this query,
1757 //                      put them in rootq
1758 //      vector<int> curr_list = process_sets[qi];
1759
1760
1761 ////////////////////////////////////////
1762
1763           rootq = NULL;
1764 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1765           for(qj=0;qj<curr_list.size();++qj){
1766                 i = curr_list[qj];
1767         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1768
1769 //                      Select the current query parse tree
1770                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1771
1772 //                      if hfta only, try to fetch any missing schemas
1773 //                      from the registry (using the print_schema program).
1774 //                      Here I use a hack to avoid analyzing the query -- all referenced
1775 //                      tables must be in the from clause
1776 //                      If there is a problem loading any table, just issue a warning.
1777 //
1778                 tablevar_list_t *fm = fta_parse_tree->get_from();
1779                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1780 //                      iterate over all referenced tables
1781                 int t;
1782                 for(t=0;t<refd_tbls.size();++t){
1783                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1784
1785                   if(tbl_ref < 0){      // if this table is not in the Schema
1786
1787                         if(hfta_only){
1788                                 string cmd="print_schema "+refd_tbls[t];
1789                                 FILE *schema_in = popen(cmd.c_str(), "r");
1790                                 if(schema_in == NULL){
1791                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1792                                 }else{
1793                                   string schema_instr;
1794                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1795                                         schema_instr += tmpstr;
1796                                   }
1797                           fta_parse_result = new fta_parse_t();
1798                                   strncpy(tmp_schema_str,schema_instr.c_str(),sizeof(tmp_schema_str)-1);   // avoid overrunning the fixed-size buffer
                                  tmp_schema_str[sizeof(tmp_schema_str)-1] = '\0';
1799                                   FtaParser_setstringinput(tmp_schema_str);
1800                           if(FtaParserparse()){
1801                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1802                           }else{
1803                                         if( fta_parse_result->tables != NULL){
1804                                                 int tl;
1805                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1806                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1807                                                 }
1808                                         }else{
1809                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1810                                         }
1811                                 }
1812                         }
1813                   }else{
1814                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1815                                 exit(1);
1816                   }
1817
1818                 }
1819           }
1820
1821
1822 //                              Analyze the query.
1823           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1824           if(qs == NULL){
1825                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1826                 exit(1);
1827           }
1828
1829           stream_query new_sq(qs, Schema);
1830           if(new_sq.error_code){
1831                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1832                         exit(1);
1833           }
1834
1835 //                      Add it to the Schema
1836           table_def *output_td = new_sq.get_output_tabledef();
1837           Schema->add_table(output_td);
1838
1839 //                      Create a query plan from the analyzed parse tree.
1840 //                      If it's a query referenced via FROM, add it to the stream query.
1841           if(rootq){
1842                 rootq->add_query(new_sq);
1843           }else{
1844                 rootq = new stream_query(new_sq);
1845 //                      have the stream query object inherit properties from the analyzed
1846 //                      hfta_node object.
1847                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1848                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1849           }
1850
1851
1852     }
1853
1854 //              This stream query has all its parts
1855 //              Build and optimize it.
1856 //printf("translate_fta: generating plan.\n");
1857         if(rootq->generate_plan(Schema)){
1858                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1859                 continue;
1860         }
1861
1862 //      We've found the query plan head, so now add the output operators
1863         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1864                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1865                 multimap<string, int>::iterator mmsi;
1866                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1867                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1868                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1869                 }
1870         }
1871
1872
1873
1874 //                              Perform query splitting if necessary.
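//                              split_query returns one stream_query per lfta component, plus
//                              (if hfta_returned is set) a final component holding the hfta part.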
1875         bool hfta_returned;
1876     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1877
1878         int l;
1879 //for(l=0;l<split_queries.size();++l){
1880 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1881 //}
1882
1883
1884
1885
1886     if(split_queries.size() > 0){       // should be at least one component.
1887
1888 //                              Compute the number of LFTAs.
1889           int n_lfta = split_queries.size();
1890           if(hfta_returned) n_lfta--;
1891 //                              Check if a schemaId constraint needs to be inserted.
1892
1893 //                              Process the LFTA components.
1894           for(l=0;l<n_lfta;++l){
1895            if(lfta_names.count(split_queries[l]->query_name) == 0){
1896 //                              Grab the lfta for global optimization.
1897                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1898                 string liface = tvec[0]->get_interface();       // iface queries have been resolved
1899                 string lmach = tvec[0]->get_machine();
1900
1901                 vector<predicate_t *> schemaid_preds;
1902                 for(int irv=0;irv<tvec.size();++irv){
1903
1904                         string schema_name = tvec[irv]->get_schema_name();
1905                         string rvar_name = tvec[irv]->get_var_name();
1906                         int schema_ref = tvec[irv]->get_schema_ref();
1907                         if (lmach == "")
1908                                 lmach = hostname;
1909                         interface_names.push_back(liface);
1910                         machine_names.push_back(lmach);
1911 //printf("Machine is %s\n",lmach.c_str());
1912
1913 //                              Check if a schemaId constraint needs to be inserted.
1914                         if(schema_ref<0){ // can result from some kinds of splits
1915                                 schema_ref = Schema->get_table_ref(schema_name);
1916                         }
1917                         int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
1918                         int errnum = 0;
1919                         string if_error;
1920                         iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1921                         if(iface==NULL){
1922                                 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1923                                 exit(1);
1924                         }       
1925                         if(iface->has_multiple_schemas()){
1926                                 if(schema_id<0){        // invalid schema_id
1927                                         fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1928                                         exit(1);
1929                                 }
1930                                 vector<string> iface_schemas = iface->get_property("Schemas");
1931                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1932                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1933                                         exit(1);
1934                                 }
1935 // Ensure that in liface, schema_id is used for only one schema
1936                                 if(schema_of_schemaid.count(liface)==0){
1937                                         map<int, string> empty_map;
1938                                         schema_of_schemaid[liface] = empty_map;
1939                                 }
1940                                 if(schema_of_schemaid[liface].count(schema_id)==0){
1941                                         schema_of_schemaid[liface][schema_id] = schema_name;
1942                                 }else{
1943                                         if(schema_of_schemaid[liface][schema_id] != schema_name){
1944                                                 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1945                                                 exit(1);
1946                                         }
1947                                 }
1948                         }else{  // single-schema interface
1949                                 schema_id = -1; // don't generate schema_id predicate
1950                                 vector<string> iface_schemas = iface->get_property("Schemas");
1951                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1952                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1953                                         exit(1);
1954                                 }
1955                                 if(iface_schemas.size()>1){
1956                                         fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1957                                         exit(1);
1958                                 }
1959                         }                       
1960
1961 // If we need to check the schema_id, insert a predicate into the lfta.
1962 //       TODO not just schema_id, the full all_schema_ids set.
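//       The generated constraint is "<rvar>.schemaId = <schema_id>", built as a
//       single CNF element with cost 1 and appended to the WHERE clause of the
//       lfta's root plan node (and, for filter joins, to the per-table predicate lists).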
1963                         if(schema_id>=0){
1964                                 colref_t *schid_cr = new colref_t("schemaId");
1965                                 schid_cr->schema_ref = schema_ref;
1966                                 schid_cr->table_name = rvar_name;
1967                                 schid_cr->tablevar_ref = 0;
1968                                 schid_cr->default_table = false;
1969                                 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1970                                 data_type *schid_dt = new data_type("uint");
1971                                 schid_se->dt = schid_dt;
1972
1973                                 string schid_str = int_to_string(schema_id);
1974                                 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
1975                                 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
1976                                 lit_se->dt = schid_dt;
1977
1978                                 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
1979                                 vector<cnf_elem *> clist;
1980                                 make_cnf_from_pr(schid_pr, clist);
1981                                 analyze_cnf(clist[0]);
1982                                 clist[0]->cost = 1;     // cheap one comparison
1983 // cnf built, now insert it.
1984                                 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
1985
1986 // Specialized processing ... currently filter join
1987                                 string node_type = split_queries[l]->query_plan[0]->node_type();
1988                                 if(node_type == "filter_join"){
1989                                         filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
1990                                         if(irv==0){
1991                                                 fj->pred_t0.push_back(clist[0]);
1992                                         }else{
1993                                                 fj->pred_t1.push_back(clist[0]);
1994                                         }
1995                                         schemaid_preds.push_back(schid_pr);
1996                                 }
1997                         }
1998                 }
1999 // Specialized processing, currently filter join.
2000                 if(schemaid_preds.size()>1){    // the OR below needs a schemaId predicate from both table variables
2001                         string node_type = split_queries[l]->query_plan[0]->node_type();
2002                         if(node_type == "filter_join"){
2003                                 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2004                                 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2005                                 vector<cnf_elem *> clist;
2006                                 make_cnf_from_pr(filter_pr, clist);
2007                                 analyze_cnf(clist[0]);
2008                                 clist[0]->cost = 1;     // cheap one comparison
2009                                 fj->shared_pred.push_back(clist[0]);
2010                         }
2011                 }
2012                         
2013
2014
2015
2016
2017
2018
2019 //                      Set the ht size from the recommendation, if there is one in the rec file
2020                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2021                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2022                 }
2023
2024
2025                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2026                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
2027                 lfta_list.push_back(split_queries[l]);
2028                 lfta_mach_lists[lmach].push_back(split_queries[l]);
2029
2030 //                      The following is a hack,
2031 //                      as I should be generating LFTA code through
2032 //                      the stream_query object.
2033
2034                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2035
2036 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2037
2038 /*
2039 //                              Create query description to embed in lfta.c
2040                 string lfta_schema_str = split_queries[l]->make_schema();
2041                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2042
2043 //                              get NIC capabilities.
2044                 int erri;
2045                 nic_property *nicprop = NULL;
2046                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2047                 if(iface_codegen_type.size()){
2048                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2049                         if(!nicprop){
2050                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2051                                         exit(1);
2052                         }
2053                 }
2054
2055                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2056 */
2057
2058                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2059                 query_names.push_back(split_queries[l]->query_name);
2060                 mach_query_names[lmach].push_back(query_names.size()-1);
2061 //                      NOTE: I will assume a 1-1 correspondence between
2062 //                      mach_query_names[lmach] and lfta_mach_lists[lmach],
2063 //                      where mach_query_names[lmach][i] contains the index into
2064 //                      query_names, which names the lfta, and
2065 //                      lfta_mach_lists[lmach][i] is the stream_query * of the
2066 //                      corresponding lfta.
2067 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2068
2069
2070
2071                 // check if lfta is reusable
2072                 // An FTA is reusable if the reusable option is set explicitly or if it doesn't have any parameters
2073
2074                 bool lfta_reusable = false;
2075                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2076                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2077                         lfta_reusable = true;
2078                 }
2079                 lfta_reuse_options.push_back(lfta_reusable);
2080
2081                 // The LFTA will inherit the liveness timeout specification from the containing query.
2082                 // This is conservative, as lftas are expected to spend less time per tuple
2083                 // than the full query.
2084
2085                 // extract liveness timeout from query definition
2086                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2087                 if (!liveness_timeout) {
2088 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2089 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2090                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2091                 }
2092                 lfta_liveness_timeouts.push_back(liveness_timeout);
2093
2094 //                      Add it to the schema
2095                 table_def *td = split_queries[l]->get_output_tabledef();
2096                 Schema->append_table(td);
2097 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2098
2099           }
2100          }
2101
2102 //                              If the output is lfta-only, dump out the query name.
2103       if(split_queries.size() == 1 && !hfta_returned){
2104         if(output_query_names ){
2105            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2106                 }
2107 /*
2108 else{
2109            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2110                 }
2111 */
2112
2113 /*
2114 //                              output schema summary
2115                 if(output_schema_summary){
2116                         dump_summary(split_queries[0]);
2117                 }
2118 */
2119       }
2120
2121
2122           if(hfta_returned){            // query also has an HFTA component
2123                 int hfta_nbr = split_queries.size()-1;
2124
2125                         hfta_list.push_back(split_queries[hfta_nbr]);
2126
2127 //                                      report on generated query names
2128         if(output_query_names){
2129                         string hfta_name =split_queries[hfta_nbr]->query_name;
2130                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2131                         for(l=0;l<hfta_nbr;++l){
2132                                 string lfta_name =split_queries[l]->query_name;
2133                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2134                         }
2135                 }
2136 //              else{
2137 //              fprintf(stderr,"query names are ");
2138 //                      for(l=0;l<hfta_nbr;++l){
2139 //                              if(l>0) fprintf(stderr,",");
2140 //                              string fta_name =split_queries[l]->query_name;
2141 //                      fprintf(stderr," %s",fta_name.c_str());
2142 //                      }
2143 //                      fprintf(stderr,"\n");
2144 //              }
2145           }
2146
2147   }else{
2148           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2149           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2150           exit(1);
2151   }
2152  }
2153 }
2154
2155
2156 //-----------------------------------------------------------------
2157 //              Compute and propagate the SEs which compute the PROTOCOL fields.
2158 //-----------------------------------------------------------------
2159
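//              The lftas are processed first and registered in sq_map, presumably so that
//              the hftas processed afterwards can resolve the protocol SEs of the streams
//              they read from.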
2160 for(i=0;i<lfta_list.size();i++){
2161         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2162         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2163 }
2164 for(i=0;i<hfta_list.size();i++){
2165         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2166         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2167 }
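//              sq_map records each processed stream query under its stream-query name;
//              since the HFTAs are processed after all of the LFTAs, their
//              generate_protocol_se() calls can see every LFTA already entered in the map.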
2168
2169
2170
2171 //------------------------------------------------------------------------
2172 //              Perform  individual FTA optimizations
2173 //-----------------------------------------------------------------------
2174
2175 if (partitioned_mode) {
2176
2177         // open partition definition file
2178         string part_fname = config_dir_path + "partition.txt";
2179
2180         FILE* partfd = fopen(part_fname.c_str(), "r");
2181         if (!partfd) {
2182                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2183                 exit(1);
2184         }
2185         PartnParser_setfileinput(partfd);
2186         if (PartnParserparse()) {
2187                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2188                 exit(1);
2189         }
2190         fclose(partfd);
2191 }
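//      PartnParserparse() fills the global partn_parse_result, which is handed to
//      each HFTA's optimize() call below.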
2192
2193
2194 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2195
2196 int num_hfta = hfta_list.size();
2197 for(i=0; i < hfta_list.size(); ++i){
2198         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2199 }
2200
2201 //                      Add all new hftas to schema
2202 for(i=num_hfta; i < hfta_list.size(); ++i){
2203                 table_def *td = hfta_list[i]->get_output_tabledef();
2204                 Schema->append_table(td);
2205 }
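//              (optimize() may have appended new HFTAs to hfta_list; everything past the
//              original num_hfta entries is new, hence the extra append_table() calls above.)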
2206
2207 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2208
2209
2210
2211 //------------------------------------------------------------------------
2212 //              Do global (cross-fta) optimization
2213 //-----------------------------------------------------------------------
2214
2215
2216
2217
2218
2219
2220 set<string> extra_external_libs;
2221
2222 for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
2223
2224                 if(! debug_only){
2225 //                                      build hfta file name, create output
2226            if(numeric_hfta_flname){
2227             sprintf(tmpstr,"hfta_%d",hfta_count);
2228                         hfta_names.push_back(tmpstr);
2229             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2230          }else{
2231             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2232                         hfta_names.push_back(tmpstr);
2233             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2234           }
2235                   FILE *hfta_fl = fopen(tmpstr,"w");
2236                   if(hfta_fl == NULL){
2237                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2238                         exit(1);
2239                   }
2240                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2241
2242 //                      If there is a field verifier, warn about
2243 //                      lack of compatibility
2244 //                              NOTE : this code assumes that visible non-lfta queries
2245 //                              are those at the root of a stream query.
2246                   string hfta_comment;
2247                   string hfta_title;
2248                   string hfta_namespace;
2249                   if(hfta_list[i]->defines.count("comment")>0)
2250                         hfta_comment = hfta_list[i]->defines["comment"];
2251                   if(hfta_list[i]->defines.count("Comment")>0)
2252                         hfta_comment = hfta_list[i]->defines["Comment"];
2253                   if(hfta_list[i]->defines.count("COMMENT")>0)
2254                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2255                   if(hfta_list[i]->defines.count("title")>0)
2256                         hfta_title = hfta_list[i]->defines["title"];
2257                   if(hfta_list[i]->defines.count("Title")>0)
2258                         hfta_title = hfta_list[i]->defines["Title"];
2259                   if(hfta_list[i]->defines.count("TITLE")>0)
2260                         hfta_title = hfta_list[i]->defines["TITLE"];
2261                   if(hfta_list[i]->defines.count("namespace")>0)
2262                         hfta_namespace = hfta_list[i]->defines["namespace"];
2263                   if(hfta_list[i]->defines.count("Namespace")>0)
2264                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2265                   if(hfta_list[i]->defines.count("NAMESPACE")>0)
2266                         hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2267
2268                   if(field_verifier != NULL){
2269                         string warning_str;
2270                         if(hfta_comment == "")
2271                                 warning_str += "\tcomment not found.\n";
2272
2273 // Obsolete stuff that Carsten wanted
2274 //                      if(hfta_title == "")
2275 //                              warning_str += "\ttitle not found.\n";
2276 //                      if(hfta_namespace == "")
2277 //                              warning_str += "\tnamespace not found.\n";
2278
2279 // STOPPED HERE
2280 //      There is a get_tbl_keys method implemented for qp_nodes,
2281 //      integrate it into stream_query, then call it to find keys,
2282 //      and annotate fields with their key-ness.
2283 //      If there is a "keys" property in the defines block, override anything returned
2284 //      from the automated analysis.
2285
2286                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2287                         int fi;
2288                         for(fi=0;fi<flds.size();fi++){
2289                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2290                         }
2291                         if(warning_str != "")
2292                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2293                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2294                   }
2295
2296 // Get the fields in this query
2297                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2298
2299 // do key processing
2300                   string hfta_keys_s = "";
2301                   if(hfta_list[i]->defines.count("keys")>0)
2302                         hfta_keys_s = hfta_list[i]->defines["keys"];
2303                   if(hfta_list[i]->defines.count("Keys")>0)
2304                         hfta_keys_s = hfta_list[i]->defines["Keys"];
2305                   if(hfta_list[i]->defines.count("KEYS")>0)
2306                         hfta_keys_s = hfta_list[i]->defines["KEYS"];
2307                   string xtra_keys_s = "";
2308                   if(hfta_list[i]->defines.count("extra_keys")>0)
2309                         xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2310                   if(hfta_list[i]->defines.count("Extra_Keys")>0)
2311                         xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2312                   if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2313                         xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2314 // get the keys
2315                   vector<string> hfta_keys;
2316                   vector<string> partial_keys;
2317                   vector<string> xtra_keys;
2318                   if(hfta_keys_s==""){
2319                                 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2320                                 if(xtra_keys_s.size()>0){
2321                                         xtra_keys = split_string(xtra_keys_s, ',');
2322                                 }
2323                                 for(int xi=0;xi<xtra_keys.size();++xi){
2324                                         if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2325                                                 hfta_keys.push_back(xtra_keys[xi]);
2326                                         }
2327                                 }
2328                   }else{
2329                                 hfta_keys = split_string(hfta_keys_s, ',');
2330                   }
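//                      Summary of key selection: an explicit "keys" define wins outright;
//                      otherwise the keys come from the automated analysis (get_tbl_keys),
//                      optionally extended by any "extra_keys" not already present.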
2331 // validate that all of the keys exist in the output.
2332 //      (exit on error, as it's a bad specification)
2333                   vector<string> missing_keys;
2334                   for(int ki=0;ki<hfta_keys.size(); ++ki){
2335                         int fi;
2336                         for(fi=0;fi<flds.size();++fi){
2337                                 if(hfta_keys[ki] == flds[fi]->get_name())
2338                                         break;
2339                         }
2340                         if(fi==flds.size())
2341                                 missing_keys.push_back(hfta_keys[ki]);
2342                   }
2343                   if(missing_keys.size()>0){
2344                         fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2345                         for(int hi=0; hi<missing_keys.size(); ++hi){
2346                                 fprintf(stderr," %s", missing_keys[hi].c_str());
2347                         }
2348                         fprintf(stderr,"\n");
2349                         exit(1);
2350                   }
2351
2352                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2353                   if(hfta_comment != "")
2354                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2355                   if(hfta_title != "")
2356                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2357                   if(hfta_namespace != "")
2358                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2359                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2360                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2361
2362 //                              write info about fields to qtree.xml
2363                   int fi;
2364                   for(fi=0;fi<flds.size();fi++){
2365                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2366                         if(flds[fi]->get_modifier_list()->size()){
2367                                 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2368                         }
2369                         fprintf(qtree_output," />\n");
2370                   }
2371 // info about keys
2372                   for(int hi=0;hi<hfta_keys.size();++hi){
2373                         fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2374                   }
2375                   for(int hi=0;hi<partial_keys.size();++hi){
2376                         fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2377                   }
2378                   for(int hi=0;hi<xtra_keys.size();++hi){
2379                         fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2380                   }
2381
2382
2383                   // extract liveness timeout from query definition
2384                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2385                   if (!liveness_timeout) {
2386 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2387 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2388                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2389                   }
2390                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2391
2392                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2393                   int itv;
2394                   for(itv=0;itv<tmp_tv.size();++itv){
2395                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2396                   }
2397                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2398                   if(ifrs != ""){
2399                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2400                   }
2401                   fprintf(qtree_output,"\t</HFTA>\n");
2402
2403                   fclose(hfta_fl);
2404                 }else{
2405 //                                      debug only -- do code generation to catch generation-time errors.
2406                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2407                 }
2408
2409                 hfta_count++;   // for hfta file names with numeric suffixes
2410
2411                 hfta_list[i]->get_external_libs(extra_external_libs);
2412
2413           }
2414
2415 string ext_lib_string;
2416 set<string>::iterator ssi_el;
2417 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2418         ext_lib_string += (*ssi_el)+" ";
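//              ext_lib_string is passed to generate_makefile() below as the extra libraries
//              to link into each HFTA executable.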
2419
2420
2421
2422 //                      Report on the set of operator views
2423   for(i=0;i<opviews.size();++i){
2424         opview_entry *opve = opviews.get_entry(i);
2425         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2426         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2427         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2428         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2429         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2430
2431         if (!opve->liveness_timeout) {
2432 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2433 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2434                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2435         }
2436         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2437     int j;
2438         for(j=0;j<opve->subq_names.size();j++)
2439                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2440         fprintf(qtree_output,"\t</UDOP>\n");
2441   }
2442
2443
2444 //-----------------------------------------------------------------
2445
2446 //                      Create interface-specific meta code files.
2447 //                              first, open and parse the interface resources file.
2448         ifaces_db = new ifq_t();
2449     ierr = "";
2450         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2451                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2452                                 ifx_fname.c_str(), ierr.c_str());
2453                 exit(1);
2454         }
2455
2456         map<string, vector<stream_query *> >::iterator svsi;
2457         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2458                 string lmach = (*svsi).first;
2459
2460         //              For this machine, create a set of lftas per interface.
2461                 vector<stream_query *> mach_lftas = (*svsi).second;
2462                 map<string, vector<stream_query *> > lfta_iface_lists;
2463                 int li;
2464                 for(li=0;li<mach_lftas.size();++li){
2465                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2466                         string lfta_iface = tvec[0]->get_interface();
2467                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2468                 }
2469
2470                 map<string, vector<stream_query *> >::iterator lsvsi;
2471                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2472                         int erri;
2473                         string liface = (*lsvsi).first;
2474                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2475                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2476                         if(iface_codegen_type.size()){
2477                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2478                                 if(!nicprop){
2479                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2480                                         exit(1);
2481                                 }
2482                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2483                                 string mcf_flnm;
2484                                 if(lmach != "")
2485                                   mcf_flnm = lmach + "_"+liface+".mcf";
2486                                 else
2487                                   mcf_flnm = hostname + "_"+liface+".mcf";
2488                                 FILE *mcf_fl ;
2489                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2490                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2491                                         exit(1);
2492                                 }
2493                                 fprintf(mcf_fl,"%s",mcs.c_str());
2494                                 fclose(mcf_fl);
2495 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2496 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2497                         }
2498                 }
2499
2500
2501         }
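//              Net effect: one meta code file per (machine, interface) pair whose interface
//              declares an iface_codegen_type, written as <machine>_<iface>.mcf, or
//              <hostname>_<iface>.mcf when no machine name is given.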
2502
2503
2504
2505 //-----------------------------------------------------------------
2506
2507
2508 //                      Find common filter predicates in the LFTAs.
2509 //                      In addition, generate structs to store the temporal attributes unpacked by the prefilter.
2510         
2511         map<string, vector<stream_query *> >::iterator ssqi;
2512         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2513
2514                 string lmach = (*ssqi).first;
2515                 bool packed_return = false;
2516                 int li, erri;
2517
2518
2519 //      The LFTAs of this machine.
2520                 vector<stream_query *> mach_lftas = (*ssqi).second;
2521 //      break up on a per-interface basis.
2522                 map<string, vector<stream_query *> > lfta_iface_lists;
2523                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2524                                                         // for fta_init
2525                 for(li=0;li<mach_lftas.size();++li){
2526                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2527                         string lfta_iface = tvec[0]->get_interface();
2528                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2529                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2530                 }
2531
2532
2533 //      Are the return values "packed"?
2534 //      This should be done on a per-interface basis.
2535 //      But this is defunct code for gs-lite
2536                 for(li=0;li<mach_lftas.size();++li){
2537                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2538                   string liface = tvec[0]->get_interface();
2539                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2540                   if(iface_codegen_type.size()){
2541                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2542                           packed_return = true;
2543                         }
2544                   }
2545                 }
2546
2547
2548 // Separate lftas by interface, collect results on a per-interface basis.
2549
2550                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2551                 map<string, vector<cnf_set *> > prefilter_preds;
2552                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2553                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2554                         string liface = (*mvsi).first;
2555                         vector<cnf_set *> empty_list;
2556                         prefilter_preds[liface] = empty_list;
2557                         if(! packed_return){
2558                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2559                         }
2560
2561 //                              get NIC capabilities.  (Is this needed?)
2562                         nic_property *nicprop = NULL;
2563                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2564                         if(iface_codegen_type.size()){
2565                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2566                                 if(!nicprop){
2567                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2568                                         exit(1);
2569                                 }
2570                         }
2571                 }
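//              prefilter_preds[liface] now holds the CNF predicates shared by the LFTAs on
//              that interface.  Judging from the decoding under PREFILTER_OK below, each
//              entry in pred_ids packs the lfta index in the high 16 bits and the predicate
//              position in the low 16 bits.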
2572
2573
2574 //              Now that we know the prefilter preds, generate the lfta code.
2575 //      Do this for all lftas in this machine.
2576                 for(li=0;li<mach_lftas.size();++li){
2577                         set<unsigned int> subsumed_preds;
2578                         set<unsigned int>::iterator sii;
2579 #ifdef PREFILTER_OK
2580                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2581                                 int pid = (*sii);
2582                                 if((pid>>16) == li){
2583                                         subsumed_preds.insert(pid & 0xffff);
2584                                 }
2585                         }
2586 #endif
2587                         string lfta_schema_str = mach_lftas[li]->make_schema();
2588                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2589                         nic_property *nicprop = NULL; // no NIC properties?
2590                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2591                 }
2592
2593
2594 //                      generate structs to store the temporal attributes
2595 //                      unpacked by prefilter
2596                 col_id_set temp_cids;
2597                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2598                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2599
2600 //                      Compute the lfta bit signatures and the lfta colrefs
2601 //      do this on a per-interface basis
2602 #ifdef PREFILTER_OK
2603                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2604 #endif
2605                 map<string, vector<long long int> > lfta_sigs; // used again later
2606                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2607                         string liface = (*mvsi).first;
2608                         vector<long long int> empty_list;
2609                         lfta_sigs[liface] = empty_list;
2610
2611                         vector<col_id_set> lfta_cols;
2612                         vector<int> lfta_snap_length;
2613                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2614                                 unsigned long long int mask=0, bpos=1;
2615                                 int f_pos;
2616                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2617                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2618                                                 mask |= bpos;
2619                                         bpos = bpos << 1;
2620                                 }
2621                                 lfta_sigs[liface].push_back(mask);
2622                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2623                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2624                         }
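//                              lfta_sigs[liface][li] is a bitmask over this interface's prefilter
//                              predicates: bit f is set when predicate f applies to LFTA li.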
2625
2626 //for(li=0;li<mach_lftas.size();++li){
2627 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2628 //col_id_set::iterator tcisi;
2629 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2630 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2631 //}
2632 //}
2633
2634
2635 //                      generate the prefilter
2636 //      Do this on a per-interface basis, except for the #define
2637 #ifdef PREFILTER_OK
2638 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2639                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2640 #else
2641                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2642
2643 #endif
2644                 }
2645
2646 //                      Generate interface parameter lookup function
2647           lfta_val[lmach] += "// lookup interface properties by name\n";
2648           lfta_val[lmach] += "// multiple values of a given property are returned in the same string, separated by commas\n";
2649           lfta_val[lmach] += "// returns NULL if the given property does not exist\n";
2650           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2651
2652 //        collect the list of interface names used by queries running on this host
2653           set<std::string> iface_names;
2654           for(i=0;i<mach_query_names[lmach].size();i++){
2655                 int mi = mach_query_names[lmach][i];
2656                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2657
2658                 if(interface_names[mi]=="")
2659                         iface_names.insert("DEFAULTDEV");
2660                 else
2661                         iface_names.insert(interface_names[mi]);
2662           }
2663
2664 //        generate interface property lookup code for every interface
2665           set<std::string>::iterator sir;
2666           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2667                 if (sir == iface_names.begin())
2668                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2669                 else
2670                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2671
2672                 // iterate through interface properties
2673                 vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2674                 if (erri) {
2675                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2676                         exit(1);
2677                 }
2678                 if (iface_properties.empty())
2679                         lfta_val[lmach] += "\t\treturn NULL;\n";
2680                 else {
2681                         for (int i = 0; i < iface_properties.size(); ++i) {
2682                                 if (i == 0)
2683                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2684                                 else
2685                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2686
2687                                 // combine all values for the interface property using comma separator
2688                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2689                                 lfta_val[lmach] += "\t\t\treturn \"";
2690                                 for (int j = 0; j < vals.size(); ++j) {
2691                                         lfta_val[lmach] +=  vals[j];
2692                                         if (j != vals.size()-1)
2693                                                 lfta_val[lmach] += ",";
2694                                 }
2695                                 lfta_val[lmach] += "\";\n";
2696                         }
2697                         lfta_val[lmach] += "\t\t} else\n";
2698                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2699                 }
2700           }
2701           lfta_val[lmach] += "\t} else\n";
2702           lfta_val[lmach] += "\t\treturn NULL;\n";
2703           lfta_val[lmach] += "}\n\n";
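//              For reference, the generated lookup has roughly this shape (interface and
//              property names below are placeholders, not real values):
//
//                      gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {
//                              if (!strcmp(iface_name, "eth0")) {
//                                      if (!strcmp(property_name, "SomeProperty")) {
//                                              return "value1,value2";
//                                      } else
//                                              return NULL;
//                              } else
//                                      return NULL;
//                      }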
2704
2705
2706 //                      Generate a full list of FTAs for clearinghouse reference
2707           lfta_val[lmach] += "// list of FTAs the clearinghouse expects to register themselves\n";
2708           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2709
2710           for (i = 0; i < query_names.size(); ++i) {
2711                    if (i)
2712                           lfta_val[lmach] += ", ";
2713                    lfta_val[lmach] += "\"" + query_names[i] + "\"";
2714           }
2715           for (i = 0; i < hfta_list.size(); ++i) {
2716                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2717           }
2718           lfta_val[lmach] += ", NULL};\n\n";
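//              i.e. the generated registry has the form
//                      const gs_sp_t fta_names[] = {"<lfta query names>", "<hfta query names>", NULL};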
2719
2720
2721 //                      Add the initialization function to lfta.c
2722 //      Change to accept the interface name, and 
2723 //      set the prefilter function accordingly.
2724 //      see the example in demo/err2
2725           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2726
2727 //        for(i=0;i<mach_query_names[lmach].size();i++)
2728 //              int mi = mach_query_names[lmach][i];
2729 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2730
2731           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2732                 string liface = (*mvsi).first;
2733                 vector<stream_query *> lfta_list = (*mvsi).second;
2734                 for(i=0;i<lfta_list.size();i++){
2735                         stream_query *lfta_sq = lfta_list[i];
2736                         int mi = lfta_iface_qname_ix[liface][i];
2737                 
2738                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2739
2740                         string this_iface = "DEFAULTDEV";
2741                         if(interface_names[mi]!="")
2742                                 this_iface = '"'+interface_names[mi]+'"';
2743                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2744                 lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2745 //              if(interface_names[mi]=="")
2746 //                              lfta_val[lmach]+="DEFAULTDEV";
2747 //              else
2748 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';
2749                         lfta_val[lmach] += this_iface;
2750
2751
2752                 lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
2753                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
2754                         +"\n#endif\n";
2755                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2756                         lfta_val[lmach] += tmpstr;
2757
2758 //                      unsigned long long int mask=0, bpos=1;
2759 //                      int f_pos;
2760 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2761 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2762 //                                      mask |= bpos;
2763 //                              bpos = bpos << 1;
2764 //                      }
2765
2766 #ifdef PREFILTER_OK
2767 //                      sprintf(tmpstr,",%lluull",mask);
2768                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2769                         lfta_val[lmach]+=tmpstr;
2770 #else
2771                         lfta_val[lmach] += ",0ull";
2772 #endif
2773
2774                         lfta_val[lmach] += ");\n";
2775
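//                      The emitted registration looks roughly like this (names in angle
//                      brackets are placeholders):
//                              if(!strcmp(device,"<iface>"))
//                                      fta_register("<query name>", <reuse flag>, "<iface>", <alloc fcn>
//                              #ifndef LFTA_IN_NIC
//                                      ,<schema string>
//                              #endif
//                                      ,<snap length>,<prefilter signature>ull);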
2776
2777
2778 //    End of lfta prefilter stuff
2779 // --------------------------------------------------
2780
2781 //                      If there is a field verifier, warn about
2782 //                      lack of compatibility
2783                   string lfta_comment;
2784                   string lfta_title;
2785                   string lfta_namespace;
2786                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2787                   if(ldefs.count("comment")>0)
2788                         lfta_comment = lfta_sq->defines["comment"];
2789                   if(ldefs.count("Comment")>0)
2790                         lfta_comment = lfta_sq->defines["Comment"];
2791                   if(ldefs.count("COMMENT")>0)
2792                         lfta_comment = lfta_sq->defines["COMMENT"];
2793                   if(ldefs.count("title")>0)
2794                         lfta_title = lfta_sq->defines["title"];
2795                   if(ldefs.count("Title")>0)
2796                         lfta_title = lfta_sq->defines["Title"];
2797                   if(ldefs.count("TITLE")>0)
2798                         lfta_title = lfta_sq->defines["TITLE"];
2799                   if(ldefs.count("NAMESPACE")>0)
2800                         lfta_namespace = lfta_sq->defines["NAMESPACE"];
2801                   if(ldefs.count("Namespace")>0)
2802                         lfta_namespace = lfta_sq->defines["Namespace"];
2803                   if(ldefs.count("namespace")>0)
2804                         lfta_namespace = lfta_sq->defines["namespace"];
2805
2806                   string lfta_ht_size;
2807                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2808                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2809                   if(ldefs.count("aggregate_slots")>0){
2810                         lfta_ht_size = ldefs["aggregate_slots"];
2811                   }
2812
2813 //                      NOTE : I'm assuming that visible lftas do not start with _fta.
2814 //                              -- will fail for non-visible simple selection queries.
2815                 if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
2816                         string warning_str;
2817                         if(lfta_comment == "")
2818                                 warning_str += "\tcomment not found.\n";
2819 // Obsolete stuff that Carsten wanted
2820 //                      if(lfta_title == "")
2821 //                              warning_str += "\ttitle not found.\n";
2822 //                      if(lfta_namespace == "")
2823 //                              warning_str += "\tnamespace not found.\n";
2824
2825                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2826                         int fi;
2827                         for(fi=0;fi<flds.size();fi++){
2828                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2829                         }
2830                         if(warning_str != "")
2831                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2832                                         query_names[mi].c_str(),warning_str.c_str());
2833                 }
2834
2835
2836 //                      Create qtree output
2837                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
2838                   if(lfta_comment != "")
2839                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2840                   if(lfta_title != "")
2841                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2842                   if(lfta_namespace != "")
2843                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2844                   if(lfta_ht_size != "")
2845                         fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2846                 if(lmach != "")
2847                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2848                 else
2849                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2850                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
2851                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2852                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2853                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2854 //                              write info about fields to qtree.xml
2855                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2856                   int fi;
2857                   for(fi=0;fi<flds.size();fi++){
2858                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2859                         if(flds[fi]->get_modifier_list()->size()){
2860                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2861                         }
2862                         fprintf(qtree_output," />\n");
2863                   }
2864                 fprintf(qtree_output,"\t</LFTA>\n");
2865
2866
2867             }
2868           }
2869
2870           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2871                         string liface = (*mvsi).first;
2872                         lfta_val[lmach] += 
2873 "       if (!strcmp(device, \""+liface+"\")) \n"
2874 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2875 ;
2876                 }
2877                 lfta_val[lmach] += 
2878 "       if(lfta_prefilter == NULL){\n"
2879 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2880 "               exit(1);\n"
2881 "       }\n"
2882 ;
2883
2884
2885
2886           lfta_val[lmach] += "}\n\n";
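//              So the generated fta_init(device) registers every LFTA bound to the given
//              device and points lfta_prefilter at that interface's prefilter, exiting if
//              the device name is not recognized.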
2887
2888       if(!(debug_only || hfta_only) ){
2889                 string lfta_flnm;
2890                 if(lmach != "")
2891                   lfta_flnm = lmach + "_lfta.c";
2892                 else
2893                   lfta_flnm = hostname + "_lfta.c";
2894                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2895                         fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2896                         exit(1);
2897                 }
2898               fprintf(lfta_out,"%s",lfta_header.c_str());
2899               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2900               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2901                 fclose(lfta_out);
2902           }
2903         }
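//              Each machine's LFTA code is written to <machine>_lfta.c (or <hostname>_lfta.c),
//              containing the common header, the prefilter structs, and the generated LFTA bodies.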
2904
2905 //              Report the external operators which must execute.
2906         if(opviews.size()>0)
2907                 fprintf(stderr,"The queries use the following external operators:\n");
2908         for(i=0;i<opviews.size();++i){
2909                 opview_entry *opv = opviews.get_entry(i);
2910                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2911         }
2912
2913         if(create_makefile)
2914                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2915                 machine_names, schema_file_name,
2916                 interface_names,
2917                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2918
2919
2920         fprintf(qtree_output,"</QueryNodes>\n");
2921
2922         return(0);
2923 }
2924
2925 ////////////////////////////////////////////////////////////
2926
2927 void generate_makefile(vector<string> &input_file_names, int nfiles,
2928                                            vector<string> &hfta_names, opview_set &opviews,
2929                                                 vector<string> &machine_names,
2930                                                 string schema_file_name,
2931                                                 vector<string> &interface_names,
2932                                                 ifq_t *ifdb, string &config_dir_path,
2933                                                 bool use_pads,
2934                                                 string extra_libs,
2935                                                 map<string, vector<int> > &rts_hload
2936                                          ){
2937         int i,j;
2938
2939         if(config_dir_path != ""){
2940                 config_dir_path = "-C "+config_dir_path;
2941         }
2942
2943         struct stat sb;
2944         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
2945         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
2946
2947 //      if(libz_exists && !libast_exists){
2948 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");
2949 //              exit(1);
2950 //      }
2951
2952 //                      Get set of operator executable files to run
2953         set<string> op_fls;
2954         set<string>::iterator ssi;
2955         for(i=0;i<opviews.size();++i){
2956                 opview_entry *opv = opviews.get_entry(i);
2957                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
2958         }
2959
2960         FILE *outfl = fopen("Makefile", "w");
2961         if(outfl==NULL){
2962                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
2963                 exit(0);
2964         }
2965
2966         fputs(
2967 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
2968 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
2969 ).c_str(), outfl
2970 );
2971         if(generate_stats)
2972                 fprintf(outfl,"  -DLFTA_STATS");
2973
2974 //              Gather the set of interfaces
2975 //              Also, gather "base interface names" for use in computing
2976 //              the hash splitting to virtual interfaces.
2977 //              TODO : must update to handle machines
2978         set<string> ifaces;
2979         set<string> base_vifaces;       // base interfaces of virtual interfaces
2980         map<string, string> ifmachines;
2981         map<string, string> ifattrs;
2982         for(i=0;i<interface_names.size();++i){
2983                 ifaces.insert(interface_names[i]);
2984                 ifmachines[interface_names[i]] = machine_names[i];
2985
2986                 size_t Xpos = interface_names[i].find_last_of("X");
2987                 if(Xpos!=string::npos){
2988                         string iface = interface_names[i].substr(0,Xpos);
2989                         base_vifaces.insert(iface);
2990                 }
2991                 // get interface attributes and add them to the list
2992         }
2993
2994 //              Do we need to include protobuf libraries?
2995 //              TODO Move to the interface library: get the libraries to include 
2996 //              for an interface type
2997
2998         bool use_proto = false;
2999         int erri;
3000         string err_str;
3001         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3002                 string ifnm = (*ssi);
3003                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3004                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3005                         if(ift[ift_i]=="PROTO"){
3006                                 use_proto = true;
3007                         }
3008                 }
3009         }
3010
3011         fprintf(outfl,
3012 "\n"
3013 "\n"
3014 "all: rts");
3015         for(i=0;i<hfta_names.size();++i)
3016                 fprintf(outfl," %s",hfta_names[i].c_str());
3017         fputs(
3018 ("\n"
3019 "\n"
3020 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
3021 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3022         if(use_pads)
3023                 fprintf(outfl,"-L. ");
3024         fputs(
3025 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3026         if(use_pads)
3027                 fprintf(outfl,"-lgscppads -lpads ");
3028         fprintf(outfl,
3029 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3030         if(use_pads)
3031                 fprintf(outfl, " -lpz -lz -lbz ");
3032         if(libz_exists && libast_exists)
3033                 fprintf(outfl," -last ");
3034         if(use_pads)
3035                 fprintf(outfl, " -ldll -ldl ");
3036         if(use_proto)
3037                 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3038         fprintf(outfl," -lgscpaux");
3039 #ifdef GCOV
3040         fprintf(outfl," -fprofile-arcs");
3041 #endif
3042         fprintf(outfl,
3043 "\n"
3044 "\n"
3045 "lfta.o: %s_lfta.c\n"
3046 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3047 "\n"
3048 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3049         for(i=0;i<nfiles;++i)
3050                 fprintf(outfl," %s",input_file_names[i].c_str());
3051         if(hostname == ""){
3052                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3053         }else{
3054                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3055         }
3056         for(i=0;i<nfiles;++i)
3057                 fprintf(outfl," %s",input_file_names[i].c_str());
3058         fprintf(outfl,"\n");
3059
3060         for(i=0;i<hfta_names.size();++i)
3061                 fprintf(outfl,
3062 ("%s: %s.o\n"
3063 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3064 "\n"
3065 "%s.o: %s.cc\n"
3066 "\t$(CPP) -o %s.o -c %s.cc\n"
3067 "\n"
3068 "\n").c_str(),
3069     hfta_names[i].c_str(), hfta_names[i].c_str(),
3070         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3071         hfta_names[i].c_str(), hfta_names[i].c_str(),
3072         hfta_names[i].c_str(), hfta_names[i].c_str()
3073                 );
3074
3075         fprintf(outfl,
3076 ("\n"
3077 "packet_schema.txt:\n"
3078 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3079 "\n"
3080 "external_fcns.def:\n"
3081 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3082 "\n"
3083 "clean:\n"
3084 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3085         for(i=0;i<hfta_names.size();++i)
3086                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3087         fprintf(outfl,"\n");
3088
3089         fclose(outfl);
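//              The generated Makefile roughly follows this skeleton (paths abbreviated):
//
//                      all: rts <hfta_0> <hfta_1> ...
//                      rts: lfta.o <gscp libraries>                            # link the runtime system
//                      lfta.o: <host>_lfta.c
//                      <host>_lfta.c: external_fcns.def <schema> <query files> # re-runs translate_fta
//                      <hfta_N>: <hfta_N>.o                                    # one pair of rules per HFTA
//                      packet_schema.txt, external_fcns.def:                   # symlinked from cfg/
//                      clean: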
3090
3091
3092
3093 //              Gather the set of interfaces
3094 //              TODO : must update to handle machines
3095 //              TODO : lookup interface attributes and add them as a parameter to rts process
3096         outfl = fopen("runit", "w");
3097         if(outfl==NULL){
3098                 fprintf(stderr,"Can't open runit for write, exiting.\n");
3099                 exit(0);
3100         }
3101
3102
3103         fputs(
3104 ("#!/bin/sh\n"
3105 "./stopit\n"
3106 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3107 "sleep 5\n"
3108 "if [ ! -f gshub.log ]\n"
3109 "then\n"
3110 "\techo \"Failed to start bin/gshub.py\"\n"
3111 "\texit -1\n"
3112 "fi\n"
3113 "ADDR=`cat gshub.log`\n"
3114 "ps opgid= $! >> gs.pids\n"
3115 "./rts $ADDR default ").c_str(), outfl);
3116 //      int erri;
3117 //      string err_str;
3118         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3119                 string ifnm = (*ssi);
3120                 fprintf(outfl, "%s ",ifnm.c_str());
3121                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3122                 for(j=0;j<ifv.size();++j)
3123                         fprintf(outfl, "%s ",ifv[j].c_str());
3124         }
3125         fprintf(outfl, " &\n");
3126         fprintf(outfl, "echo $! >> gs.pids\n");
3127         for(i=0;i<hfta_names.size();++i)
3128                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3129
3130         for(j=0;j<opviews.opview_list.size();++j){
3131                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3132         }
3133
3134         fclose(outfl);
3135         system("chmod +x runit");
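//              runit starts gshub.py, launches ./rts with every interface (plus any
//              per-interface "Command" arguments), then starts each HFTA and UDOP view;
//              the ids it records in gs.pids are what the stopit script generated below
//              uses to terminate everything.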
3136
3137         outfl = fopen("stopit", "w");
3138         if(outfl==NULL){
3139                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3140                 exit(0);
3141         }
3142
3143         fprintf(outfl,"#!/bin/sh\n"
3144 "rm -f gshub.log\n"
3145 "if [ ! -f gs.pids ]\n"
3146 "then\n"
3147 "exit\n"
3148 "fi\n"
3149 "for pgid in `cat gs.pids`\n"
3150 "do\n"
3151 "kill -TERM -$pgid\n"
3152 "done\n"
3153 "sleep 1\n"
3154 "for pgid in `cat gs.pids`\n"
3155 "do\n"
3156 "kill -9 -$pgid\n"
3157 "done\n"
3158 "rm gs.pids\n");
3159
3160         fclose(outfl);
3161         system("chmod +x stopit");
3162
3163 //-----------------------------------------------
3164
3165 /* For now disable support for virtual interfaces
3166         outfl = fopen("set_vinterface_hash.bat", "w");
3167         if(outfl==NULL){
3168                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3169                 exit(0);
3170         }
3171
3172 //              The format should be determined by an entry in the ifres.xml file,
3173 //              but for now hardcode the only example I have.
3174         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3175                 if(rts_hload.count((*ssi))){
3176                         string iface_name = (*ssi);
3177                         string iface_number = "";
3178                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3179                                 if(isdigit(iface_name[j])){
3180                                         iface_number = iface_name[j];
3181                                         if(j>0 && isdigit(iface_name[j-1]))
3182                                                 iface_number = iface_name[j-1] + iface_number;
3183                                 }
3184                         }
3185
3186                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3187                         vector<int> halloc = rts_hload[iface_name];
3188                         int prev_limit = 0;
3189                         for(j=0;j<halloc.size();++j){
3190                                 if(j>0)
3191                                         fprintf(outfl,":");
3192                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3193                                 prev_limit = halloc[j];
3194                         }
3195                         fprintf(outfl,"\n");
3196                 }
3197         }
3198         fclose(outfl);
3199         system("chmod +x set_vinterface_hash.bat");
3200 */
3201 }
3202
3203 //              Code for implementing a local schema
3204 /*
3205                 table_list qpSchema;
3206
3207 //                              Load the schemas of any LFTAs.
3208                 int l;
3209                 for(l=0;l<hfta_nbr;++l){
3210                         stream_query *sq0 = split_queries[l];
3211                         table_def *td = sq0->get_output_tabledef();
3212                         qpSchema.append_table(td);
3213                 }
3214 //                              load the schemas of any other ref'd tables.
3215 //                              (e.g., hftas)
3216                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3217                 int ti;
3218                 for(ti=0;ti<input_tbl_names.size();++ti){
3219                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3220                         if(tbl_ref < 0){
3221                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3222                                 if(tbl_ref < 0){
3223                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3224                                         exit(1);
3225                                 }
3226                                 qpSchema.append_table(Schema->get_table(tbl_ref));
3227                         }
3228                 }
3229 */
3230
3231 //              Functions related to parsing.
3232
3233 /*
3234 static int split_string(char *instr,char sep, char **words,int max_words){
3235    char *loc;
3236    char *str;
3237    int nwords = 0;
3238
3239    str = instr;
3240    words[nwords++] = str;
3241    while( (loc = strchr(str,sep)) != NULL){
3242         *loc = '\0';
3243         str = loc+1;
3244         if(nwords >= max_words){
3245                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3246                 nwords = max_words-1;
3247         }
3248         words[nwords++] = str;
3249    }
3250
3251    return(nwords);
3252 }
3253
3254 */
3255