src/ftacmp/translate_fta.cc
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33 #include<glob.h>
34 #include<string.h>
35
36 #include<list>
37
38 //              for the scandir
39      #include <sys/types.h>
40      #include <dirent.h>
41
42
43 #include<errno.h>
44
45 //              to verify that some files exist.
46      #include <sys/types.h>
47      #include <sys/stat.h>
48
49 #include "parse_partn.h"
50
51 #include "print_plan.h"
52
53 //              Interface to the xml parser
54
55 #include"xml_t.h"
56 #include"field_list.h"
57
58 extern int xmlParserparse(void);
59 extern FILE *xmlParserin;
60 extern int xmlParserdebug;
61
62 std::vector<std::string> xml_attr_vec;
63 std::vector<std::string> xml_val_vec;
64 std::string xml_a, xml_v;
65 xml_t *xml_leaves = NULL;
66
67 //      Interface to the field list verifier
68 field_list *field_verifier = NULL;
69
70 #define TMPSTRLEN 1000
71
72 #ifndef PATH_DELIM
73   #define PATH_DELIM '/'
74 #endif
75
76 char tmp_schema_str[10000];
77
78 // maximum delay between two heartbeats produced
79 // by a UDOP. Used when it is not explicitly
80 // provided in the udop definition
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
82
83 //              Default lfta hash table size, must be power of 2.
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
85
86 //              Interface to FTA definition lexer and parser ...
87
88 extern int FtaParserparse(void);
89 extern FILE *FtaParserin;
90 extern int FtaParserdebug;
91
92 fta_parse_t *fta_parse_result;
93 var_defs_t *fta_parse_defines;
94
95
96
97 //              Interface to external function lexer and parser ...
98
99 extern int Ext_fcnsParserparse(void);
100 extern FILE *Ext_fcnsParserin;
101 extern int Ext_fcnsParserdebug;
102
103 ext_fcn_list *Ext_fcns;
104
105
106 //              Interface to partition definition parser
107 extern int PartnParserparse();
108 partn_def_list_t *partn_parse_result = NULL;
109
110
111
112 using namespace std;
113 //extern int errno;
114
115
116 //              forward declaration of local utility function
117 void generate_makefile(vector<string> &input_file_names, int nfiles,
118                                            vector<string> &hfta_names, opview_set &opviews,
119                                                 vector<string> &machine_names,
120                                                 string schema_file_name,
121                                                 vector<string> &interface_names,
122                                                 ifq_t *ifdb, string &config_dir_path,
123                                                 bool use_pads,
124                                                 string extra_libs,
125                                                 map<string, vector<int> > &rts_hload
126                                         );
127
128 //static int split_string(char *instr,char sep, char **words,int max_words);
129 #define MAXFLDS 100
130
131   FILE *schema_summary_output = NULL;           // schema summary (query name, fields, types)
132
133 //                      Dump schema summary
134 void dump_summary(stream_query *str){
135         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
136
137         table_def *sch = str->get_output_tabledef();
138
139         vector<field_entry *> flds = sch->get_fields();
140         int f;
141         for(f=0;f<flds.size();++f){
142                 if(f>0) fprintf(schema_summary_output,"|");
143                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
144         }
145         fprintf(schema_summary_output,"\n");
146         for(f=0;f<flds.size();++f){
147                 if(f>0) fprintf(schema_summary_output,"|");
148                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
149         }
150         fprintf(schema_summary_output,"\n");
151 }
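//              Example schema_summary.txt entry for a hypothetical two-field query
//              named "flow_cnt": the query name on one line, then '|'-separated
//              field names, then '|'-separated field types:
//                      flow_cnt
//                      srcIP|cnt
//                      IP|UINT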
152
153 //              Globals
154 string hostname;                // name of current host.
155 int hostname_len;
156 bool generate_stats = false;
157 string root_path = "../..";
158
159
160 int main(int argc, char **argv){
161   char tmpstr[TMPSTRLEN];
162   string err_str;
163   int q,s,h,f;
164
165   set<int>::iterator si;
166
167   vector<string> registration_query_names;                      // for lfta.c registration
168   map<string, vector<int> > mach_query_names;   // list queries of machine
169   vector<int> snap_lengths;                             // for lfta.c registration
170   vector<string> interface_names;                       // for lfta.c registration
171   vector<string> machine_names;                 // machine of interface
172   vector<bool> lfta_reuse_options;                      // for lfta.c registration
173   vector<int> lfta_liveness_timeouts;           // for qtree.xml generation
174   vector<string> hfta_names;                    // hfta source code names, for
175                                                                                 // creating the make file.
176   vector<string> qnames;                                // ensure unique names
177   map<string, int> lfta_names;                  // keep track of unique lftas.
178
179
180 //                              set these to 1 to debug the parser
181   FtaParserdebug = 0;
182   Ext_fcnsParserdebug = 0;
183
184   FILE *lfta_out;                               // lfta.c output.
185   FILE *fta_in;                                 // input file
186   FILE *table_schemas_in;               // source tables definition file
187   FILE *query_name_output;              // query names
188   FILE *qtree_output;                   // interconnections of query nodes
189
190   // -------------------------------
191   // Handling of Input Arguments
192   // -------------------------------
193     char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-c] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195                 "\t[-B] : debug only (don't create output files)\n"
196                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199                 "\t[-C] : use <config directory> for definition files\n"
200                 "\t[-l] : use <library directory> for library queries\n"
201                 "\t[-N] : output query names in query_names.txt\n"
202                 "\t[-H] : create HFTA only (no schema_file)\n"
203                 "\t[-Q] : use query name for hfta suffix\n"
204                 "\t[-M] : generate make file and runit, stopit scripts\n"
205                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206                 "\t[-f] : Output schema summary to schema_summary.txt\n"
207                 "\t[-P] : link with PADS\n"
208                 "\t[-h] : override host name.\n"
209                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210                 "\t[-R] : path to root of GS-lite\n"
211 ;
212
213 //              parameters gathered from command line processing
214         string external_fcns_path;
215 //      string internal_fcn_path;
216         string config_dir_path;
217         string library_path = "./";
218         vector<string> input_file_names;
219         string schema_file_name;
220         bool debug_only = false;
221         bool hfta_only = false;
222         bool output_query_names = false;
223         bool output_schema_summary=false;
224         bool numeric_hfta_flname = true;
225         bool create_makefile = false;
226         bool distributed_mode = false;
227         bool partitioned_mode = false;
228         bool use_live_hosts_file = false;
229         bool use_pads = false;
230         bool clean_make = false;
231         int n_virtual_interfaces = 1;
232
233    int chopt;           // getopt() returns an int; a plain char breaks the != -1 test where char is unsigned
234    while((chopt = getopt(argc,argv,optstr)) != -1){
235                 switch(chopt){
236                 case 'B':
237                         debug_only = true;
238                         break;
239                 case 'D':
240                         distributed_mode = true;
241                         break;
242                 case 'p':
243                         partitioned_mode = true;
244                         break;
245                 case 'L':
246                         use_live_hosts_file = true;
247                         break;
248                 case 'C':
249                                 if(optarg != NULL)
250                                  config_dir_path = string(optarg) + string("/");
251                         break;
252                 case 'l':
253                                 if(optarg != NULL)
254                                  library_path = string(optarg) + string("/");
255                         break;
256                 case 'N':
257                         output_query_names = true;
258                         break;
259                 case 'Q':
260                         numeric_hfta_flname = false;
261                         break;
262                 case 'H':
263                         if(schema_file_name == ""){
264                                 hfta_only = true;
265                         }
266                         break;
267                 case 'f':
268                         output_schema_summary=true;
269                         break;
270                 case 'M':
271                         create_makefile=true;
272                         break;
273                 case 'S':
274                         generate_stats=true;
275                         break;
276                 case 'P':
277                         use_pads = true;
278                         break;
279                 case 'c':
280                         clean_make = true;
281                         break;
282                 case 'h':
283                         if(optarg != NULL)
284                                 hostname = optarg;
285                         break;
286                 case 'R':
287                         if(optarg != NULL)
288                                 root_path = optarg;
289                         break;
290                 case 'n':
291                         if(optarg != NULL){
292                                 n_virtual_interfaces = atoi(optarg);
293                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
295                                         n_virtual_interfaces = 1;
296                                 }
297                         }
298                         break;
299                 case '?':
300                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301                         fprintf(stderr,"%s\n", usage_str);
302                         exit(1);
303                 default:
304                         fprintf(stderr, "Argument was %c\n", optopt);
305                         fprintf(stderr,"Invalid arguments\n");
306                         fprintf(stderr,"%s\n", usage_str);
307                         exit(1);
308                 }
309         }
310         argc -= optind;
311         argv += optind;
312         for (int i = 0; i < argc; ++i) {
313                 if((schema_file_name == "") && !hfta_only){
314                         schema_file_name = argv[i];
315                 }else{
316                         input_file_names.push_back(argv[i]);
317                 }
318         }
319
320         if(input_file_names.size() == 0){
321                 fprintf(stderr,"%s\n", usage_str);
322                 exit(1);
323         }
324
325         if(clean_make){
326                 string clean_cmd = "rm Makefile hfta_*.cc";
327                 int clean_ret = system(clean_cmd.c_str());
328                 if(clean_ret){
329                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
330                 }
331         }
332
333
334         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
335
336 //                      Open globally used file names.
337
338         // prepend config directory to schema file
339         schema_file_name = config_dir_path + schema_file_name;
340         external_fcns_path = config_dir_path + string("external_fcns.def");
341     string ifx_fname = config_dir_path + string("ifres.xml");
342
343 //              Find interface query file(s).
344         if(hostname == ""){
345                 gethostname(tmpstr,TMPSTRLEN);
346                 hostname = tmpstr;
347         }
348         hostname_len = hostname.length();       // tmpstr is uninitialized when -h supplied the host name
349     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
350         vector<string> ifq_fls;
351
352                 ifq_fls.push_back(ifq_fname);
353
354
355 //                      Get the field list, if it exists
356         string flist_fl = config_dir_path + "field_list.xml";
357         FILE *flf_in = NULL;
358         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360                 xml_leaves = new xml_t();
361                 xmlParser_setfileinput(flf_in);
362                 if(xmlParserparse()){
363                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
364                 }else{
365                         field_verifier = new field_list(xml_leaves);
366                 }
367         }
368
369         if(!hfta_only){
370           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
372                 exit(1);
373           }
374         }
375
376 /*
377         if(!(debug_only || hfta_only)){
378           if((lfta_out = fopen("lfta.c","w")) == NULL){
379                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
380                 exit(1);
381           }
382         }
383 */
384
385 //              Get the output specification file.
386 //              format is
387 //                      query, operator, operator_param, directory, bucketwidth, partitioning_fields, n_partitions
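//                      e.g. a hypothetical entry routing one query to a file operator:
//                              flow_cnt,file,flow_cnt.out,./data,300,srcIP,1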
388         string ospec_fl = "output_spec.cfg";
389         FILE *osp_in = NULL;
390         vector<ospec_str *> output_specs;
391         multimap<string, int> qname_to_ospec;
392         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
393                 char *flds[MAXFLDS];
394                 int o_lineno = 0;
395                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
396                         o_lineno++;
397                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
398                         if(nflds == 7){
399 //              make operator type lowercase
400                                 char *tmpc;
401                                 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402                                         *tmpc = tolower(*tmpc);
403
404                                 ospec_str *tmp_ospec = new ospec_str();
405                                 tmp_ospec->query = flds[0];
406                                 tmp_ospec->operator_type = flds[1];
407                                 tmp_ospec->operator_param = flds[2];
408                                 tmp_ospec->output_directory = flds[3];
409                                 tmp_ospec->bucketwidth = atoi(flds[4]);
410                                 tmp_ospec->partitioning_flds = flds[5];
411                                 tmp_ospec->n_partitions = atoi(flds[6]);
412                                 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413                                 output_specs.push_back(tmp_ospec);
414                         }else{
415                                 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
416                         }
417                 }
418                 fclose(osp_in);
419         }else{
420                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
421                 exit(1);
422         }
423
424 //              hfta parallelism
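//              format: one "hfta_name,parallelism" pair per line; parallelism must lie
//              between 1 and n_virtual_interfaces and divide it evenly,
//              e.g. (hypothetical, assuming -n 4): flow_cnt,2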
425         string pspec_fl = "hfta_parallelism.cfg";
426         FILE *psp_in = NULL;
427         map<string, int> hfta_parallelism;
428         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
429                 char *flds[MAXFLDS];
430                 int o_lineno = 0;
431                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432                         bool good_entry = true;
433                         o_lineno++;
434                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
435                         if(nflds == 2){
436                                 string hname = flds[0];
437                                 int par = atoi(flds[1]);
438                                 if(par <= 0 || par > n_virtual_interfaces){
439                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
440                                         good_entry = false;
441                                 }
442                                 if(good_entry && n_virtual_interfaces % par != 0){
443                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
444                                         good_entry = false;
445                                 }
446                                 if(good_entry)
447                                         hfta_parallelism[hname] = par;
448                         }
449                 }
450         }else{
451                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
452         }
453
454
455 //              LFTA hash table sizes
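//              format: one "lfta_name,hash_table_size" pair per line, size > 0,
//              e.g. (hypothetical): flow_cnt,8192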
456         string htspec_fl = "lfta_htsize.cfg";
457         FILE *htsp_in = NULL;
458         map<string, int> lfta_htsize;
459         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
460                 char *flds[MAXFLDS];
461                 int o_lineno = 0;
462                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463                         bool good_entry = true;
464                         o_lineno++;
465                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
466                         if(nflds == 2){
467                                 string lfta_name = flds[0];
468                                 int htsz = atoi(flds[1]);
469                                 if(htsz>0){
470                                         lfta_htsize[lfta_name] = htsz;
471                                 }else{
472                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
473                                 }
474                         }
475                 }
476         }else{
477                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
478         }
479
480 //              LFTA virtual interface hash split
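//              format: interface_name followed by one positive hash-load share per
//              virtual interface; the shares are accumulated into cumulative buckets,
//              e.g. (hypothetical): eth0,2,1,1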
481         string rtlspec_fl = "rts_load.cfg";
482         FILE *rtl_in = NULL;
483         map<string, vector<int> > rts_hload;
484         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
485                 char *flds[MAXFLDS];
486                 int r_lineno = 0;
487                 string iface_name;
488                 vector<int> hload;
489                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490                         bool good_entry = true;
491                         r_lineno++;
492                         iface_name = "";
493                         hload.clear();
494                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
495                         if(nflds >1){
496                                 iface_name = flds[0];
497                                 int cumm_h = 0;
498                                 int j;
499                                 for(j=1;j<nflds;++j){
500                                         int h = atoi(flds[j]);
501                                         if(h<=0)
502                                                 good_entry = false;
503                                         cumm_h += h;
504                                         hload.push_back(cumm_h);
505                                 }
506                         }else{
507                                 good_entry = false;
508                         }
509                         if(good_entry){
510                                 rts_hload[iface_name] = hload;
511                         }else{
512                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
513                         }
514                 }
515         }
516
517
518
519         if(output_query_names){
520           if((query_name_output = fopen("query_names.txt","w")) == NULL){
521                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
522                 exit(1);
523           }
524         }
525
526         if(output_schema_summary){
527           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
529                 exit(1);
530           }
531         }
532
533         if((qtree_output = fopen("qtree.xml","w")) == NULL){
534                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
535                 exit(1);
536         }
537         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539         fprintf(qtree_output,"<QueryNodes>\n");
540
541
542 //                      Get an initial Schema
543         table_list *Schema;
544         if(!hfta_only){
545 //                      Parse the table schema definitions.
546           fta_parse_result = new fta_parse_t();
547           FtaParser_setfileinput(table_schemas_in);
548           if(FtaParserparse()){
549                 fprintf(stderr,"Table schema parse failed.\n");
550                 exit(1);
551           }
552           if(fta_parse_result->parse_type != TABLE_PARSE){
553                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
554                 exit(1);
555           }
556           Schema = fta_parse_result->tables;
557
558 //                      Ensure that all schema_ids, if set, are distinct.
559 //  Obsolete? There is code elsewhere to ensure that schema IDs are
560 //  distinct on a per-interface basis.
561 /*
562           set<int> found_ids;
563           set<int> dup_ids;
564           for(int t=0;t<Schema->size();++t){
565                 int sch_id = Schema->get_table(t)->get_schema_id();
566                 if(sch_id> 0){
567                         if(found_ids.find(sch_id) != found_ids.end()){
568                                 dup_ids.insert(sch_id);
569                         }else{
570                                 found_ids.insert(sch_id);
571                         }
572                 }
573                 if(dup_ids.size()>0){
574                         fprintf(stderr, "Error, the schema has duplicate schema_ids:");
575                         for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
576                                 fprintf(stderr," %d",(*dit));
577                         fprintf(stderr,"\n");
578                         exit(1);
579                 }
580           }
581 */
582
583
584 //                      Process schema field inheritance
585           int retval;
586           retval = Schema->unroll_tables(err_str);
587           if(retval){
588                 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
589                 exit(1);
590           }
591         }else{
592 //                      hfta only => we will try to fetch schemas from the registry.
593 //                      therefore, start off with an empty schema.
594           Schema = new table_list();
595         }
596
597
598 //                      Open and parse the external functions file.
599         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
600         if(Ext_fcnsParserin == NULL){
601                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
602                 Ext_fcns = new ext_fcn_list();
603         }else{
604                 if(Ext_fcnsParserparse()){
605                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
606                         Ext_fcns = new ext_fcn_list();
607                 }
608         }
609         if(Ext_fcns->validate_fcns(err_str)){
610                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
611                 exit(1);
612         }
613
614 //              Open and parse the interface resources file.
615 //      ifq_t *ifaces_db = new ifq_t();
616 //   string ierr;
617 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
618 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
619 //                              ifx_fname.c_str(), ierr.c_str());
620 //              exit(1);
621 //      }
622 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
623 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
624 //                              ifq_fname.c_str(), ierr.c_str());
625 //              exit(1);
626 //      }
627
628
629 //                      The LFTA code string.
630 //                      Put the standard preamble here.
631 //                      NOTE: the hash macros, fcns should go into the run time
632   map<string, string> lfta_val;
633   map<string, string> lfta_prefilter_val;
634
635   string lfta_header =
636 "#include <limits.h>\n"
637 "#include \"rts.h\"\n"
638 "#include \"fta.h\"\n"
639 "#include \"lapp.h\"\n"
640 "#include \"rts_udaf.h\"\n"
641 "#include<stdio.h>\n"
642 "#include <float.h>\n"
643 "#include \"rdtsc.h\"\n"
644 "#include \"watchlist.h\"\n\n"
645
646 ;
647 // Get any locally defined parsing headers
648     glob_t glob_result;
649     memset(&glob_result, 0, sizeof(glob_result));
650
651     // do the glob operation TODO should be from GSROOT
652     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
653         if(return_value == 0){
654                 lfta_header += "\n";
655         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
656                         char *flds[1000];
657                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
658                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
659         }
660                 lfta_header += "\n";
661         }else{
662                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
663         }
664
665 /*
666 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
667 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
668 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
669 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
670 */
671
672         lfta_header += 
673 "\n"
674 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
675 "\n"
676 "#define SLOT_FILLED 0x04\n"
677 "#define SLOT_GEN_BITS 0x03\n"
678 "#define SLOT_HASH_BITS 0xfffffff8\n"
679 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
680 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
681 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
682 "\n\n"
683
684 "#define lfta_BOOL_to_hash(x) (x)\n"
685 "#define lfta_USHORT_to_hash(x) (x)\n"
686 "#define lfta_UINT_to_hash(x) (x)\n"
687 "#define lfta_IP_to_hash(x) (x)\n"
688 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
689 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
690 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
691 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
692 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
693 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
694 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
695 "       for(i=0;i<x.length;++i){\n"
696 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
697 "               if((i%4) == 3){\n"
698 "                       ret ^= tmp_sum;\n"
699 "                       tmp_sum = 0;\n"
700 "               }\n"
701 "       }\n"
702 "       if((i%4)!=0) ret ^=tmp_sum;\n"
703 "       return(ret);\n"
704 "}\n\n\n";
705
706
707
708 //////////////////////////////////////////////////////////////////
709 /////                   Get all of the query parse trees
710
711
712   int i,p;
713   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
714
715 //---------------------------
716 //              Global info needed for post processing.
717
718 //                      Set of operator views ref'd in the query set.
719         opview_set opviews;
720 //                      lftas on a per-machine basis.
721         map<string, vector<stream_query *> > lfta_mach_lists;
722         int nfiles = input_file_names.size();
723         vector<stream_query *> hfta_list;               // list of hftas.
724         map<string, stream_query *> sq_map;             // map from query name to stream query.
725
726
727 //////////////////////////////////////////
728
729 //              Open and parse the interface resources file.
730         ifq_t *ifaces_db = new ifq_t();
731     string ierr;
732         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
733                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
734                                 ifx_fname.c_str(), ierr.c_str());
735                 exit(1);
736         }
737         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
738                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
739                                 ifq_fls[0].c_str(), ierr.c_str());
740                 exit(1);
741         }
742
743   map<string, string> qname_to_flname;  // for detecting duplicate query names
744
745
746
747 //                      Parse the files to create a vector of parse trees.
748 //                      Load qnodes with information to perform a topo sort
749 //                      based on query dependencies.
750   vector<query_node *> qnodes;                          // for topo sort.
751   map<string,int> name_node_map;                        // map query name to qnodes entry
752   for(i=0;i<input_file_names.size();i++){
753
754           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
755                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
756                   continue;
757           }
758 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
759
760 //                      Parse the FTA query
761           fta_parse_result = new fta_parse_t();
762           FtaParser_setfileinput(fta_in);
763           if(FtaParserparse()){
764                 fprintf(stderr,"FTA parse failed.\n");
765                 exit(1);
766           }
767           if(fta_parse_result->parse_type != QUERY_PARSE){
768                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
769                 exit(1);
770           }
771
772 //                      returns a list of parse trees
773           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
774           for(p=0;p<qlist.size();++p){
775             table_exp_t *fta_parse_tree = qlist[p];
776 //              query_parse_trees.push_back(fta_parse_tree);
777
778 //                      compute the default name -- extract it from the input file name
779                 strcpy(tmpstr,input_file_names[i].c_str());
780                 char *qname = strrchr(tmpstr,PATH_DELIM);
781                 if(qname == NULL)
782                         qname = tmpstr;
783                 else
784                         qname++;
785                 char *qname_end = strchr(qname,'.');
786                 if(qname_end != NULL) *qname_end = '\0';
787                 string qname_str = qname;
788                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
789
790 //                      Determine visibility.  Should I be attaching all of the output methods?
791                 if(qname_to_ospec.count(imputed_qname)>0)
792                         fta_parse_tree->set_visible(true);
793                 else
794                         fta_parse_tree->set_visible(false);
795
796
797 //                              Create a manipulable representation of the parse tree.
798 //                              the qnode inherits the visibility assigned to the parse tree.
799             int pos = qnodes.size();
800                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
801                 name_node_map[ qnodes[pos]->name ] = pos;
802 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
803 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
804 //              qfiles.push_back(i);
805
806 //                      Check for duplicate query names
807 //                                      NOTE : in hfta-only generation, I should
808 //                                      also check with the names of the registered queries.
809                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
810                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
811                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
812                         exit(1);
813                 }
814                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
815                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
816                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
817                         exit(1);
818                 }
819                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
820
821
822         }
823   }
824
825 //              Add the library queries
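//              A referenced table whose name contains a '/' (e.g. a hypothetical
//              "examples/flow_cnt") and is not defined in the regular query set is
//              loaded from <library_path>/<name>.gsql; queries it references within
//              the same library file are pulled in transitively by the worklist below.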
826
827   int pos;
828   for(pos=0;pos<qnodes.size();++pos){
829         int fi;
830         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
831                 string src_tbl = qnodes[pos]->refd_tbls[fi];
832                 if(qname_to_flname.count(src_tbl) == 0){
833                         int last_sep = src_tbl.find_last_of('/');
834                         if(last_sep != string::npos){
835 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
836                                 string target_qname = src_tbl.substr(last_sep+1);
837                                 string qpathname = library_path + src_tbl + ".gsql";
838                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
839                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
840                                         exit(1);
842                                 }
843 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
844 //                      Parse the FTA query
845                                 fta_parse_result = new fta_parse_t();
846                                 FtaParser_setfileinput(fta_in);
847                                 if(FtaParserparse()){
848                                         fprintf(stderr,"FTA parse failed.\n");
849                                         exit(1);
850                                 }
851                                 if(fta_parse_result->parse_type != QUERY_PARSE){
852                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",qpathname.c_str());
853                                         exit(1);
854                                 }
855
856                                 map<string, int> local_query_map;
857                                 vector<string> local_query_names;
858                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
859                                 for(p=0;p<qlist.size();++p){
860                                 table_exp_t *fta_parse_tree = qlist[p];
861                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
862                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
863                                         if(imputed_qname == target_qname)
864                                                 imputed_qname = src_tbl;
865                                         if(local_query_map.count(imputed_qname)>0){
866                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
867                                                 exit(1);
868                                         }
869                                         local_query_map[ imputed_qname ] = p;
870                                         local_query_names.push_back(imputed_qname);
871                                 }
872
873                                 if(local_query_map.count(src_tbl)==0){
874                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
875                                         exit(1);
876                                 }
877
878                                 vector<int> worklist;
879                                 set<int> added_queries;
880                                 vector<query_node *> new_qnodes;
881                                 worklist.push_back(local_query_map[target_qname]);
882                                 added_queries.insert(local_query_map[target_qname]);
883                                 int qq;
884                                 int qpos = qnodes.size();
885                                 for(qq=0;qq<worklist.size();++qq){
886                                         int q_id = worklist[qq];
887                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
888                                         new_qnodes.push_back( new_qnode);
889                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
890                                         int ff;
891                                         for(ff = 0;ff<refd_tbls.size();++ff){
892                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
893
894                                                         if(name_node_map.count(refd_tbls[ff])>0){
895                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
896                                                                 exit(1);
897                                                         }else{
898                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
899                                                         }
900                                                 }
901                                         }
902                                 }
903
904                                 for(qq=0;qq<new_qnodes.size();++qq){
905                                         int qpos = qnodes.size();
906                                         qnodes.push_back(new_qnodes[qq]);
907                                         name_node_map[qnodes[qpos]->name ] = qpos;
908                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
909                                 }
910                         }
911                 }
912         }
913   }
914
915
916
917
918
919
920
921
922 //---------------------------------------
923
924
925 //              Add the UDOPS.
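//              For each referenced table whose schema entry is an OPERATOR_VIEW_SCHEMA,
//              create a query node for the UDOP and verify that every table it reads
//              from is defined somewhere in the query set.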
926
927   string udop_missing_sources;
928   for(i=0;i<qnodes.size();++i){
929         int fi;
930         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
931                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
932                 if(sid >= 0){
933                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
934                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
935                                 int pos = qnodes.size();
936                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
937                                         name_node_map[ qnodes[pos]->name ] = pos;
938                                         qnodes[pos]->is_externally_visible = false;   // UDOP visibility is resolved later
939         //                                      Need to mark the source queries as visible.
940                                         int si;
941                                         string missing_sources = "";
942                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
943                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
944                                                 if(name_node_map.count(src_tbl)==0){
945                                                         missing_sources += src_tbl + " ";
946                                                 }
947                                         }
948                                         if(missing_sources != ""){
949                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
950                                         }
951                                 }
952                         }
953                 }
954         }
955   }
956   if(udop_missing_sources != ""){
957         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
958         exit(1);
959   }
960
961
962
963 ////////////////////////////////////////////////////////////////////
964 ///                             Check parse trees to verify that some
965 ///                             global properties are met :
966 ///                             if q1 reads from q2, then
967 ///                               q2 is processed before q1
968 ///                               q1 can supply q2's parameters
969 ///                             Verify there is no cycle in the reads-from graph.
970
971 //                      Compute an order in which to process the
972 //                      queries.
973
974 //                      Start by building the reads-from lists.
975 //
976
977   for(i=0;i<qnodes.size();++i){
978         int qi, fi;
979         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
980         for(fi = 0;fi<refd_tbls.size();++fi){
981                 if(name_node_map.count(refd_tbls[fi])>0){
982 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
983                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
984                 }
985         }
986   }
987
988
989 //              If one query reads the result of another,
990 //              check for parameter compatibility.  Currently it must
991 //              be an exact match.  I will move to requiring
992 //              containment after re-ordering, but will require
993 //              some analysis for code generation which is not
994 //              yet in place.
995 //printf("There are %d query nodes.\n",qnodes.size());
996
997
998   for(i=0;i<qnodes.size();++i){
999         vector<var_pair_t *> target_params  = qnodes[i]->params;
1000         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1001                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
1002                 if(target_params.size() != source_params.size()){
1003                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1004                         exit(1);
1005                 }
1006                 int p;
1007                 for(p=0;p<target_params.size();++p){
1008                         if(! (target_params[p]->name == source_params[p]->name &&
1009                               target_params[p]->val == source_params[p]->val ) ){
1010                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1011                                 exit(1);
1012                         }
1013                 }
1014         }
1015   }
1016
1017
1018 //              Find the roots.
1019 //              Start by counting inedges.
1020   for(i=0;i<qnodes.size();++i){
1021         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1022                 qnodes[(*si)]->n_consumers++;
1023         }
1024   }
1025
1026 //              The roots are the nodes with indegree zero.
1027   set<int> roots;
1028   for(i=0;i<qnodes.size();++i){
1029         if(qnodes[i]->n_consumers == 0){
1030                 if(qnodes[i]->is_externally_visible == false){
1031                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1032                 }
1033                 roots.insert(i);
1034         }
1035   }
1036
1037 //              Remove the parts of the subtree that produce no output.
1038   set<int> valid_roots;
1039   set<int> discarded_nodes;
1040   set<int> candidates;
1041   while(roots.size() >0){
1042         for(si=roots.begin();si!=roots.end();++si){
1043                 if(qnodes[(*si)]->is_externally_visible){
1044                         valid_roots.insert((*si));
1045                 }else{
1046                         discarded_nodes.insert((*si));
1047                         set<int>::iterator sir;
1048                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1049                                 qnodes[(*sir)]->n_consumers--;
1050                                 if(qnodes[(*sir)]->n_consumers == 0)
1051                                         candidates.insert( (*sir));
1052                         }
1053                 }
1054         }
1055         roots = candidates;
1056         candidates.clear();
1057   }
1058   roots = valid_roots;
1059   if(discarded_nodes.size()>0){
1060         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1061         int di = 0;
1062         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1063                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1064                 di++;
1065                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1066         }
1067         fprintf(stderr,"\n");
1068   }
1069
1070 //              Compute the sources_to set, ignoring discarded nodes.
1071   for(i=0;i<qnodes.size();++i){
1072         if(discarded_nodes.count(i)==0)
1073                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1074                         qnodes[(*si)]->sources_to.insert(i);
1075         }
1076   }
1077
1078
1079 //              Find the nodes that are shared by multiple visible subtrees.
1080 //              The roots become inferred visible nodes.
1081
1082 //              Find the visible nodes.
1083         vector<int> visible_nodes;
1084         for(i=0;i<qnodes.size();i++){
1085                 if(qnodes[i]->is_externally_visible){
1086                         visible_nodes.push_back(i);
1087                 }
1088         }
1089
1090 //              Find UDOPs referenced by visible nodes.
1091   list<int> workq;
1092   for(i=0;i<visible_nodes.size();++i){
1093         workq.push_back(visible_nodes[i]);
1094   }
1095   while(!workq.empty()){
1096         int node = workq.front();
1097         workq.pop_front();
1098         set<int>::iterator children;
1099         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1100                 qnodes[node]->is_externally_visible = true;
1101                 visible_nodes.push_back(node);
1102                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1103                         if(qnodes[(*children)]->is_externally_visible == false){
1104                                 qnodes[(*children)]->is_externally_visible = true;
1105                                 visible_nodes.push_back((*children));
1106                         }
1107                 }
1108         }
1109         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1110                 workq.push_back((*children));
1111         }
1112   }
1113
1114         bool done = false;
1115         while(!done){
1116 //      reset the nodes
1117                 for(i=0;i<qnodes.size();i++){
1118                         qnodes[i]->subtree_roots.clear();
1119                 }
1120
1121 //              Walk the tree defined by a visible node, not descending into
1122 //              subtrees rooted by a visible node.  Mark the node visited with
1123 //              the visible node ID.
1124                 for(i=0;i<visible_nodes.size();++i){
1125                         set<int> vroots;
1126                         vroots.insert(visible_nodes[i]);
1127                         while(vroots.size()>0){
1128                                 for(si=vroots.begin();si!=vroots.end();++si){
1129                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1130
1131                                         set<int>::iterator sir;
1132                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1133                                                 if(! qnodes[(*sir)]->is_externally_visible){
1134                                                         candidates.insert( (*sir));
1135                                                 }
1136                                         }
1137                                 }
1138                                 vroots = candidates;
1139                                 candidates.clear();
1140                         }
1141                 }
1142 //              Find the nodes in multiple visible node subtrees, but with no parent
1143 //              that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
1144                 done = true;    // until proven otherwise
1145                 for(i=0;i<qnodes.size();i++){
1146                         if(qnodes[i]->subtree_roots.size()>1){
1147                                 bool is_new_root = true;
1148                                 set<int>::iterator sir;
1149                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1150                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1151                                                 is_new_root = false;
1152                                 }
1153                                 if(is_new_root){
1154                                         qnodes[i]->is_externally_visible = true;
1155                                         qnodes[i]->inferred_visible_node = true;
1156                                         visible_nodes.push_back(i);
1157                                         done = false;
1158                                 }
1159                         }
1160                 }
1161         }
1162
1163
1164
1165
1166
1167 //              get the (non-discarded) query nodes in topological order, roots first.
1168 //  for(i=0;i<qnodes.size();i++){
1169 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1170 //  }
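//              Kahn-style peel: repeatedly emit the current roots (nodes with no
//              remaining consumers), then decrement the consumer count of each query
//              they read from; a node whose count reaches zero joins the next round.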
1171   vector<int> process_order;
1172   while(roots.size() >0){
1173         for(si=roots.begin();si!=roots.end();++si){
1174                 if(discarded_nodes.count((*si))==0){
1175                         process_order.push_back( (*si) );
1176                 }
1177                 set<int>::iterator sir;
1178                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1179                         qnodes[(*sir)]->n_consumers--;
1180                         if(qnodes[(*sir)]->n_consumers == 0)
1181                                 candidates.insert( (*sir));
1182                 }
1183         }
1184         roots = candidates;
1185         candidates.clear();
1186   }
1187
1188
1189 //printf("process_order.size() =%d\n",process_order.size());
1190
1191 //              Search for cyclic dependencies
1192   string found_dep;
1193   for(i=0;i<qnodes.size();++i){
1194         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1195                 if(found_dep.size() != 0) found_dep += ", ";
1196                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1197         }
1198   }
1199   if(found_dep.size()>0){
1200         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1201         exit(1);
1202   }
1203
1204 //              Get a list of query sets, in the order to be processed.
1205 //              Start at visible root and do bfs.
1206 //              The query set includes queries referenced indirectly,
1207 //              as sources for user-defined operators.  These are needed
1208 //              to ensure that they are added to the schema, but are not part
1209 //              of the query tree.
1210
1211 //              stream_node_sets contains queries reachable only through the
1212 //              FROM clause, so I can tell which queries to add to the stream
1213 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1214
1215 //                      NOTE: this code works because in order for data to be
1216 //                      read by multiple hftas, the node must be externally visible.
1217 //                      But visible nodes define roots of process sets.
1218 //                      Internally visible nodes can feed data only
1219 //                      to other nodes in the same query file.
1220 //                      Therefore, any access can be restricted to a file, and
1221 //                      hfta output sharing is done only on roots,
1222 //                      never on interior nodes.
1223
1224
1225
1226
1227 //              Compute the base collection of hftas.
1228   vector<hfta_node *> hfta_sets;
1229   map<string, int> hfta_name_map;
1230 //  vector< vector<int> > process_sets;
1231 //  vector< set<int> > stream_node_sets;
1232   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.
1233                                                                                                                 // i.e. process leaves 1st.
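//              Each externally visible node roots one hfta: a BFS through reads_from
//              collects the non-visible nodes that feed it, and the collected list is
//              reversed so that sources precede the nodes that read from them.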
1234   for(i=0;i<process_order.size();++i){
1235         if(qnodes[process_order[i]]->is_externally_visible == true){
1236 //printf("Visible.\n");
1237                 int root = process_order[i];
1238                 hfta_node *hnode = new hfta_node();
1239                 hnode->name = qnodes[root]-> name;
1240                 hnode->source_name = qnodes[root]-> name;
1241                 hnode->is_udop = qnodes[root]->is_udop;
1242                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1243
1244                 vector<int> proc_list;  proc_list.push_back(root);
1245 //                      Ensure that nodes are added only once.
1246                 set<int> proc_set;      proc_set.insert(root);
1247                 roots.clear();                  roots.insert(root);
1248                 candidates.clear();
1249                 while(roots.size()>0){
1250                         for(si=roots.begin();si!=roots.end();++si){
1251 //printf("Processing root %d\n",(*si));
1252                                 set<int>::iterator sir;
1253                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1254 //printf("reads fom %d\n",(*sir));
1255                                         if(qnodes[(*sir)]->is_externally_visible==false){
1256                                                 candidates.insert( (*sir) );
1257                                                 if(proc_set.count( (*sir) )==0){
1258                                                         proc_set.insert( (*sir) );
1259                                                         proc_list.push_back( (*sir) );
1260                                                 }
1261                                         }
1262                                 }
1263                         }
1264                         roots = candidates;
1265                         candidates.clear();
1266                 }
1267
1268                 reverse(proc_list.begin(), proc_list.end());
1269                 hnode->query_node_indices = proc_list;
1270                 hfta_name_map[hnode->name] = hfta_sets.size();
1271                 hfta_sets.push_back(hnode);
1272         }
1273   }
1274
1275 //              Compute the reads_from / sources_to graphs for the hftas.
1276
1277   for(i=0;i<hfta_sets.size();++i){
1278         hfta_node *hnode = hfta_sets[i];
1279         for(q=0;q<hnode->query_node_indices.size();q++){
1280                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1281                 for(s=0;s<qnode->refd_tbls.size();++s){
1282                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1283                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1284                                 hnode->reads_from.insert(other_hfta);
1285                                 hfta_sets[other_hfta]->sources_to.insert(i);
1286                         }
1287                 }
1288         }
1289   }
1290
1291 //              Compute a topological sort of the hfta_sets.
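//              Seed the work queue with hftas that have no consumers; an hfta is emitted
//              only after all of its consumers have been emitted, so hfta_topsort lists
//              consumers before the hftas they read from.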
1292
1293   vector<int> hfta_topsort;
1294   workq.clear();
1295   int hnode_srcs[hfta_sets.size()];
1296   for(i=0;i<hfta_sets.size();++i){
1297         hnode_srcs[i] = 0;
1298         if(hfta_sets[i]->sources_to.size() == 0)
1299                 workq.push_back(i);
1300   }
1301
1302   while(! workq.empty()){
1303         int     node = workq.front();
1304         workq.pop_front();
1305         hfta_topsort.push_back(node);
1306         set<int>::iterator stsi;
1307         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1308                 int parent = (*stsi);
1309                 hnode_srcs[parent]++;
1310                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1311                         workq.push_back(parent);
1312                 }
1313         }
1314   }
1315
1316 //              Decorate hfta nodes with the level of parallelism given as input.
1317
1318   map<string, int>::iterator msii;
1319   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1320         string hfta_name = (*msii).first;
1321         int par = (*msii).second;
1322         if(hfta_name_map.count(hfta_name) > 0){
1323                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1324         }else{
1325                 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but it's not an hfta.\n",hfta_name.c_str());
1326         }
1327   }
1328
1329 //              Propagate levels of parallelism: children should have a level of parallelism
1330 //              as large as any of their parents.  Adjust children upwards to compensate.
1331 //              Start at parents and adjust children; auto-propagation will occur.
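//              hfta_sets is ordered with sources before their consumers, so scanning the
//              array from the end visits each consumer before the hftas it reads from;
//              any increase made here is re-propagated when the source's own index is reached.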
1332
1333   for(i=hfta_sets.size()-1;i>=0;i--){
1334         set<int>::iterator stsi;
1335         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1336                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1337                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1338                 }
1339         }
1340   }
1341
1342 //              Before all the name mangling, check if there are any output_spec.cfg
1343 //              or hfta_parallelism.cfg entries that do not have a matching query.
1344
1345         string dangling_ospecs = "";
1346         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1347                 string oq = (*msii).first;
1348                 if(hfta_name_map.count(oq) == 0){
1349                         dangling_ospecs += " "+(*msii).first;
1350                 }
1351         }
1352         if(dangling_ospecs!=""){
1353                 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1354         }
1355
1356         string dangling_par = "";
1357         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1358                 string oq = (*msii).first;
1359                 if(hfta_name_map.count(oq) == 0){
1360                         dangling_par += " "+(*msii).first;
1361                 }
1362         }
1363         if(dangling_par!=""){
1364                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1365         }
1366
1367
1368
1369 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1370 //              FROM clauses: retarget any name which is an internal node, and
1371 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes
1372 //              when the source hfta has more parallelism than the target node.
1373 //              Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1374
1375
1376   int n_original_hfta_sets = hfta_sets.size();
1377   for(i=0;i<n_original_hfta_sets;++i){
1378         if(hfta_sets[i]->n_parallel > 1){
1379                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1380                 set<string> local_nodes;                // names of query nodes in the hfta.
1381                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1382                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1383                 }
1384
1385                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1386                         string mangler = "__copy"+int_to_string(p);
1387                         hfta_node *par_hfta  = new hfta_node();
1388                         par_hfta->name = hfta_sets[i]->name + mangler;
1389                         par_hfta->source_name = hfta_sets[i]->name;
1390                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1391                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1392                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1393                         par_hfta->parallel_idx = p;
1394
1395                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1396
1397 //      Is it a UDOP?
1398                         if(hfta_sets[i]->is_udop){
1399                                 int root = hfta_sets[i]->query_node_indices[0];
1400
1401                                 string unequal_par_sources;
1402                                 set<int>::iterator rfsii;
1403                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1404                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1405                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1406                                         }
1407                                 }
1408                                 if(unequal_par_sources != ""){
1409                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1410                                         exit(1);
1411                                 }
1412
1413                                 int rti;
1414                                 vector<string> new_sources;
1415                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1416                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1417                                 }
1418
1419                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1420                                 new_qn->name += mangler;
1421                                 new_qn->mangler = mangler;
1422                                 new_qn->refd_tbls = new_sources;
1423                                 par_hfta->query_node_indices.push_back(qnodes.size());
1424                                 par_qnode_map[new_qn->name] = qnodes.size();
1425                                 name_node_map[ new_qn->name ] = qnodes.size();
1426                                 qnodes.push_back(new_qn);
1427                         }else{
1428 //              regular query node
1429                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1430                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1431                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1432 //                                      rehome the from clause on mangled names.
1433 //                                      create merge nodes as needed for external sources.
1434                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1435                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1436                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1437                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1438 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1439                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1440                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1441                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1442                                                 }else{
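//                                                      The source hfta has more copies than this one: copy p of this hfta
//                                                      merges the stride consecutive source copies [p*stride, (p+1)*stride)
//                                                      through the deferred merge node built below.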
1443                                                         vector<string> src_tbls;
1444                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1445                                                         if(stride == 0){
1446                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1447                                                                 exit(1);
1448                                                         }
1449                                                         for(s=0;s<stride;++s){
1450                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1451                                                                 src_tbls.push_back(ext_src_name);
1452                                                         }
1453                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1454                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1455                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1456 //                                      Make a qnode to represent the new merge node
1457                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1458                                                         qn_pt->refd_tbls = src_tbls;
1459                                                         qn_pt->is_udop  = false;
1460                                                         qn_pt->is_externally_visible = false;
1461                                                         qn_pt->inferred_visible_node  = false;
1462                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1463                                                         par_qnode_map[merge_node_name] = qnodes.size();
1464                                                         name_node_map[ merge_node_name ] = qnodes.size();
1465                                                         qnodes.push_back(qn_pt);
1466                                                 }
1467                                         }
1468                                 }
1469                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1470                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1471                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1472                                 }
1473                                 new_qn->params = qnodes[hqn_idx]->params;
1474                                 new_qn->is_udop = false;
1475                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1476                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1477                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1478                                 par_qnode_map[new_qn->name] = qnodes.size();
1479                                 name_node_map[ new_qn->name ] = qnodes.size();
1480                                 qnodes.push_back(new_qn);
1481                           }
1482                         }
1483                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1484                         hfta_sets.push_back(par_hfta);
1485                 }
1486         }else{
1487 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1488 //              hfta sources.
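//              In this case a single deferred merge node fans in all n_parallel copies of the source.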
1489                 if(!hfta_sets[i]->is_udop){
1490                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1491                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1492                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1493                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1494 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1495                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1496                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1497                                                 vector<string> src_tbls;
1498                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1499                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1500                                                         src_tbls.push_back(ext_src_name);
1501                                                 }
1502                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1503                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1504                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1505 //                                      Make a qnode to represent the new merge node
1506                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1507                                                 qn_pt->refd_tbls = src_tbls;
1508                                                 qn_pt->is_udop  = false;
1509                                                 qn_pt->is_externally_visible = false;
1510                                                 qn_pt->inferred_visible_node  = false;
1511                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1512                                                 name_node_map[ merge_node_name ] = qnodes.size();
1513                                                 qnodes.push_back(qn_pt);
1514                                         }
1515                                 }
1516                         }
1517                 }
1518           }
1519         }
1520   }
1521
1522 //                      Rebuild the reads_from / sources_to lists in the qnodes
1523   for(q=0;q<qnodes.size();++q){
1524         qnodes[q]->reads_from.clear();
1525         qnodes[q]->sources_to.clear();
1526   }
1527   for(q=0;q<qnodes.size();++q){
1528         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1529                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1530                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1531                         qnodes[q]->reads_from.insert(rf);
1532                         qnodes[rf]->sources_to.insert(q);
1533                 }
1534         }
1535   }
1536
1537 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1538   for(q=0;q<hfta_sets.size();++q){
1539         hfta_sets[q]->reads_from.clear();
1540         hfta_sets[q]->sources_to.clear();
1541   }
1542   for(q=0;q<hfta_sets.size();++q){
1543         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1544                 int node = hfta_sets[q]->query_node_indices[s];
1545                 set<int>::iterator rfsii;
1546                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1547                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1548                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1549                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1550                         }
1551                 }
1552         }
1553   }
1554
1555 /*
1556 for(q=0;q<qnodes.size();++q){
1557  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1558  set<int>::iterator rsii;
1559  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1560   printf(" %d",(*rsii));
1561   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1562  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1563   printf(" %d",(*rsii));
1564  printf("\n");
1565 }
1566
1567 for(q=0;q<hfta_sets.size();++q){
1568  if(hfta_sets[q]->do_generation==false)
1569         continue;
1570  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1571  set<int>::iterator rsii;
1572  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1573   printf(" %d",(*rsii));
1574   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1575  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1576   printf(" %d",(*rsii));
1577  printf("\n");
1578 }
1579 */
1580
1581
1582
1583 //              Re-topo sort the hftas
1584   hfta_topsort.clear();
1585   workq.clear();
1586   int hnode_srcs_2[hfta_sets.size()];
1587   for(i=0;i<hfta_sets.size();++i){
1588         hnode_srcs_2[i] = 0;
1589         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1590                 workq.push_back(i);
1591         }
1592   }
1593
1594   while(workq.empty() == false){
1595         int     node = workq.front();
1596         workq.pop_front();
1597         hfta_topsort.push_back(node);
1598         set<int>::iterator stsii;
1599         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1600                 int child = (*stsii);
1601                 hnode_srcs_2[child]++;
1602                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1603                         workq.push_back(child);
1604                 }
1605         }
1606   }
1607
1608 //              Ensure that all of the query_node_indices in hfta_sets are topologically
1609 //              sorted, don't rely on assumptions that all transforms maintain some kind of order.
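//              For each generated hfta: count, for every member node, how many of its sources lie
//              outside the hfta; nodes whose sources are all external seed the work queue, and a
//              node is emitted once every source it reads from has been emitted, yielding a
//              sources-first ordering of query_node_indices.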
1610   for(i=0;i<hfta_sets.size();++i){
1611         if(hfta_sets[i]->do_generation){
1612                 map<int,int> n_accounted;
1613                 vector<int> new_order;
1614                 workq.clear();
1615                 vector<int>::iterator vii;
1616                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1617                         n_accounted[(*vii)]= 0;
1618                 }
1619                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1620                         set<int>::iterator rfsii;
1621                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1622                                 if(n_accounted.count((*rfsii)) == 0){
1623                                         n_accounted[(*vii)]++;
1624                                 }
1625                         }
1626                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1627                                 workq.push_back((*vii));
1628                         }
1629                 }
1630
1631                 while(workq.empty() == false){
1632                         int node = workq.front();
1633                         workq.pop_front();
1634                         new_order.push_back(node);
1635                         set<int>::iterator stsii;
1636                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1637                                 if(n_accounted.count((*stsii))){
1638                                         n_accounted[(*stsii)]++;
1639                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1640                                                 workq.push_back((*stsii));
1641                                         }
1642                                 }
1643                         }
1644                 }
1645                 hfta_sets[i]->query_node_indices = new_order;
1646         }
1647   }
1648
1649
1650
1651
1652
1653 ///                     Global checking is done, start the analysis and translation
1654 ///                     of the query parse tree in the order specified by process_order
1655
1656
1657 //                      Get a list of the LFTAs for global lfta optimization
1658 //                              TODO: separate building operators from splitting lftas,
1659 //                                      that will make optimizations such as predicate pushing easier.
1660         vector<stream_query *> lfta_list;
1661         stream_query *rootq;
1662     int qi,qj;
1663
1664         map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1665
1666         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1667
1668         int hfta_id = hfta_topsort[qi];
1669     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1670
1671
1672
1673 //              Two possibilities: either it's a UDOP, or it's a collection of queries.
1674 //      if(qnodes[curr_list.back()]->is_udop)
1675         if(hfta_sets[hfta_id]->is_udop){
1676                 int node_id = curr_list.back();
1677                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1678                 opview_entry *opv = new opview_entry();
1679
1680 //                      Many of the UDOP properties aren't currently used.
1681                 opv->parent_qname = "no_parent";
1682                 opv->root_name = qnodes[node_id]->name;
1683                 opv->view_name = qnodes[node_id]->file;
1684                 opv->pos = qi;
1685                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1686                 opv->udop_alias = tmpstr;
1687                 opv->mangler = qnodes[node_id]->mangler;
1688
1689                 if(opv->mangler != ""){
1690                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1691                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1692                 }
1693
1694 //                      This piece of code makes each hfta which refers to the same udop
1695 //                      reference a distinct running udop.  Do this at query optimization time?
1696 //              fmtbl->set_udop_alias(opv->udop_alias);
1697
1698                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1699                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1700
1701                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1702                 int s,f,q;
1703                 for(s=0;s<subq.size();++s){
1704 //                              Validate that the fields match.
1705                         subquery_spec *sqs = subq[s];
1706                         string subq_name = sqs->name + opv->mangler;
1707                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1708                         if(flds.size() == 0){
1709                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1710                                 return(1);
1711                         }
1712                         if(flds.size() < sqs->types.size()){
1713                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1714                                 return(1);
1715                         }
1716                         bool failed = false;
1717                         for(f=0;f<sqs->types.size();++f){
1718                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1719                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1720                                 if(! dte.subsumes_type(&dtf) ){
1721                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1722                                         failed = true;
1723                                 }
1724 /*
1725                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1726                                         string pstr = dte.get_temporal_string();
1727                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1728                                         failed = true;
1729                                 }
1730 */
1731                         }
1732                         if(failed)
1733                                 return(1);
1734 ///                             Validation done, find the subquery, make a copy of the
1735 ///                             parse tree, and add it to the return list.
1736                         for(q=0;q<qnodes.size();++q)
1737                                 if(qnodes[q]->name == subq_name)
1738                                         break;
1739                         if(q==qnodes.size()){
1740                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1741                                 return(1);
1742                         }
1743
1744                 }
1745
1746 //                      Cross-link to the FROM entry(s) in all sourced-to tables.
1747                 set<int>::iterator sii;
1748                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1749 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1750                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1751                         int ii;
1752                         for(ii=0;ii<tblvars.size();++ii){
1753                                 if(tblvars[ii]->schema_name == opv->root_name){
1754                                         tblvars[ii]->set_opview_idx(opviews.size());
1755                                 }
1756
1757                         }
1758                 }
1759
1760                 opviews.append(opv);
1761         }else{
1762
1763 //                      Analyze the parse trees in this query,
1764 //                      put them in rootq
1765 //      vector<int> curr_list = process_sets[qi];
1766
1767
1768 ////////////////////////////////////////
1769
1770           rootq = NULL;
1771 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1772           for(qj=0;qj<curr_list.size();++qj){
1773                 i = curr_list[qj];
1774         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1775
1776 //                      Select the current query parse tree
1777                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1778
1779 //                      if hfta only, try to fetch any missing schemas
1780 //                      from the registry (using the print_schema program).
1781 //                      Here I use a hack to avoid analyzing the query -- all referenced
1782 //                      tables must be in the from clause.
1783 //                      If there is a problem loading any table, just issue a warning.
1784 //
1785                 tablevar_list_t *fm = fta_parse_tree->get_from();
1786                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1787 //                      iterate over all referenced tables
1788                 int t;
1789                 for(t=0;t<refd_tbls.size();++t){
1790                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1791
1792                   if(tbl_ref < 0){      // if this table is not in the Schema
1793
1794                         if(hfta_only){
1795                                 string cmd="print_schema "+refd_tbls[t];
1796                                 FILE *schema_in = popen(cmd.c_str(), "r");
1797                                 if(schema_in == NULL){
1798                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1799                                 }else{
1800                                   string schema_instr;
1801                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1802                                         schema_instr += tmpstr;
1803                                   }
1804                           fta_parse_result = new fta_parse_t();
1805                                   strcpy(tmp_schema_str,schema_instr.c_str());
1806                                   FtaParser_setstringinput(tmp_schema_str);
1807                           if(FtaParserparse()){
1808                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1809                           }else{
1810                                         if( fta_parse_result->tables != NULL){
1811                                                 int tl;
1812                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1813                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1814                                                 }
1815                                         }else{
1816                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1817                                         }
1818                                 }
1819                         }
1820                   }else{
1821                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1822                                 exit(1);
1823                   }
1824
1825                 }
1826           }
1827
1828
1829 //                              Analyze the query.
1830           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1831           if(qs == NULL){
1832                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1833                 exit(1);
1834           }
1835
1836           stream_query new_sq(qs, Schema);
1837           if(new_sq.error_code){
1838                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1839                         exit(1);
1840           }
1841
1842 //                      Add it to the Schema
1843           table_def *output_td = new_sq.get_output_tabledef();
1844           Schema->add_table(output_td);
1845
1846 //                      Create a query plan from the analyzed parse tree.
1847 //                      If it's a query referenced via FROM, add it to the stream query.
1848           if(rootq){
1849                 rootq->add_query(new_sq);
1850           }else{
1851                 rootq = new stream_query(new_sq);
1852 //                      have the stream query object inherit properties from the analyzed
1853 //                      hfta_node object.
1854                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1855                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1856           }
1857
1858
1859     }
1860
1861 //              This stream query has all its parts
1862 //              Build and optimize it.
1863 //printf("translate_fta: generating plan.\n");
1864         if(rootq->generate_plan(Schema)){
1865                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1866                 continue;
1867         }
1868
1869 //      We've found the query plan head, so now add the output operators
1870         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1871                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1872                 multimap<string, int>::iterator mmsi;
1873                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1874                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1875                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1876                 }
1877         }
1878
1879
1880
1881 //                              Perform query splitting if necessary.
1882         bool hfta_returned;
1883     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1884
1885         int l;
1886 //for(l=0;l<split_queries.size();++l){
1887 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1888 //}
1889
1890
1891
1892
1893     if(split_queries.size() > 0){       // should be at least one component.
1894
1895 //                              Compute the number of LFTAs.
1896           int n_lfta = split_queries.size();
1897           if(hfta_returned) n_lfta--;
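//                              split_query returns the lfta components first; when an hfta
//                              component is returned it is the last element of split_queries.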
1898 //                              Check if a schemaId constraint needs to be inserted.
1899
1900 //                              Process the LFTA components.
1901           for(l=0;l<n_lfta;++l){
1902            if(lfta_names.count(split_queries[l]->query_name) == 0){
1903 //                              Grab the lfta for global optimization.
1904                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1905                 string liface = "_local_";
1906 //              string lmach = "";
1907                 string lmach = hostname;
1908                 if(tvec.size()>0){
1909                         liface = tvec[0]->get_interface();      // iface queries have been resolved
1910                         lmach = tvec[0]->get_machine();
1911                 } // else{
1912                         interface_names.push_back(liface);
1913                         machine_names.push_back(lmach);
1914 //              }
1915
1916                 vector<predicate_t *> schemaid_preds;
1917                 for(int irv=0;irv<tvec.size();++irv){
1918
1919                         string schema_name = tvec[irv]->get_schema_name();
1920                         string rvar_name = tvec[irv]->get_var_name();
1921                         int schema_ref = tvec[irv]->get_schema_ref();
1922                         if (lmach == "")
1923                                 lmach = hostname;
1924 //                      interface_names.push_back(liface);
1925 //                      machine_names.push_back(lmach);
1926
1927 //printf("Machine is %s\n",lmach.c_str());
1928
1929 //                              Check if a schemaId constraint needs to be inserted.
1930                         if(schema_ref<0){ // can result from some kinds of splits
1931                                 schema_ref = Schema->get_table_ref(schema_name);
1932                         }
1933                         int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
1934                         int errnum = 0;
1935                         string if_error;
1936                         iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1937                         if(iface==NULL){
1938                                 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1939                                 exit(1);
1940                         }       
1941
1942
1943                         if(tvec[irv]->get_interface() != "_local_"){
1944                          if(iface->has_multiple_schemas()){
1945                                 if(schema_id<0){        // invalid schema_id
1946                                         fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1947                                         exit(1);
1948                                 }
1949                                 vector<string> iface_schemas = iface->get_property("Schemas");
1950                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1951                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1952                                         exit(1);
1953                                 }
1954 // Ensure that in liface, schema_id is used for only one schema
1955                                 if(schema_of_schemaid.count(liface)==0){
1956                                         map<int, string> empty_map;
1957                                         schema_of_schemaid[liface] = empty_map;
1958                                 }
1959                                 if(schema_of_schemaid[liface].count(schema_id)==0){
1960                                         schema_of_schemaid[liface][schema_id] = schema_name;
1961                                 }else{
1962                                         if(schema_of_schemaid[liface][schema_id] != schema_name){
1963                                                 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1964                                                 exit(1);
1965                                         }
1966                                 }
1967                          }else{ // single-schema interface
1968                                 schema_id = -1; // don't generate schema_id predicate
1969                                 vector<string> iface_schemas = iface->get_property("Schemas");
1970                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1971                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1972                                         exit(1);
1973                                 }
1974                                 if(iface_schemas.size()>1){
1975                                         fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1976                                         exit(1);
1977                                 }
1978                          }                      
1979                         }else{
1980                                 schema_id = -1;
1981                         }
1982
1983 // If we need to check the schema_id, insert a predicate into the lfta.
1984 //       TODO not just schema_id, the full all_schema_ids set.
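//       Build a one-comparison CNF clause 'schemaId = <schema_id>' on this range variable
//       and append it to the lfta plan's WHERE clause.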
1985                         if(schema_id>=0){
1986                                 colref_t *schid_cr = new colref_t("schemaId");
1987                                 schid_cr->schema_ref = schema_ref;
1988                                 schid_cr->table_name = rvar_name;
1989                                 schid_cr->tablevar_ref = 0;
1990                                 schid_cr->default_table = false;
1991                                 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1992                                 data_type *schid_dt = new data_type("uint");
1993                                 schid_se->dt = schid_dt;
1994
1995                                 string schid_str = int_to_string(schema_id);
1996                                 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
1997                                 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
1998                                 lit_se->dt = schid_dt;
1999
2000                                 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2001                                 vector<cnf_elem *> clist;
2002                                 make_cnf_from_pr(schid_pr, clist);
2003                                 analyze_cnf(clist[0]);
2004                                 clist[0]->cost = 1;     // cheap one comparison
2005 // cnf built, now insert it.
2006                                 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2007
2008 // Specialized processing 
2009 //              filter join, get two schemaid preds
2010                                 string node_type = split_queries[l]->query_plan[0]->node_type();
2011                                 if(node_type == "filter_join"){
2012                                         filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2013                                         if(irv==0){
2014                                                 fj->pred_t0.push_back(clist[0]);
2015                                         }else{
2016                                                 fj->pred_t1.push_back(clist[0]);
2017                                         }
2018                                         schemaid_preds.push_back(schid_pr);
2019                                 }
2020 //              watchlist join, get the first schemaid pred
2021                                 if(node_type == "watch_join"){
2022                                         watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2023                                         if(irv==0){
2024                                                 fj->pred_t0.push_back(clist[0]);
2025                                                 schemaid_preds.push_back(schid_pr);
2026                                         }
2027                                 }
2028                         }
2029                 }
2030 // Specialized processing, currently filter join.
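// For a filter join over two range variables, either table's schemaId may match,
// so the two per-table predicates are also combined into a single shared OR predicate.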
2031                 if(schemaid_preds.size()>1){
2032                         string node_type = split_queries[l]->query_plan[0]->node_type();
2033                         if(node_type == "filter_join"){
2034                                 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2035                                 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2036                                 vector<cnf_elem *> clist;
2037                                 make_cnf_from_pr(filter_pr, clist);
2038                                 analyze_cnf(clist[0]);
2039                                 clist[0]->cost = 1;     // cheap one comparison
2040                                 fj->shared_pred.push_back(clist[0]);
2041                         }
2042                 }
2043                         
2044
2045
2046
2047
2048
2049
2050 //                      Set the ht size from the recommendation, if there is one in the rec file
2051                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2052                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2053                 }
2054
2055
2056                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2057                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
2058                 lfta_list.push_back(split_queries[l]);
2059                 lfta_mach_lists[lmach].push_back(split_queries[l]);
2060
2061 //                      The following is a hack,
2062 //                      as I should be generating LFTA code through
2063 //                      the stream_query object.
2064
2065                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2066
2067 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2068
2069 /*
2070 //                              Create query description to embed in lfta.c
2071                 string lfta_schema_str = split_queries[l]->make_schema();
2072                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2073
2074 //                              get NIC capabilities.
2075                 int erri;
2076                 nic_property *nicprop = NULL;
2077                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2078                 if(iface_codegen_type.size()){
2079                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2080                         if(!nicprop){
2081                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2082                                         exit(1);
2083                         }
2084                 }
2085
2086                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2087 */
2088
2089                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2090
2091 // TODO NOTE : I'd like it to be the case that registration_query_names
2092 //      are the queries to be registered for subscription,
2093 //      but there is some complex bookkeeping here.
2094                 registration_query_names.push_back(split_queries[l]->query_name);
2095                 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2096 //                      NOTE: I will assume a 1-1 correspondence between
2097 //                      mach_query_names[lmach] and lfta_mach_lists[lmach],
2098 //                      where mach_query_names[lmach][i] contains the index into
2099 //                      registration_query_names, which names the lfta, and
2100 //                      lfta_mach_lists[lmach][i] is the stream_query * of the
2101 //                      corresponding lfta.
2102 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2103
2104
2105
2106                 // check if lfta is reusable
2107                 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2108
2109                 bool lfta_reusable = false;
2110                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2111                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2112                         lfta_reusable = true;
2113                 }
2114                 lfta_reuse_options.push_back(lfta_reusable);
2115
2116                 // LFTA will inherit the liveness timeout specification from the containing query
2117 //              it is too conservative, as lftas are expected to spend less time per tuple
2118 //              than the full query
2119
2120                 // extract liveness timeout from query definition
2121                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2122                 if (!liveness_timeout) {
2123 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2124 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2125                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2126                 }
2127                 lfta_liveness_timeouts.push_back(liveness_timeout);
2128
2129 //                      Add it to the schema
2130                 table_def *td = split_queries[l]->get_output_tabledef();
2131                 Schema->append_table(td);
2132 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2133
2134           }
2135          }
2136
2137 //                              If the output is lfta-only, dump out the query name.
2138       if(split_queries.size() == 1 && !hfta_returned){
2139         if(output_query_names ){
2140            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2141                 }
2142 /*
2143 else{
2144            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2145                 }
2146 */
2147
2148 /*
2149 //                              output schema summary
2150                 if(output_schema_summary){
2151                         dump_summary(split_queries[0]);
2152                 }
2153 */
2154       }
2155
2156
2157           if(hfta_returned){            // query also has an HFTA component
2158                 int hfta_nbr = split_queries.size()-1;
2159
2160                         hfta_list.push_back(split_queries[hfta_nbr]);
2161
2162 //                                      report on generated query names
2163         if(output_query_names){
2164                         string hfta_name =split_queries[hfta_nbr]->query_name;
2165                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2166                         for(l=0;l<hfta_nbr;++l){
2167                                 string lfta_name =split_queries[l]->query_name;
2168                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2169                         }
2170                 }
2171 //              else{
2172 //              fprintf(stderr,"query names are ");
2173 //                      for(l=0;l<hfta_nbr;++l){
2174 //                              if(l>0) fprintf(stderr,",");
2175 //                              string fta_name =split_queries[l]->query_name;
2176 //                      fprintf(stderr," %s",fta_name.c_str());
2177 //                      }
2178 //                      fprintf(stderr,"\n");
2179 //              }
2180           }
2181
2182   }else{
2183           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2184           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2185           exit(1);
2186   }
2187  }
2188 }
2189
2190
2191 //-----------------------------------------------------------------
2192 //              Compute and propagate the SEs used to compute PROTOCOL fields.
2193 //-----------------------------------------------------------------
2194
2195 for(i=0;i<lfta_list.size();i++){
2196         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2197         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2198 }
2199 for(i=0;i<hfta_list.size();i++){
2200         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2201         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2202 }
2203
2204
2205
2206 //------------------------------------------------------------------------
2207 //              Perform  individual FTA optimizations
2208 //-----------------------------------------------------------------------
2209
2210 if (partitioned_mode) {
2211
2212         // open partition definition file
2213         string part_fname = config_dir_path + "partition.txt";
2214
2215         FILE* partfd = fopen(part_fname.c_str(), "r");
2216         if (!partfd) {
2217                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2218                 exit(1);
2219         }
2220         PartnParser_setfileinput(partfd);
2221         if (PartnParserparse()) {
2222                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2223                 exit(1);
2224         }
2225         fclose(partfd);
2226 }
2227
2228
2229 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2230
2231 int num_hfta = hfta_list.size();
2232 for(i=0; i < hfta_list.size(); ++i){
2233         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2234 }
2235
2236 //                      Add all new hftas to schema
2237 for(i=num_hfta; i < hfta_list.size(); ++i){
2238                 table_def *td = hfta_list[i]->get_output_tabledef();
2239                 Schema->append_table(td);
2240 }
2241
2242 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2243
2244
2245
2246 //------------------------------------------------------------------------
2247 //              Do global (cross-fta) optimization
2248 //-----------------------------------------------------------------------
2249
2250
2251
2252
2253
2254
2255 set<string> extra_external_libs;
2256
2257 for(i=0;i<hfta_list.size();++i){                // generate code and qtree metadata for each hfta
2258
2259                 if(! debug_only){
2260 //                                      build hfta file name, create output
2261            if(numeric_hfta_flname){
2262             sprintf(tmpstr,"hfta_%d",hfta_count);
2263                         hfta_names.push_back(tmpstr);
2264             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2265          }else{
2266             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2267                         hfta_names.push_back(tmpstr);
2268             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2269           }
2270                   FILE *hfta_fl = fopen(tmpstr,"w");
2271                   if(hfta_fl == NULL){
2272                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2273                         exit(1);
2274                   }
2275                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2276
2277 //                      If there is a field verifier, warn about
2278 //                      lack of compatibility
2279 //                              NOTE : this code assumes that visible non-lfta queries
2280 //                              are those at the root of a stream query.
2281                   string hfta_comment;
2282                   string hfta_title;
2283                   string hfta_namespace;
2284                   if(hfta_list[i]->defines.count("comment")>0)
2285                         hfta_comment = hfta_list[i]->defines["comment"];
2286                   if(hfta_list[i]->defines.count("Comment")>0)
2287                         hfta_comment = hfta_list[i]->defines["Comment"];
2288                   if(hfta_list[i]->defines.count("COMMENT")>0)
2289                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2290                   if(hfta_list[i]->defines.count("title")>0)
2291                         hfta_title = hfta_list[i]->defines["title"];
2292                   if(hfta_list[i]->defines.count("Title")>0)
2293                         hfta_title = hfta_list[i]->defines["Title"];
2294                   if(hfta_list[i]->defines.count("TITLE")>0)
2295                         hfta_title = hfta_list[i]->defines["TITLE"];
2296                   if(hfta_list[i]->defines.count("namespace")>0)
2297                         hfta_namespace = hfta_list[i]->defines["namespace"];
2298                   if(hfta_list[i]->defines.count("Namespace")>0)
2299                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2300                   if(hfta_list[i]->defines.count("NAMESPACE")>0)
2301                         hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2302
2303                   if(field_verifier != NULL){
2304                         string warning_str;
2305                         if(hfta_comment == "")
2306                                 warning_str += "\tcomment not found.\n";
2307
2308 // Obsolete stuff that Carsten wanted
2309 //                      if(hfta_title == "")
2310 //                              warning_str += "\ttitle not found.\n";
2311 //                      if(hfta_namespace == "")
2312 //                              warning_str += "\tnamespace not found.\n";
2313
2314 // STOPPED HERE
2315 //      There is a get_tbl_keys method implemented for qp_nodes,
2316 //      integrate it into stream_query, then call it to find keys,
2317 //      and annotate fields with their key-ness.
2318 //      If there is a "keys" property in the defines block, override anything returned
2319 //      from the automated analysis
2320
2321                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2322                         int fi;
2323                         for(fi=0;fi<flds.size();fi++){
2324                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2325                         }
2326                         if(warning_str != "")
2327                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2328                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2329                   }
2330
2331 // Get the fields in this query
2332                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2333
2334 // do key processing
2335                   string hfta_keys_s = "";
2336                   if(hfta_list[i]->defines.count("keys")>0)
2337                         hfta_keys_s = hfta_list[i]->defines["keys"];
2338                   if(hfta_list[i]->defines.count("Keys")>0)
2339                         hfta_keys_s = hfta_list[i]->defines["Keys"];
2340                   if(hfta_list[i]->defines.count("KEYS")>0)
2341                         hfta_keys_s = hfta_list[i]->defines["KEYS"];
2342                   string xtra_keys_s = "";
2343                   if(hfta_list[i]->defines.count("extra_keys")>0)
2344                         xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2345                   if(hfta_list[i]->defines.count("Extra_Keys")>0)
2346                         xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2347                   if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2348                         xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2349 // get the keys
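// Key resolution: an explicit "keys" define wins; otherwise the keys are
// derived from the query plan via get_tbl_keys(), and any "extra_keys"
// not already present are appended.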
2350                   vector<string> hfta_keys;
2351                   vector<string> partial_keys;
2352                   vector<string> xtra_keys;
2353                   if(hfta_keys_s==""){
2354                                 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2355                                 if(xtra_keys_s.size()>0){
2356                                         xtra_keys = split_string(xtra_keys_s, ',');
2357                                 }
2358                                 for(int xi=0;xi<xtra_keys.size();++xi){
2359                                         if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2360                                                 hfta_keys.push_back(xtra_keys[xi]);
2361                                         }
2362                                 }
2363                   }else{
2364                                 hfta_keys = split_string(hfta_keys_s, ',');
2365                   }
2366 // validate that all of the keys exist in the output.
2367 //      (exit on error, as it's a bad specification)
2368                   vector<string> missing_keys;
2369                   for(int ki=0;ki<hfta_keys.size(); ++ki){
2370                         int fi;
2371                         for(fi=0;fi<flds.size();++fi){
2372                                 if(hfta_keys[ki] == flds[fi]->get_name())
2373                                         break;
2374                         }
2375                         if(fi==flds.size())
2376                                 missing_keys.push_back(hfta_keys[ki]);
2377                   }
2378                   if(missing_keys.size()>0){
2379                         fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2380                         for(int hi=0; hi<missing_keys.size(); ++hi){
2381                                 fprintf(stderr," %s", missing_keys[hi].c_str());
2382                         }
2383                         fprintf(stderr,"\n");
2384                         exit(1);
2385                   }
2386
2387                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2388                   if(hfta_comment != "")
2389                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2390                   if(hfta_title != "")
2391                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2392                   if(hfta_namespace != "")
2393                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2394                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2395                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2396
2397 //                              write info about fields to qtree.xml
2398                   int fi;
2399                   for(fi=0;fi<flds.size();fi++){
2400                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2401                         if(flds[fi]->get_modifier_list()->size()){
2402                                 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2403                         }
2404                         fprintf(qtree_output," />\n");
2405                   }
2406 // info about keys
2407                   for(int hi=0;hi<hfta_keys.size();++hi){
2408                         fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2409                   }
2410                   for(int hi=0;hi<partial_keys.size();++hi){
2411                         fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2412                   }
2413                   for(int hi=0;hi<xtra_keys.size();++hi){
2414                         fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2415                   }
2416
2417
2418                   // extract liveness timeout from query definition
2419                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2420                   if (!liveness_timeout) {
2421 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2422 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2423                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2424                   }
2425                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2426
2427                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2428                   int itv;
2429                   for(itv=0;itv<tmp_tv.size();++itv){
2430                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2431                   }
2432                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2433                   if(ifrs != ""){
2434                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2435                   }
2436                   fprintf(qtree_output,"\t</HFTA>\n");
2437
2438                   fclose(hfta_fl);
2439                 }else{
2440 //                                      debug only -- do code generation to catch generation-time errors.
2441                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2442                 }
2443
2444                 hfta_count++;   // for hfta file names with numeric suffixes
2445
2446                 hfta_list[i]->get_external_libs(extra_external_libs);
2447
2448           }
2449
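//              Collect the external libraries referenced by the hftas; the resulting
//              string is passed to generate_makefile() for the hfta link lines.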
2450 string ext_lib_string;
2451 set<string>::iterator ssi_el;
2452 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2453         ext_lib_string += (*ssi_el)+" ";
2454
2455
2456
2457 //                      Report on the set of operator views
2458   for(i=0;i<opviews.size();++i){
2459         opview_entry *opve = opviews.get_entry(i);
2460         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2461         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2462         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2463         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2464         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2465
2466         if (!opve->liveness_timeout) {
2467 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2468 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2469                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2470         }
2471         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2472     int j;
2473         for(j=0;j<opve->subq_names.size();j++)
2474                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2475         fprintf(qtree_output,"\t</UDOP>\n");
2476   }
2477
2478
2479 //-----------------------------------------------------------------
2480
2481 //                      Create interface-specific meta code files.
2482 //                              first, open and parse the interface resources file.
2483         ifaces_db = new ifq_t();
2484     ierr = "";
2485         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2486                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2487                                 ifx_fname.c_str(), ierr.c_str());
2488                 exit(1);
2489         }
2490
2491         map<string, vector<stream_query *> >::iterator svsi;
2492         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2493                 string lmach = (*svsi).first;
2494
2495         //              For this machine, create a set of lftas per interface.
2496                 vector<stream_query *> mach_lftas = (*svsi).second;
2497                 map<string, vector<stream_query *> > lfta_iface_lists;
2498                 int li;
2499                 for(li=0;li<mach_lftas.size();++li){
2500                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2501                         string lfta_iface = "_local_";
2502                         if(tvec.size()>0){
2503                                 lfta_iface = tvec[0]->get_interface();
2504                         }
2505                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2506                 }
2507
2508                 map<string, vector<stream_query *> >::iterator lsvsi;
2509                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2510                         int erri;
2511                         string liface = (*lsvsi).first;
2512                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2513                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2514                         if(iface_codegen_type.size()){
2515                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2516                                 if(!nicprop){
2517                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2518                                         exit(1);
2519                                 }
2520                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2521                                 string mcf_flnm;
2522                                 if(lmach != "")
2523                                   mcf_flnm = lmach + "_"+liface+".mcf";
2524                                 else
2525                                   mcf_flnm = hostname + "_"+liface+".mcf";
2526                                 FILE *mcf_fl ;
2527                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2528                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2529                                         exit(1);
2530                                 }
2531                                 fprintf(mcf_fl,"%s",mcs.c_str());
2532                                 fclose(mcf_fl);
2533 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2534 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2535                         }
2536                 }
2537
2538
2539         }
2540
2541
2542
2543 //-----------------------------------------------------------------
2544
2545
2546 //                      Find common filter predicates in the LFTAs.
2547 //                      In addition, generate structs to store the temporal attributes unpacked by the prefilter.
2548         
2549         map<string, vector<stream_query *> >::iterator ssqi;
2550         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2551
2552                 string lmach = (*ssqi).first;
2553                 bool packed_return = false;
2554                 int li, erri;
2555
2556
2557 //      The LFTAs of this machine.
2558                 vector<stream_query *> mach_lftas = (*ssqi).second;
2559 //      break up on a per-interface basis.
2560                 map<string, vector<stream_query *> > lfta_iface_lists;
2561                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2562                                                         // for fta_init
2563                 for(li=0;li<mach_lftas.size();++li){
2564                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2565                         string lfta_iface = "_local_";
2566                         if(tvec.size()>0){
2567                                 lfta_iface = tvec[0]->get_interface();
2568                         }
2569                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2570                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2571                 }
2572
2573
2574 //      Are the return values "packed"?
2575 //      This should be done on a per-interface basis.
2576 //      But this is defunct code for gs-lite
2577                 for(li=0;li<mach_lftas.size();++li){
2578                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2579                   string liface = "_local_";
2580                   if(tvec.size()>0){
2581                          liface = tvec[0]->get_interface();
2582                   }
2583                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2584                   if(iface_codegen_type.size()){
2585                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2586                         packed_return = true;
2587                         }
2588                   }
2589                 }
2590
2591
2592 // Separate lftas by interface, collect results on a per-interface basis.
2593
2594                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2595                 map<string, vector<cnf_set *> > prefilter_preds;
2596                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2597                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2598                         string liface = (*mvsi).first;
2599                         vector<cnf_set *> empty_list;
2600                         prefilter_preds[liface] = empty_list;
2601                         if(! packed_return){
2602                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2603                         }
2604
2605 //                              get NIC capabilities.  (Is this needed?)
2606                         nic_property *nicprop = NULL;
2607                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2608                         if(iface_codegen_type.size()){
2609                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2610                                 if(!nicprop){
2611                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2612                                         exit(1);
2613                                 }
2614                         }
2615                 }
2616
2617
2618 //              Now that we know the prefilter preds, generate the lfta code.
2619 //      Do this for all lftas in this machine.
2620                 for(li=0;li<mach_lftas.size();++li){
2621                         set<unsigned int> subsumed_preds;
2622                         set<unsigned int>::iterator sii;
2623 #ifdef PREFILTER_OK
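//                      A predicate id appears to pack the lfta index into its high 16 bits
//                      and the predicate's position within the prefilter into its low 16 bits;
//                      collect the positions subsumed by this lfta (li).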
2624                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2625                                 int pid = (*sii);
2626                                 if((pid>>16) == li){
2627                                         subsumed_preds.insert(pid & 0xffff);
2628                                 }
2629                         }
2630 #endif
2631                         string lfta_schema_str = mach_lftas[li]->make_schema();
2632                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2633                         nic_property *nicprop = NULL; // no NIC properties?
2634                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2635                 }
2636
2637
2638 //                      generate structs to store the temporal attributes
2639 //                      unpacked by prefilter
2640                 col_id_set temp_cids;
2641                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2642                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2643
2644 //                      Compute the lfta bit signatures and the lfta colrefs
2645 //      do this on a per-interface basis
2646 #ifdef PREFILTER_OK
2647                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2648 #endif
2649                 map<string, vector<long long int> > lfta_sigs; // used again later
2650                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2651                         string liface = (*mvsi).first;
2652                         vector<long long int> empty_list;
2653                         lfta_sigs[liface] = empty_list;
2654
2655                         vector<col_id_set> lfta_cols;
2656                         vector<int> lfta_snap_length;
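//                              Bit f_pos of an lfta's signature is set when prefilter predicate
//                              f_pos occurs in that lfta (per the predicate's lfta_id set).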
2657                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2658                                 unsigned long long int mask=0, bpos=1;
2659                                 int f_pos;
2660                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2661                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2662                                                 mask |= bpos;
2663                                         bpos = bpos << 1;
2664                                 }
2665                                 lfta_sigs[liface].push_back(mask);
2666                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2667                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2668                         }
2669
2670 //for(li=0;li<mach_lftas.size();++li){
2671 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2672 //col_id_set::iterator tcisi;
2673 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2674 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2675 //}
2676 //}
2677
2678
2679 //                      generate the prefilter
2680 //      Do this on a per-interface basis, except for the #define
2681 #ifdef PREFILTER_OK
2682 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2683                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2684 #else
2685                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2686
2687 #endif
2688                 }
2689
2690 //                      Generate interface parameter lookup function
2691           lfta_val[lmach] += "// lookup interface properties by name\n";
2692           lfta_val[lmach] += "// multiple values of a given property are returned in the same string, separated by commas\n";
2693           lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2694           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2695
2696 //        collect a list of interface names used by queries running on this host
2697           set<std::string> iface_names;
2698           for(i=0;i<mach_query_names[lmach].size();i++){
2699                 int mi = mach_query_names[lmach][i];
2700                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2701
2702                 if(interface_names[mi]=="")
2703                         iface_names.insert("DEFAULTDEV");
2704                 else
2705                         iface_names.insert(interface_names[mi]);
2706           }
2707
2708 //        generate interface property lookup code for every interface
2709           set<std::string>::iterator sir;
2710           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2711                 if (sir == iface_names.begin())
2712                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2713                 else
2714                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2715
2716                 // iterate through interface properties
2717                 vector<string> iface_properties;
2718                 if(*sir!="_local_"){    // dummy watchlist interface, don't process.
2719                         iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2720                 }
2721                 if (erri) {
2722                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2723                         exit(1);
2724                 }
2725                 if (iface_properties.empty())
2726                         lfta_val[lmach] += "\t\treturn NULL;\n";
2727                 else {
2728                         for (int i = 0; i < iface_properties.size(); ++i) {
2729                                 if (i == 0)
2730                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2731                                 else
2732                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2733
2734                                 // combine all values for the interface property using comma separator
2735                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2736                                 lfta_val[lmach] += "\t\t\treturn \"";
2737                                 for (int j = 0; j < vals.size(); ++j) {
2738                                         lfta_val[lmach] +=  vals[j];
2739                                         if (j != vals.size()-1)
2740                                                 lfta_val[lmach] += ",";
2741                                 }
2742                                 lfta_val[lmach] += "\";\n";
2743                         }
2744                         lfta_val[lmach] += "\t\t} else\n";
2745                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2746                 }
2747           }
2748           lfta_val[lmach] += "\t} else\n";
2749           lfta_val[lmach] += "\t\treturn NULL;\n";
2750           lfta_val[lmach] += "}\n\n";
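//        Illustrative shape of the generated lookup (hypothetical interface "eth0"
//        with a single property "Command" whose values are "cmd1" and "cmd2"):
//
//              gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {
//                      if (!strcmp(iface_name, "eth0")) {
//                              if (!strcmp(property_name, "Command")) {
//                                      return "cmd1,cmd2";
//                              } else
//                                      return NULL;
//                      } else
//                              return NULL;
//              }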
2751
2752
2753 //                      Generate a full list of FTAs for clearinghouse reference
2754           lfta_val[lmach] += "// list of FTAs the clearinghouse expects to register themselves\n";
2755           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2756
2757           bool first = true;
2758           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2759                 string liface = (*mvsi).first;
2760                 if(liface != "_local_"){        // these don't register themselves
2761                         vector<stream_query *> lfta_list = (*mvsi).second;
2762                         for(i=0;i<lfta_list.size();i++){
2763                                 int mi = lfta_iface_qname_ix[liface][i];
2764                                 if(first) first = false;
2765                                 else      lfta_val[lmach] += ", ";
2766                                 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2767                         }
2768                 }
2769           }
2770 //        for (i = 0; i < registration_query_names.size(); ++i) {
2771 //                 if (i)
2772 //                        lfta_val[lmach] += ", ";
2773 //                 lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2774 //        }
2775
2776           for (i = 0; i < hfta_list.size(); ++i) {
2777                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2778           }
2779           lfta_val[lmach] += ", NULL};\n\n";
2780
2781
2782 //                      Add the initialization function to lfta.c
2783 //      Change to accept the interface name, and 
2784 //      set the prefilter function accordingly.
2785 //      see the example in demo/err2
2786           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2787           lfta_val[lmach] += "//        note: the last parameter in fta_register is the prefilter signature\n";
2788
2789 //        for(i=0;i<mach_query_names[lmach].size();i++)
2790 //              int mi = mach_query_names[lmach][i];
2791 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2792
2793           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2794                 string liface = (*mvsi).first;
2795                 vector<stream_query *> lfta_list = (*mvsi).second;
2796                 for(i=0;i<lfta_list.size();i++){
2797                         stream_query *lfta_sq = lfta_list[i];
2798                         int mi = lfta_iface_qname_ix[liface][i];
2799
2800                         if(liface == "_local_"){
2801 //  Don't register an init function, do the init code inline
2802                                 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2803                                 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2804                                 continue;
2805                         }
2806                 
2807                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2808
2809                         string this_iface = "DEFAULTDEV";
2810                         if(interface_names[mi]!="")
2811                                 this_iface = '"'+interface_names[mi]+'"';
2812                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2813                 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2814 //              if(interface_names[mi]=="")
2815 //                              lfta_val[lmach]+="DEFAULTDEV";
2816 //              else
2817 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';
2818                         lfta_val[lmach] += this_iface;
2819
2820
2821                 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2822                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2823                         +"\n#endif\n";
2824                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2825                         lfta_val[lmach] += tmpstr;
2826
2827 //                      unsigned long long int mask=0, bpos=1;
2828 //                      int f_pos;
2829 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2830 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2831 //                                      mask |= bpos;
2832 //                              bpos = bpos << 1;
2833 //                      }
2834
2835 #ifdef PREFILTER_OK
2836 //                      sprintf(tmpstr,",%lluull",mask);
2837                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2838                         lfta_val[lmach]+=tmpstr;
2839 #else
2840                         lfta_val[lmach] += ",0ull";
2841 #endif
2842
2843                         lfta_val[lmach] += ");\n";
2844
2845
2846
2847 //    End of lfta prefilter stuff
2848 // --------------------------------------------------
2849
2850 //                      If there is a field verifier, warn about
2851 //                      lack of compatibility
2852                   string lfta_comment;
2853                   string lfta_title;
2854                   string lfta_namespace;
2855                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2856                   if(ldefs.count("comment")>0)
2857                         lfta_comment = ldefs["comment"];
2858                   if(ldefs.count("Comment")>0)
2859                         lfta_comment = ldefs["Comment"];
2860                   if(ldefs.count("COMMENT")>0)
2861                         lfta_comment = ldefs["COMMENT"];
2862                   if(ldefs.count("title")>0)
2863                         lfta_title = ldefs["title"];
2864                   if(ldefs.count("Title")>0)
2865                         lfta_title = ldefs["Title"];
2866                   if(ldefs.count("TITLE")>0)
2867                         lfta_title = ldefs["TITLE"];
2868                   if(ldefs.count("NAMESPACE")>0)
2869                         lfta_namespace = ldefs["NAMESPACE"];
2870                   if(ldefs.count("Namespace")>0)
2871                         lfta_namespace = ldefs["Namespace"];
2872                   if(ldefs.count("namespace")>0)
2873                         lfta_namespace = ldefs["namespace"];
2874
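//                      Hash table size: aggregation lftas (sgah/rsgah nodes) default to
//                      DEFAULT_LFTA_HASH_TABLE_SIZE unless an "aggregate_slots" define overrides it.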
2875                   string lfta_ht_size;
2876                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2877                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2878                   if(ldefs.count("aggregate_slots")>0){
2879                         lfta_ht_size = ldefs["aggregate_slots"];
2880                   }
2881
2882 //                      NOTE : I'm assuming that visible lftas do not start with _fta.
2883 //                              -- will fail for non-visible simple selection queries.
2884                 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2885                         string warning_str;
2886                         if(lfta_comment == "")
2887                                 warning_str += "\tcomment not found.\n";
2888 // Obsolete stuff that Carsten wanted
2889 //                      if(lfta_title == "")
2890 //                              warning_str += "\ttitle not found.\n";
2891 //                      if(lfta_namespace == "")
2892 //                              warning_str += "\tnamespace not found.\n";
2893
2894                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2895                         int fi;
2896                         for(fi=0;fi<flds.size();fi++){
2897                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2898                         }
2899                         if(warning_str != "")
2900                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2901                                         registration_query_names[mi].c_str(),warning_str.c_str());
2902                 }
2903
2904
2905 //                      Create qtree output
2906                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2907         if(lfta_comment != "")
2908               fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2909         if(lfta_title != "")
2910               fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2911         if(lfta_namespace != "")
2912               fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2913         if(lfta_ht_size != "")
2914               fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2915                 if(lmach != "")
2916                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2917                 else
2918                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2919                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
2920                 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2921                 for(int t=0;t<itbls.size();++t){
2922                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2923                 }
2924 //              fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2925                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2926                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2927 //                              write info about fields to qtree.xml
2928                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2929                   int fi;
2930                   for(fi=0;fi<flds.size();fi++){
2931                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2932                         if(flds[fi]->get_modifier_list()->size()){
2933                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2934                         }
2935                         fprintf(qtree_output," />\n");
2936                   }
2937                 fprintf(qtree_output,"\t</LFTA>\n");
2938
2939
2940             }
2941           }
2942
2943           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2944                         string liface = (*mvsi).first;
2945                         lfta_val[lmach] += 
2946 "       if (!strcmp(device, \""+liface+"\")) \n"
2947 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2948 ;
2949                 }
2950                 lfta_val[lmach] += 
2951 "       if(lfta_prefilter == NULL){\n"
2952 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2953 "               exit(1);\n"
2954 "       }\n"
2955 ;
2956
2957
2958
2959           lfta_val[lmach] += "}\n\n";
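//        Illustrative shape of the generated fta_init for a hypothetical query "q0"
//        on interface "eth0" (the alloc, schema-string, snap-length, and signature
//        tokens below are placeholders for the values generated above):
//
//              void fta_init(gs_sp_t device){
//                      if(!strcmp(device,"eth0"))
//                              fta_register("q0", 1, "eth0", <alloc_name>
//              #ifndef LFTA_IN_NIC
//                              ,<schema_string_name>
//              #endif
//                              ,<snap_len>,<prefilter_signature>ull);
//                      if (!strcmp(device, "eth0"))
//                              lfta_prefilter = &lfta_prefilter_eth0;
//                      if(lfta_prefilter == NULL){
//                              fprintf(stderr, "Error, interface %s not found in fta_init.\n",device);
//                              exit(1);
//                      }
//              }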
2960
2961       if(!(debug_only || hfta_only) ){
2962                 string lfta_flnm;
2963                 if(lmach != "")
2964                   lfta_flnm = lmach + "_lfta.c";
2965                 else
2966                   lfta_flnm = hostname + "_lfta.c";
2967                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2968                         fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2969                         exit(1);
2970                 }
2971               fprintf(lfta_out,"%s",lfta_header.c_str());
2972               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2973               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2974                 fclose(lfta_out);
2975           }
2976         }
2977
2978 //              Report the external operators which must execute
2979         if(opviews.size()>0)
2980                 fprintf(stderr,"The queries use the following external operators:\n");
2981         for(i=0;i<opviews.size();++i){
2982                 opview_entry *opv = opviews.get_entry(i);
2983                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2984         }
2985
2986         if(create_makefile)
2987                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2988                 machine_names, schema_file_name,
2989                 interface_names,
2990                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2991
2992
2993         fprintf(qtree_output,"</QueryNodes>\n");
2994
2995         return(0);
2996 }
2997
2998 ////////////////////////////////////////////////////////////
2999
3000 void generate_makefile(vector<string> &input_file_names, int nfiles,
3001                                            vector<string> &hfta_names, opview_set &opviews,
3002                                                 vector<string> &machine_names,
3003                                                 string schema_file_name,
3004                                                 vector<string> &interface_names,
3005                                                 ifq_t *ifdb, string &config_dir_path,
3006                                                 bool use_pads,
3007                                                 string extra_libs,
3008                                                 map<string, vector<int> > &rts_hload
3009                                          ){
3010         int i,j;
3011
3012         if(config_dir_path != ""){
3013                 config_dir_path = "-C "+config_dir_path;
3014         }
3015
3016         struct stat sb;
3017         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3018         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3019
3020 //      if(libz_exists && !libast_exists){
3021 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
3022 //              exit(1);
3023 //      }
3024
3025 //                      Get set of operator executable files to run
3026         set<string> op_fls;
3027         set<string>::iterator ssi;
3028         for(i=0;i<opviews.size();++i){
3029                 opview_entry *opv = opviews.get_entry(i);
3030                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3031         }
3032
3033         FILE *outfl = fopen("Makefile", "w");
3034         if(outfl==NULL){
3035                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
3036                 exit(0);
3037         }
3038
3039         fputs(
3040 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
3041 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3042 ).c_str(), outfl
3043 );
3044         if(generate_stats)
3045                 fprintf(outfl,"  -DLFTA_STATS");
3046
3047 //              Gather the set of interfaces
3048 //              Also, gather "base interface names" for use in computing
3049 //              the hash splitting to virtual interfaces.
3050 //              TODO : must update to handle machines
3051         set<string> ifaces;
3052         set<string> base_vifaces;       // base interfaces of virtual interfaces
3053         map<string, string> ifmachines;
3054         map<string, string> ifattrs;
3055         for(i=0;i<interface_names.size();++i){
3056                 ifaces.insert(interface_names[i]);
3057                 ifmachines[interface_names[i]] = machine_names[i];
3058
3059                 size_t Xpos = interface_names[i].find_last_of("X");
3060                 if(Xpos!=string::npos){
3061                         string iface = interface_names[i].substr(0,Xpos);
3062                         base_vifaces.insert(iface);
3063                 }
3064                 // get interface attributes and add them to the list
3065         }
3066
3067 //              Do we need to include protobuf libraries?
3068 //              TODO Move to the interface library: get the libraries to include 
3069 //              for an interface type
3070
3071         bool use_proto = false;
3072         bool use_bsa = false;
3073         bool use_kafka = false;
3074         int erri;
3075         string err_str;
3076         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3077                 string ifnm = (*ssi);
3078                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3079                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3080                         if(ift[ift_i]=="PROTO"){
3081                                 use_proto = true;
3082                         }
3083                 }
3084                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3085                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3086                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3087                                 use_bsa = true;
3088                         }
3089                 }
3090                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3091                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3092                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3093                                 use_kafka = true;
3094                         }
3095                 }
3096         }
3097
3098         fprintf(outfl,
3099 "\n"
3100 "\n"
3101 "all: rts");
3102         for(i=0;i<hfta_names.size();++i)
3103                 fprintf(outfl," %s",hfta_names[i].c_str());
3104         fputs(
3105 ("\n"
3106 "\n"
3107 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
3108 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3109         if(use_pads)
3110                 fprintf(outfl,"-L. ");
3111         fputs(
3112 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3113         if(use_pads)
3114                 fprintf(outfl,"-lgscppads -lpads ");
3115         fprintf(outfl,
3116 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3117         if(use_pads)
3118                 fprintf(outfl, " -lpz -lz -lbz ");
3119         if(libz_exists && libast_exists)
3120                 fprintf(outfl," -last ");
3121         if(use_pads)
3122                 fprintf(outfl, " -ldll -ldl ");
3123         if(use_proto)
3124                 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3125         if(use_bsa)
3126                 fprintf(outfl, " -lbsa_stream ");
3127         if(use_kafka)
3128                 fprintf(outfl, " -lrdkafka ");
3129         fprintf(outfl," -lgscpaux");
3130 #ifdef GCOV
3131         fprintf(outfl," -fprofile-arcs");
3132 #endif
3133         fprintf(outfl,
3134 "\n"
3135 "\n"
3136 "lfta.o: %s_lfta.c\n"
3137 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3138 "\n"
3139 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3140         for(i=0;i<nfiles;++i)
3141                 fprintf(outfl," %s",input_file_names[i].c_str());
3142         if(hostname == ""){
3143                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3144         }else{
3145                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3146         }
3147         for(i=0;i<nfiles;++i)
3148                 fprintf(outfl," %s",input_file_names[i].c_str());
3149         fprintf(outfl,"\n");
3150
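//              Emit a link rule and a compile rule for each generated hfta executable.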
3151         for(i=0;i<hfta_names.size();++i)
3152                 fprintf(outfl,
3153 ("%s: %s.o\n"
3154 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3155 "\n"
3156 "%s.o: %s.cc\n"
3157 "\t$(CPP) -o %s.o -c %s.cc\n"
3158 "\n"
3159 "\n").c_str(),
3160     hfta_names[i].c_str(), hfta_names[i].c_str(),
3161         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3162         hfta_names[i].c_str(), hfta_names[i].c_str(),
3163         hfta_names[i].c_str(), hfta_names[i].c_str()
3164                 );
3165
3166         fprintf(outfl,
3167 ("\n"
3168 "packet_schema.txt:\n"
3169 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3170 "\n"
3171 "external_fcns.def:\n"
3172 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3173 "\n"
3174 "clean:\n"
3175 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3176         for(i=0;i<hfta_names.size();++i)
3177                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3178         fprintf(outfl,"\n");
3179
3180         fclose(outfl);
3181
3182
3183
3184 //              Gather the set of interfaces
3185 //              TODO : must update to handle machines
3186 //              TODO : lookup interface attributes and add them as a parameter to the rts process
3187         outfl = fopen("runit", "w");
3188         if(outfl==NULL){
3189                 fprintf(stderr,"Can't open runit for write, exiting.\n");
3190                 exit(0);
3191         }
3192
3193
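//              The generated runit script starts gshub.py, then the rts with every
//              configured interface (plus each interface's "Command" arguments),
//              then each hfta executable, and finally any external operator views.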
3194         fputs(
3195 ("#!/bin/sh\n"
3196 "./stopit\n"
3197 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3198 "sleep 5\n"
3199 "if [ ! -f gshub.log ]\n"
3200 "then\n"
3201 "\techo \"Failed to start bin/gshub.py\"\n"
3202 "\texit -1\n"
3203 "fi\n"
3204 "ADDR=`cat gshub.log`\n"
3205 "ps opgid= $! >> gs.pids\n"
3206 "./rts $ADDR default ").c_str(), outfl);
3207 //      int erri;
3208 //      string err_str;
3209         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3210                 string ifnm = (*ssi);
3211                 // suppress internal _local_ interface
3212                 if (ifnm == "_local_")
3213                         continue;
3214                 fprintf(outfl, "%s ",ifnm.c_str());
3215                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3216                 for(j=0;j<ifv.size();++j)
3217                         fprintf(outfl, "%s ",ifv[j].c_str());
3218         }
3219         fprintf(outfl, " &\n");
3220         fprintf(outfl, "echo $! >> gs.pids\n");
3221         for(i=0;i<hfta_names.size();++i)
3222                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3223
3224         for(j=0;j<opviews.opview_list.size();++j){
3225                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3226         }
3227
3228         fclose(outfl);
3229         system("chmod +x runit");
3230
3231         outfl = fopen("stopit", "w");
3232         if(outfl==NULL){
3233                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3234                 exit(0);
3235         }
3236
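//              The generated stopit script sends TERM, then KILL, to every process
//              group recorded in gs.pids.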
3237         fprintf(outfl,"#!/bin/sh\n"
3238 "rm -f gshub.log\n"
3239 "if [ ! -f gs.pids ]\n"
3240 "then\n"
3241 "exit\n"
3242 "fi\n"
3243 "for pgid in `cat gs.pids`\n"
3244 "do\n"
3245 "kill -TERM -$pgid\n"
3246 "done\n"
3247 "sleep 1\n"
3248 "for pgid in `cat gs.pids`\n"
3249 "do\n"
3250 "kill -9 -$pgid\n"
3251 "done\n"
3252 "rm gs.pids\n");
3253
3254         fclose(outfl);
3255         system("chmod +x stopit");
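//		stopit works on process groups: gs.pids holds one pgid per recorded
//		component, and "kill -TERM -$pgid" (note the minus sign in front of the
//		pgid) signals the whole group, with a follow-up "kill -9" one second
//		later for anything that ignored the TERM.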
3256
3257 //-----------------------------------------------
3258
3259 /* For now disable support for virtual interfaces
3260         outfl = fopen("set_vinterface_hash.bat", "w");
3261         if(outfl==NULL){
3262                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3263                 exit(0);
3264         }
3265
3266 //              The format should be determined by an entry in the ifres.xml file,
3267 //              but for now we hardcode the only example we have.
3268         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3269                 if(rts_hload.count((*ssi))){
3270                         string iface_name = (*ssi);
3271                         string iface_number = "";
3272                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3273                                 if(isdigit(iface_name[j])){
3274                                         iface_number = iface_name[j];
3275                                         if(j>0 && isdigit(iface_name[j-1]))
3276                                                 iface_number = iface_name[j-1] + iface_number;
3277                                 }
3278                         }
3279
3280                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3281                         vector<int> halloc = rts_hload[iface_name];
3282                         int prev_limit = 0;
3283                         for(j=0;j<halloc.size();++j){
3284                                 if(j>0)
3285                                         fprintf(outfl,":");
3286                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3287                                 prev_limit = halloc[j];
3288                         }
3289                         fprintf(outfl,"\n");
3290                 }
3291         }
3292         fclose(outfl);
3293         system("chmod +x set_vinterface_hash.bat");
3294 */
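//		If the block above is ever re-enabled, it would emit one dagconfig line
//		per virtual interface, splitting the hardware hash range according to
//		rts_hload.  A purely illustrative example of the output format:
//
//			dagconfig -d0 -S hat_range=0-32:32-64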
3295 }
3296
3297 //              Code for implementing a local schema
3298 /*
3299                 table_list qpSchema;
3300
3301 //                              Load the schemas of any LFTAs.
3302                 int l;
3303                 for(l=0;l<hfta_nbr;++l){
3304                         stream_query *sq0 = split_queries[l];
3305                         table_def *td = sq0->get_output_tabledef();
3306                         qpSchema.append_table(td);
3307                 }
3308 //                              load the schemas of any other ref'd tables.
3309 //                              (e.g., hftas)
3310                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3311                 int ti;
3312                 for(ti=0;ti<input_tbl_names.size();++ti){
3313                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3314                         if(tbl_ref < 0){
3315                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3316                                 if(tbl_ref < 0){
3317                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3318                                         exit(1);
3319                                 }
3320                                 qpSchema.append_table(Schema->get_table(tbl_ref));
3321                         }
3322                 }
3323 */
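//		The disabled fragment above sketches how a per-query schema could be
//		assembled locally: start with the output table definitions of the lfta
//		split queries, then pull in any other referenced tables (e.g. hftas)
//		from the global Schema, erroring out if a referenced table is unknown.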
3324
3325 //              Functions related to parsing.
3326
3327 /*
3328 static int split_string(char *instr,char sep, char **words,int max_words){
3329    char *loc;
3330    char *str;
3331    int nwords = 0;
3332
3333    str = instr;
3334    words[nwords++] = str;
3335    while( (loc = strchr(str,sep)) != NULL){
3336         *loc = '\0';
3337         str = loc+1;
3338         if(nwords >= max_words){
3339                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3340                 nwords = max_words-1;
3341         }
3342         words[nwords++] = str;
3343    }
3344
3345    return(nwords);
3346 }
3347
3348 */
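//		If split_string were revived, usage would look like the sketch below.
//		Note that it modifies its input in place, overwriting each separator
//		with '\0', so the caller must pass a writable buffer (names and sizes
//		here are illustrative):
//
//			char buf[] = "eth0,eth1,eth2";
//			char *words[8];
//			int n = split_string(buf, ',', words, 8);   // n == 3, words[0] == "eth0"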
3349