1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include<unistd.h>              // for gethostname
17
18 #include <string>
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
27 #include"nic_def.h"
28 #include"generate_nic_code.h"
29
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include<ctype.h>
33 #include<glob.h>
34 #include<string.h>
35
36 #include<list>
37
38 //              for the scandir
39      #include <sys/types.h>
40      #include <dirent.h>
41
42
43 #include<errno.h>
44
45 //              to verify that some files exist.
46      #include <sys/types.h>
47      #include <sys/stat.h>
48
49 #include "parse_partn.h"
50
51 #include "print_plan.h"
52
53 //              Interface to the xml parser
54
55 #include"xml_t.h"
56 #include"field_list.h"
57
58 #include "gsconfig.h"
59
60 extern int xmlParserparse(void);
61 extern FILE *xmlParserin;
62 extern int xmlParserdebug;
63
64 std::vector<std::string> xml_attr_vec;
65 std::vector<std::string> xml_val_vec;
66 std::string xml_a, xml_v;
67 xml_t *xml_leaves = NULL;
68
69 //      Interface to the field list verifier
70 field_list *field_verifier = NULL;
71
72 #define TMPSTRLEN 1000
73
74 #ifndef PATH_DELIM
75   #define PATH_DELIM '/'
76 #endif
77
78 char tmp_schema_str[10000];
79
80 // maximum delay between two heartbeats produced
81 // by a UDOP. Used when it is not explicitly
82 // provided in the udop definition
83 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
84
85 //              Default lfta hash table size, must be power of 2.
86 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
87
88 //              Interface to FTA definition lexer and parser ...
89
90 extern int FtaParserparse(void);
91 extern FILE *FtaParserin;
92 extern int FtaParserdebug;
93
94 fta_parse_t *fta_parse_result;
95 var_defs_t *fta_parse_defines;
96
97
98
99 //              Interface to external function lexer and parser ...
100
101 extern int Ext_fcnsParserparse(void);
102 extern FILE *Ext_fcnsParserin;
103 extern int Ext_fcnsParserdebug;
104
105 ext_fcn_list *Ext_fcns;
106
107
108 //              Interface to partition definition parser
109 extern int PartnParserparse();
110 partn_def_list_t *partn_parse_result = NULL;
111
112
113
114 using namespace std;
115 //extern int errno;
116
117
118 //              forward declaration of local utility function
119 void generate_makefile(vector<string> &input_file_names, int nfiles,
120                                            vector<string> &hfta_names, opview_set &opviews,
121                                                 vector<string> &machine_names,
122                                                 string schema_file_name,
123                                                 vector<string> &interface_names,
124                                                 ifq_t *ifdb, string &config_dir_path,
125                                                 bool use_pads,
126                                                 string extra_libs,
127                                                 map<string, vector<int> > &rts_hload
128                                         );
129
130 //static int split_string(char *instr,char sep, char **words,int max_words);
131 #define MAXFLDS 100
132
133   FILE *schema_summary_output = NULL;           // schema summary output file (-f)
134
135 //                      Dump schema summary
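//              Illustrative output (the query name and fields are hypothetical):
//                      my_query
//                      time|srcIP|cnt
//                      UINT|IP|ULLONG
//              i.e. the query name, then '|'-separated field names, then '|'-separated field types.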
136 void dump_summary(stream_query *str){
137         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
138
139         table_def *sch = str->get_output_tabledef();
140
141         vector<field_entry *> flds = sch->get_fields();
142         int f;
143         for(f=0;f<flds.size();++f){
144                 if(f>0) fprintf(schema_summary_output,"|");
145                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
146         }
147         fprintf(schema_summary_output,"\n");
148         for(f=0;f<flds.size();++f){
149                 if(f>0) fprintf(schema_summary_output,"|");
150                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
151         }
152         fprintf(schema_summary_output,"\n");
153 }
154
155 //              Globals
156 string hostname;                // name of current host.
157 int hostname_len;
158 bool generate_stats = false;
159 string root_path = "../..";
160
161
162 int main(int argc, char **argv){
163   char tmpstr[TMPSTRLEN];
164   string err_str;
165   int q,s,h,f;
166
167   set<int>::iterator si;
168
169   vector<string> registration_query_names;                      // for lfta.c registration
170   map<string, vector<int> > mach_query_names;   // list queries of machine
171   vector<int> snap_lengths;                             // for lfta.c registration
172   vector<string> interface_names;                       // for lfta.c registration
173   vector<string> machine_names;                 // machine of interface
174   vector<bool> lfta_reuse_options;                      // for lfta.c registration
175   vector<int> lfta_liveness_timeouts;           // for qtree.xml generation
176   vector<string> hfta_names;                    // hfta source code names, for
177                                                                                 // creating make file.
178   vector<string> qnames;                                // ensure unique names
179   map<string, int> lfta_names;                  // keep track of unique lftas.
180
181
182 //                              set these to 1 to debug the parser
183   FtaParserdebug = 0;
184   Ext_fcnsParserdebug = 0;
185
186   FILE *lfta_out;                               // lfta.c output.
187   FILE *fta_in;                                 // input file
188   FILE *table_schemas_in;               // source tables definition file
189   FILE *query_name_output;              // query names
190   FILE *qtree_output;                   // interconnections of query nodes
191
192   // -------------------------------
193   // Handling of Input Arguments
194   // -------------------------------
195     char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
196         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-c] [-R root_path] [schema_file] input_file [input file ...]\n"
197                 "\t[-B] : debug only (don't create output files)\n"
198                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
199                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
200                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
201                 "\t[-C] : use <config directory> for definition files\n"
202                 "\t[-l] : use <library directory> for library queries\n"
203                 "\t[-N] : output query names in query_names.txt\n"
204                 "\t[-H] : create HFTA only (no schema_file)\n"
205                 "\t[-Q] : use query name for hfta suffix\n"
206                 "\t[-M] : generate make file and runit, stopit scripts\n"
207                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
208                 "\t[-f] : Output schema summary to schema_summary.txt\n"
209                 "\t[-P] : link with PADS\n"
210                 "\t[-h] : override host name.\n"
211                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
212                 "\t[-R] : path to root of GS-lite\n"
213 ;
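//              A typical invocation might look like the following (file names are hypothetical):
//                      translate_fta -M -N -C ./cfg packet_schema.txt queries.gsql
//              i.e. read definition files from ./cfg, generate the Makefile and run/stop scripts,
//              and write query_names.txt in addition to the lfta/hfta sources.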
214
215 //              parameters gathered from command line processing
216         string external_fcns_path;
217 //      string internal_fcn_path;
218         string config_dir_path;
219         string library_path = "./";
220         vector<string> input_file_names;
221         string schema_file_name;
222         bool debug_only = false;
223         bool hfta_only = false;
224         bool output_query_names = false;
225         bool output_schema_summary=false;
226         bool numeric_hfta_flname = true;
227         bool create_makefile = false;
228         bool distributed_mode = false;
229         bool partitioned_mode = false;
230         bool use_live_hosts_file = false;
231         bool use_pads = false;
232         bool clean_make = false;
233         int n_virtual_interfaces = 1;
234
235    int chopt;           // getopt() returns an int; a char here breaks the != -1 test on unsigned-char platforms
236    while((chopt = getopt(argc,argv,optstr)) != -1){
237                 switch(chopt){
238                 case 'B':
239                         debug_only = true;
240                         break;
241                 case 'D':
242                         distributed_mode = true;
243                         break;
244                 case 'p':
245                         partitioned_mode = true;
246                         break;
247                 case 'L':
248                         use_live_hosts_file = true;
249                         break;
250                 case 'C':
251                                 if(optarg != NULL)
252                                  config_dir_path = string(optarg) + string("/");
253                         break;
254                 case 'l':
255                                 if(optarg != NULL)
256                                  library_path = string(optarg) + string("/");
257                         break;
258                 case 'N':
259                         output_query_names = true;
260                         break;
261                 case 'Q':
262                         numeric_hfta_flname = false;
263                         break;
264                 case 'H':
265                         if(schema_file_name == ""){
266                                 hfta_only = true;
267                         }
268                         break;
269                 case 'f':
270                         output_schema_summary=true;
271                         break;
272                 case 'M':
273                         create_makefile=true;
274                         break;
275                 case 'S':
276                         generate_stats=true;
277                         break;
278                 case 'P':
279                         use_pads = true;
280                         break;
281                 case 'c':
282                         clean_make = true;
283                         break;
284                 case 'h':
285                         if(optarg != NULL)
286                                 hostname = optarg;
287                         break;
288                 case 'R':
289                         if(optarg != NULL)
290                                 root_path = optarg;
291                         break;
292                 case 'n':
293                         if(optarg != NULL){
294                                 n_virtual_interfaces = atoi(optarg);
295                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
296                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
297                                         n_virtual_interfaces = 1;
298                                 }
299                         }
300                         break;
301                 case '?':
302                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
303                         fprintf(stderr,"%s\n", usage_str);
304                         exit(1);
305                 default:
306                         fprintf(stderr, "Argument was %c\n", optopt);
307                         fprintf(stderr,"Invalid arguments\n");
308                         fprintf(stderr,"%s\n", usage_str);
309                         exit(1);
310                 }
311         }
312         argc -= optind;
313         argv += optind;
314         for (int i = 0; i < argc; ++i) {
315                 if((schema_file_name == "") && !hfta_only){
316                         schema_file_name = argv[i];
317                 }else{
318                         input_file_names.push_back(argv[i]);
319                 }
320         }
321
322         if(input_file_names.size() == 0){
323                 fprintf(stderr,"%s\n", usage_str);
324                 exit(1);
325         }
326
327         if(clean_make){
328                 string clean_cmd = "rm Makefile hfta_*.cc";
329                 int clean_ret = system(clean_cmd.c_str());
330                 if(clean_ret){
331                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
332                 }
333         }
334
335
336         nic_prop_db *npdb = new nic_prop_db(config_dir_path);
337
338 //                      Open globally used file names.
339
340         // prepend config directory to schema file
341         schema_file_name = config_dir_path + schema_file_name;
342         external_fcns_path = config_dir_path + string("external_fcns.def");
343     string ifx_fname = config_dir_path + string("ifres.xml");
344
345 //              Find interface query file(s).
346         if(hostname == ""){
347                 gethostname(tmpstr,TMPSTRLEN);
348                 hostname = tmpstr;
349         }
350         hostname_len = hostname.length();       // hostname may have been set via -h, so don't measure tmpstr
351     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
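//              e.g. "<config_dir>/myhost.ifq" normally, or "<config_dir>/cluster.ifq" when -D is given
//              ("myhost" here stands for whatever gethostname() or -h produced).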
352         vector<string> ifq_fls;
353
354                 ifq_fls.push_back(ifq_fname);
355
356
357 //                      Get the field list, if it exists
358         string flist_fl = config_dir_path + "field_list.xml";
359         FILE *flf_in = NULL;
360         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
361                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
362                 xml_leaves = new xml_t();
363                 xmlParser_setfileinput(flf_in);
364                 if(xmlParserparse()){
365                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
366                 }else{
367                         field_verifier = new field_list(xml_leaves);
368                 }
369         }
370
371         if(!hfta_only){
372           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
373                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
374                 exit(1);
375           }
376         }
377
378 /*
379         if(!(debug_only || hfta_only)){
380           if((lfta_out = fopen("lfta.c","w")) == NULL){
381                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
382                 exit(1);
383           }
384         }
385 */
386
387 //              Get the output specification file.
388 //              format is
389 //                      query, operator, operator_param, directory, bucketwidth, partitioning_fields, n_partitions
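//              an example line (all values hypothetical):
//                      my_query,file,,./output_dir,300,srcIP,1
//              i.e. query name, output operator type (lowercased below), operator parameter
//              (may be empty), output directory, bucket width, partitioning fields, and the
//              number of partitions.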
390         string ospec_fl = "output_spec.cfg";
391         FILE *osp_in = NULL;
392         vector<ospec_str *> output_specs;
393         multimap<string, int> qname_to_ospec;
394         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
395                 char *flds[MAXFLDS];
396                 int o_lineno = 0;
397                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
398                         o_lineno++;
399                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
400                         if(nflds == 7){
401 //              make operator type lowercase
402                                 char *tmpc;
403                                 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
404                                         *tmpc = tolower(*tmpc);
405
406                                 ospec_str *tmp_ospec = new ospec_str();
407                                 tmp_ospec->query = flds[0];
408                                 tmp_ospec->operator_type = flds[1];
409                                 tmp_ospec->operator_param = flds[2];
410                                 tmp_ospec->output_directory = flds[3];
411                                 tmp_ospec->bucketwidth = atoi(flds[4]);
412                                 tmp_ospec->partitioning_flds = flds[5];
413                                 tmp_ospec->n_partitions = atoi(flds[6]);
414                                 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
415                                 output_specs.push_back(tmp_ospec);
416                         }else{
417                                 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
418                         }
419                 }
420                 fclose(osp_in);
421         }else{
422                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");
423                 exit(1);
424         }
425
426 //              hfta parallelism
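//              Based on the parsing code below, each line is "hfta_name,parallelism", where the
//              parallelism must be between 1 and n_virtual_interfaces and must divide it evenly.
//              Hypothetical example:
//                      hfta_my_query,2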
427         string pspec_fl = "hfta_parallelism.cfg";
428         FILE *psp_in = NULL;
429         map<string, int> hfta_parallelism;
430         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
431                 char *flds[MAXFLDS];
432                 int o_lineno = 0;
433                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
434                         bool good_entry = true;
435                         o_lineno++;
436                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
437                         if(nflds == 2){
438                                 string hname = flds[0];
439                                 int par = atoi(flds[1]);
440                                 if(par <= 0 || par > n_virtual_interfaces){
441                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
442                                         good_entry = false;
443                                 }
444                                 if(good_entry && n_virtual_interfaces % par != 0){
445                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
446                                         good_entry = false;
447                                 }
448                                 if(good_entry)
449                                         hfta_parallelism[hname] = par;
450                         }
451                 }
452         }else{
453                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
454         }
455
456
457 //              LFTA hash table sizes
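//              Based on the parsing code below, each line is "lfta_name,hash_table_size" with a
//              positive size (a power of 2 is expected; see DEFAULT_LFTA_HASH_TABLE_SIZE above).
//              Hypothetical example:
//                      my_query,8192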
458         string htspec_fl = "lfta_htsize.cfg";
459         FILE *htsp_in = NULL;
460         map<string, int> lfta_htsize;
461         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
462                 char *flds[MAXFLDS];
463                 int o_lineno = 0;
464                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
465                         bool good_entry = true;
466                         o_lineno++;
467                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
468                         if(nflds == 2){
469                                 string lfta_name = flds[0];
470                                 int htsz = atoi(flds[1]);
471                                 if(htsz>0){
472                                         lfta_htsize[lfta_name] = htsz;
473                                 }else{
474                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
475                                 }
476                         }
477                 }
478         }else{
479                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
480         }
481
482 //              LFTA virtual interface hash split
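//              Based on the parsing code below, each line is "interface_name,w1,w2,...", one
//              positive weight per virtual-interface copy; the weights are accumulated into
//              cumulative hash-split thresholds.  Hypothetical example:
//                      eth0,1,1,2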
483         string rtlspec_fl = "rts_load.cfg";
484         FILE *rtl_in = NULL;
485         map<string, vector<int> > rts_hload;
486         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
487                 char *flds[MAXFLDS];
488                 int r_lineno = 0;
489                 string iface_name;
490                 vector<int> hload;
491                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
492                         bool good_entry = true;
493                         r_lineno++;
494                         iface_name = "";
495                         hload.clear();
496                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);
497                         if(nflds >1){
498                                 iface_name = flds[0];
499                                 int cumm_h = 0;
500                                 int j;
501                                 for(j=1;j<nflds;++j){
502                                         int h = atoi(flds[j]);
503                                         if(h<=0)
504                                                 good_entry = false;
505                                         cumm_h += h;
506                                         hload.push_back(cumm_h);
507                                 }
508                         }else{
509                                 good_entry = false;
510                         }
511                         if(good_entry){
512                                 rts_hload[iface_name] = hload;
513                         }else{
514                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
515                         }
516                 }
517         }
518
519
520
521         if(output_query_names){
522           if((query_name_output = fopen("query_names.txt","w")) == NULL){
523                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
524                 exit(1);
525           }
526         }
527
528         if(output_schema_summary){
529           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
530                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
531                 exit(1);
532           }
533         }
534
535         if((qtree_output = fopen("qtree.xml","w")) == NULL){
536                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
537                 exit(1);
538         }
539         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
540         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
541         fprintf(qtree_output,"<QueryNodes>\n");
542
543
544 //                      Get an initial Schema
545         table_list *Schema;
546         if(!hfta_only){
547 //                      Parse the table schema definitions.
548           fta_parse_result = new fta_parse_t();
549           FtaParser_setfileinput(table_schemas_in);
550           if(FtaParserparse()){
551                 fprintf(stderr,"Table schema parse failed.\n");
552                 exit(1);
553           }
554           if(fta_parse_result->parse_type != TABLE_PARSE){
555                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
556                 exit(1);
557           }
558           Schema = fta_parse_result->tables;
559
560 //                      Ensure that all schema_ids, if set, are distinct.
561 //  Obsolete? There is code elsewhere to ensure that schema IDs are
562 //  distinct on a per-interface basis.
563 /*
564           set<int> found_ids;
565           set<int> dup_ids;
566           for(int t=0;t<Schema->size();++t){
567                 int sch_id = Schema->get_table(t)->get_schema_id();
568                 if(sch_id> 0){
569                         if(found_ids.find(sch_id) != found_ids.end()){
570                                 dup_ids.insert(sch_id);
571                         }else{
572                                 found_ids.insert(sch_id);
573                         }
574                 }
575                 if(dup_ids.size()>0){
576                         fprintf(stderr, "Error, the schema has duplicate schema_ids:");
577                         for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
578                                 fprintf(stderr," %d",(*dit));
579                         fprintf(stderr,"\n");
580                         exit(1);
581                 }
582           }
583 */
584
585
586 //                      Process schema field inheritance
587           int retval;
588           retval = Schema->unroll_tables(err_str);
589           if(retval){
590                 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
591                 exit(1);
592           }
593         }else{
594 //                      hfta only => we will try to fetch schemas from the registry.
595 //                      therefore, start off with an empty schema.
596           Schema = new table_list();
597         }
598
599
600 //                      Open and parse the external functions file.
601         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
602         if(Ext_fcnsParserin == NULL){
603                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
604                 Ext_fcns = new ext_fcn_list();
605         }else{
606                 if(Ext_fcnsParserparse()){
607                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
608                         Ext_fcns = new ext_fcn_list();
609                 }
610         }
611         if(Ext_fcns->validate_fcns(err_str)){
612                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
613                 exit(1);
614         }
615
616 //              Open and parse the interface resources file.
617 //      ifq_t *ifaces_db = new ifq_t();
618 //   string ierr;
619 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
620 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
621 //                              ifx_fname.c_str(), ierr.c_str());
622 //              exit(1);
623 //      }
624 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){
625 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
626 //                              ifq_fname.c_str(), ierr.c_str());
627 //              exit(1);
628 //      }
629
630
631 //                      The LFTA code string.
632 //                      Put the standard preamble here.
633 //                      NOTE: the hash macros, fcns should go into the run time
634   map<string, string> lfta_val;
635   map<string, string> lfta_prefilter_val;
636
637   string lfta_header =
638 "#include <limits.h>\n"
639 "#include \"rts.h\"\n"
640 "#include \"fta.h\"\n"
641 "#include \"lapp.h\"\n"
642 "#include \"rts_udaf.h\"\n"
643 "#include<stdio.h>\n"
644 "#include <float.h>\n"
645 "#include \"rdtsc.h\"\n"
646 "#include \"watchlist.h\"\n\n"
647
648 ;
649 // Get any locally defined parsing headers
650     glob_t glob_result;
651     memset(&glob_result, 0, sizeof(glob_result));
652
653     // do the glob operation.  TODO: derive this path from GSROOT (root_path) rather than hard-coding ../../include/lfta/local
654     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
655         if(return_value == 0){
656                 lfta_header += "\n";
657         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
658                         char *flds[1000];
659                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
660                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
661         }
662                 lfta_header += "\n";
663         }else{
664                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
665         }
666
667 /*
668 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
669 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
670 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
671 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
672 */
673
674         lfta_header += 
675 "\n"
676 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
677 "\n"
678 "#define SLOT_FILLED 0x04\n"
679 "#define SLOT_GEN_BITS 0x03\n"
680 "#define SLOT_HASH_BITS 0xfffffff8\n"
681 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
682 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
683 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
684 "\n\n"
685
686 "#define lfta_BOOL_to_hash(x) (x)\n"
687 "#define lfta_USHORT_to_hash(x) (x)\n"
688 "#define lfta_UINT_to_hash(x) (x)\n"
689 "#define lfta_IP_to_hash(x) (x)\n"
690 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
691 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
692 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
693 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
694 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
695 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
696 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"
697 "       for(i=0;i<x.length;++i){\n"
698 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"
699 "               if((i%4) == 3){\n"
700 "                       ret ^= tmp_sum;\n"
701 "                       tmp_sum = 0;\n"
702 "               }\n"
703 "       }\n"
704 "       if((i%4)!=0) ret ^=tmp_sum;\n"
705 "       return(ret);\n"
706 "}\n\n\n";
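//              The *_to_hash macros emitted above reduce each LFTA column type to a 32-bit hash
//              input: narrow integer types pass through, 64-bit and IPV6 values are folded with
//              XOR, and lfta_V_STR_to_hash packs successive string bytes into 32-bit words and
//              XORs the words together.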
707
708
709
710 //////////////////////////////////////////////////////////////////
711 /////                   Get all of the query parse trees
712
713
714   int i,p;
715   int hfta_count = 0;           // for numeric suffixes to hfta .cc files
716
717 //---------------------------
718 //              Global info needed for post processing.
719
720 //                      Set of operator views ref'd in the query set.
721         opview_set opviews;
722 //                      lftas on a per-machine basis.
723         map<string, vector<stream_query *> > lfta_mach_lists;
724         int nfiles = input_file_names.size();
725         vector<stream_query *> hfta_list;               // list of hftas.
726         map<string, stream_query *> sq_map;             // map from query name to stream query.
727
728
729 //////////////////////////////////////////
730
731 //              Open and parse the interface resources file.
732         ifq_t *ifaces_db = new ifq_t();
733     string ierr;
734         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
735                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
736                                 ifx_fname.c_str(), ierr.c_str());
737                 exit(1);
738         }
739         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
740                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
741                                 ifq_fls[0].c_str(), ierr.c_str());
742                 exit(1);
743         }
744
745   map<string, string> qname_to_flname;  // for detecting duplicate query names
746
747
748
749 //                      Parse the files to create a vector of parse trees.
750 //                      Load qnodes with information to perform a topo sort
751 //                      based on query dependencies.
752   vector<query_node *> qnodes;                          // for topo sort.
753   map<string,int> name_node_map;                        // map query name to qnodes entry
754   for(i=0;i<input_file_names.size();i++){
755
756           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
757                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
758                   continue;
759           }
760 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
761
762 //                      Parse the FTA query
763           fta_parse_result = new fta_parse_t();
764           FtaParser_setfileinput(fta_in);
765           if(FtaParserparse()){
766                 fprintf(stderr,"FTA parse failed.\n");
767                 exit(1);
768           }
769           if(fta_parse_result->parse_type != QUERY_PARSE){
770                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
771                 exit(1);
772           }
773
774 //                      returns a list of parse trees
775           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
776           for(p=0;p<qlist.size();++p){
777             table_exp_t *fta_parse_tree = qlist[p];
778 //              query_parse_trees.push_back(fta_parse_tree);
779
780 //                      compute the default name -- extract from query name
781                 strcpy(tmpstr,input_file_names[i].c_str());
782                 char *qname = strrchr(tmpstr,PATH_DELIM);
783                 if(qname == NULL)
784                         qname = tmpstr;
785                 else
786                         qname++;
787                 char *qname_end = strchr(qname,'.');
788                 if(qname_end != NULL) *qname_end = '\0';
789                 string qname_str = qname;
790                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
791
792 //                      Determine visibility.  Should I be attaching all of the output methods?
793                 if(qname_to_ospec.count(imputed_qname)>0)
794                         fta_parse_tree->set_visible(true);
795                 else
796                         fta_parse_tree->set_visible(false);
797
798
799 //                              Create a manipulable representation of the parse tree.
800 //                              the qnode inherits the visibility assigned to the parse tree.
801             int pos = qnodes.size();
802                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
803                 name_node_map[ qnodes[pos]->name ] = pos;
804 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
805 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
806 //              qfiles.push_back(i);
807
808 //                      Check for duplicate query names
809 //                                      NOTE : in hfta-only generation, I should
810 //                                      also check with the names of the registered queries.
811                 if(qname_to_flname.count(qnodes[pos]->name) > 0){
812                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
813                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
814                         exit(1);
815                 }
816                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
817                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
818                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
819                         exit(1);
820                 }
821                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
822
823
824         }
825   }
826
827 //              Add the library queries
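//              A referenced table whose name contains '/' and is not defined in the query set is
//              treated as a library query: the file <library_path><name>.gsql is parsed and the
//              query whose name matches the last path component is pulled in, together with any
//              library queries it reads from.  Hypothetical example: "FROM lib/flows" loads
//              lib/flows.gsql under the library path and uses the query named "flows" from it.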
828
829   int pos;
830   for(pos=0;pos<qnodes.size();++pos){
831         int fi;
832         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
833                 string src_tbl = qnodes[pos]->refd_tbls[fi];
834                 if(qname_to_flname.count(src_tbl) == 0){
835                         int last_sep = src_tbl.find_last_of('/');
836                         if(last_sep != string::npos){
837 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
838                                 string target_qname = src_tbl.substr(last_sep+1);
839                                 string qpathname = library_path + src_tbl + ".gsql";
840                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
841                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
842                                         exit(1);
843                                         fprintf(stderr,"After exit\n");
844                                 }
845 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
846 //                      Parse the FTA query
847                                 fta_parse_result = new fta_parse_t();
848                                 FtaParser_setfileinput(fta_in);
849                                 if(FtaParserparse()){
850                                         fprintf(stderr,"FTA parse failed.\n");
851                                         exit(1);
852                                 }
853                                 if(fta_parse_result->parse_type != QUERY_PARSE){
854                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
855                                         exit(1);
856                                 }
857
858                                 map<string, int> local_query_map;
859                                 vector<string> local_query_names;
860                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
861                                 for(p=0;p<qlist.size();++p){
862                                 table_exp_t *fta_parse_tree = qlist[p];
863                                         fta_parse_tree->set_visible(false);             // assumed to not produce output
864                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
865                                         if(imputed_qname == target_qname)
866                                                 imputed_qname = src_tbl;
867                                         if(local_query_map.count(imputed_qname)>0){
868                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
869                                                 exit(1);
870                                         }
871                                         local_query_map[ imputed_qname ] = p;
872                                         local_query_names.push_back(imputed_qname);
873                                 }
874
875                                 if(local_query_map.count(src_tbl)==0){
876                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
877                                         exit(1);
878                                 }
879
880                                 vector<int> worklist;
881                                 set<int> added_queries;
882                                 vector<query_node *> new_qnodes;
883                                 worklist.push_back(local_query_map[target_qname]);
884                                 added_queries.insert(local_query_map[target_qname]);
885                                 int qq;
886                                 int qpos = qnodes.size();
887                                 for(qq=0;qq<worklist.size();++qq){
888                                         int q_id = worklist[qq];
889                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
890                                         new_qnodes.push_back( new_qnode);
891                                         vector<string> refd_tbls =  new_qnode->refd_tbls;
892                                         int ff;
893                                         for(ff = 0;ff<refd_tbls.size();++ff){
894                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
895
896                                                         if(name_node_map.count(refd_tbls[ff])>0){
897                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
898                                                                 exit(1);
899                                                         }else{
900                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);
901                                                         }
902                                                 }
903                                         }
904                                 }
905
906                                 for(qq=0;qq<new_qnodes.size();++qq){
907                                         int qpos = qnodes.size();
908                                         qnodes.push_back(new_qnodes[qq]);
909                                         name_node_map[qnodes[qpos]->name ] = qpos;
910                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;
911                                 }
912                         }
913                 }
914         }
915   }
916
917
918
919
920
921
922
923
924 //---------------------------------------
925
926
927 //              Add the UDOPS.
928
929   string udop_missing_sources;
930   for(i=0;i<qnodes.size();++i){
931         int fi;
932         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
933                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
934                 if(sid >= 0){
935                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
936                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
937                                 int pos = qnodes.size();
938                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
939                                         name_node_map[ qnodes[pos]->name ] = pos;
940                                         qnodes[pos]->is_externally_visible = false;   // its visible
941         //                                      Need to mark the source queries as visible.
942                                         int si;
943                                         string missing_sources = "";
944                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
945                                                 string src_tbl = qnodes[pos]->refd_tbls[si];
946                                                 if(name_node_map.count(src_tbl)==0){
947                                                         missing_sources += src_tbl + " ";
948                                                 }
949                                         }
950                                         if(missing_sources != ""){
951                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
952                                         }
953                                 }
954                         }
955                 }
956         }
957   }
958   if(udop_missing_sources != ""){
959         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
960         exit(1);
961   }
962
963
964
965 ////////////////////////////////////////////////////////////////////
966 ///                             Check parse trees to verify that some
967 ///                             global properties are met :
968 ///                             if q1 reads from q2, then
969 ///                               q2 is processed before q1
970 ///                               q1 can supply q2's parameters
971 ///                             Verify there is no cycle in the reads-from graph.
972
973 //                      Compute an order in which to process the
974 //                      queries.
975
976 //                      Start by building the reads-from lists.
977 //
978
979   for(i=0;i<qnodes.size();++i){
980         int qi, fi;
981         vector<string> refd_tbls =  qnodes[i]->refd_tbls;
982         for(fi = 0;fi<refd_tbls.size();++fi){
983                 if(name_node_map.count(refd_tbls[fi])>0){
984 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
985                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
986                 }
987         }
988   }
989
990
991 //              If one query reads the result of another,
992 //              check for parameter compatibility.  Currently it must
993 //              be an exact match.  I will move to requiring
994 //              containment after re-ordering, but will require
995 //              some analysis for code generation which is not
996 //              yet in place.
997 //printf("There are %d query nodes.\n",qnodes.size());
998
999
1000   for(i=0;i<qnodes.size();++i){
1001         vector<var_pair_t *> target_params  = qnodes[i]->params;
1002         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1003                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;
1004                 if(target_params.size() != source_params.size()){
1005                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1006                         exit(1);
1007                 }
1008                 int p;
1009                 for(p=0;p<target_params.size();++p){
1010                         if(! (target_params[p]->name == source_params[p]->name &&
1011                               target_params[p]->val == source_params[p]->val ) ){
1012                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1013                                 exit(1);
1014                         }
1015                 }
1016         }
1017   }
1018
1019
1020 //              Find the roots.
1021 //              Start by counting inedges.
1022   for(i=0;i<qnodes.size();++i){
1023         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1024                 qnodes[(*si)]->n_consumers++;
1025         }
1026   }
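//              The ordering below is a Kahn-style topological sort: repeatedly peel off nodes with
//              no remaining consumers; any node still having n_consumers > 0 afterwards is part of
//              a cyclic reads-from dependency and is reported as an error further down.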
1027
1028 //              The roots are the nodes with indegree zero.
1029   set<int> roots;
1030   for(i=0;i<qnodes.size();++i){
1031         if(qnodes[i]->n_consumers == 0){
1032                 if(qnodes[i]->is_externally_visible == false){
1033                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1034                 }
1035                 roots.insert(i);
1036         }
1037   }
1038
1039 //              Remove the parts of the subtree that produce no output.
1040   set<int> valid_roots;
1041   set<int> discarded_nodes;
1042   set<int> candidates;
1043   while(roots.size() >0){
1044         for(si=roots.begin();si!=roots.end();++si){
1045                 if(qnodes[(*si)]->is_externally_visible){
1046                         valid_roots.insert((*si));
1047                 }else{
1048                         discarded_nodes.insert((*si));
1049                         set<int>::iterator sir;
1050                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1051                                 qnodes[(*sir)]->n_consumers--;
1052                                 if(qnodes[(*sir)]->n_consumers == 0)
1053                                         candidates.insert( (*sir));
1054                         }
1055                 }
1056         }
1057         roots = candidates;
1058         candidates.clear();
1059   }
1060   roots = valid_roots;
1061   if(discarded_nodes.size()>0){
1062         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1063         int di = 0;
1064         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1065                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1066                 di++;
1067                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1068         }
1069         fprintf(stderr,"\n");
1070   }
1071
1072 //              Compute the sources_to set, ignoring discarded nodes.
1073   for(i=0;i<qnodes.size();++i){
1074         if(discarded_nodes.count(i)==0)
1075                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1076                         qnodes[(*si)]->sources_to.insert(i);
1077         }
1078   }
1079
1080
1081 //              Find the nodes that are shared by multiple visible subtrees.
1082 //              The roots become inferred visible nodes.
1083
1084 //              Find the visible nodes.
1085         vector<int> visible_nodes;
1086         for(i=0;i<qnodes.size();i++){
1087                 if(qnodes[i]->is_externally_visible){
1088                         visible_nodes.push_back(i);
1089                 }
1090         }
1091
1092 //              Find UDOPs referenced by visible nodes.
1093   list<int> workq;
1094   for(i=0;i<visible_nodes.size();++i){
1095         workq.push_back(visible_nodes[i]);
1096   }
1097   while(!workq.empty()){
1098         int node = workq.front();
1099         workq.pop_front();
1100         set<int>::iterator children;
1101         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1102                 qnodes[node]->is_externally_visible = true;
1103                 visible_nodes.push_back(node);
1104                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1105                         if(qnodes[(*children)]->is_externally_visible == false){
1106                                 qnodes[(*children)]->is_externally_visible = true;
1107                                 visible_nodes.push_back((*children));
1108                         }
1109                 }
1110         }
1111         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1112                 workq.push_back((*children));
1113         }
1114   }
1115
1116         bool done = false;
1117         while(!done){
1118 //      reset the nodes
1119                 for(i=0;i<qnodes.size();i++){
1120                         qnodes[i]->subtree_roots.clear();
1121                 }
1122
1123 //              Walk the tree defined by a visible node, not descending into
1124 //              subtrees rooted by a visible node.  Mark the node visited with
1125 //              the visible node ID.
1126                 for(i=0;i<visible_nodes.size();++i){
1127                         set<int> vroots;
1128                         vroots.insert(visible_nodes[i]);
1129                         while(vroots.size()>0){
1130                                 for(si=vroots.begin();si!=vroots.end();++si){
1131                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1132
1133                                         set<int>::iterator sir;
1134                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1135                                                 if(! qnodes[(*sir)]->is_externally_visible){
1136                                                         candidates.insert( (*sir));
1137                                                 }
1138                                         }
1139                                 }
1140                                 vroots = candidates;
1141                                 candidates.clear();
1142                         }
1143                 }
1144 //              Find the nodes in multiple visible node subtrees, but with no parent
1145 //              that is in multiple visible node subtrees.  Mark these as inferred visible nodes.
1146                 done = true;    // until proven otherwise
1147                 for(i=0;i<qnodes.size();i++){
1148                         if(qnodes[i]->subtree_roots.size()>1){
1149                                 bool is_new_root = true;
1150                                 set<int>::iterator sir;
1151                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1152                                         if(qnodes[(*sir)]->subtree_roots.size()>1)
1153                                                 is_new_root = false;
1154                                 }
1155                                 if(is_new_root){
1156                                         qnodes[i]->is_externally_visible = true;
1157                                         qnodes[i]->inferred_visible_node = true;
1158                                         visible_nodes.push_back(i);
1159                                         done = false;
1160                                 }
1161                         }
1162                 }
1163         }
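//              Net effect of the loop above: any node reachable from more than one visible root
//              (and with no ancestor in the same situation) is promoted to an inferred visible
//              node, so its output is computed once and shared rather than re-derived per subtree.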
1164
1165
1166
1167
1168
1169 //              get visible nodes in topo ordering.
1170 //  for(i=0;i<qnodes.size();i++){
1171 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1172 //  }
1173   vector<int> process_order;
1174   while(roots.size() >0){
1175         for(si=roots.begin();si!=roots.end();++si){
1176                 if(discarded_nodes.count((*si))==0){
1177                         process_order.push_back( (*si) );
1178                 }
1179                 set<int>::iterator sir;
1180                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1181                         qnodes[(*sir)]->n_consumers--;
1182                         if(qnodes[(*sir)]->n_consumers == 0)
1183                                 candidates.insert( (*sir));
1184                 }
1185         }
1186         roots = candidates;
1187         candidates.clear();
1188   }
1189
1190
1191 //printf("process_order.size() =%d\n",process_order.size());
1192
1193 //              Search for cyclic dependencies
1194   string found_dep;
1195   for(i=0;i<qnodes.size();++i){
1196         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1197                 if(found_dep.size() != 0) found_dep += ", ";
1198                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1199         }
1200   }
1201   if(found_dep.size()>0){
1202         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1203         exit(1);
1204   }
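//		(A non-discarded node still carrying n_consumers > 0 here was never
//		drained by the ordering pass above, which happens exactly when the node
//		lies on a reads-from cycle or, directly or transitively, feeds one.)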
1205
1206 //              Get a list of query sets, in the order to be processed.
1207 //		Start at each visible root and do a BFS.
1208 //              The query set includes queries referenced indirectly,
1209 //              as sources for user-defined operators.  These are needed
1210 //              to ensure that they are added to the schema, but are not part
1211 //              of the query tree.
1212
1213 //              stream_node_sets contains queries reachable only through the
1214 //              FROM clause, so I can tell which queries to add to the stream
1215 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)
1216
1217 //                      NOTE: this code works because in order for data to be
1218 //                      read by multiple hftas, the node must be externally visible.
1219 //                      But visible nodes define roots of process sets.
1220 //                      internally visible nodes can feed data only
1221 //                      to other nodes in the same query file.
1222 //			Therefore, any access can be restricted to a file;
1223 //			hfta output sharing is done only on roots,
1224 //			never on interior nodes.
1225
1226
1227
1228
1229 //		Compute the base collection of hftas.
1230   vector<hfta_node *> hfta_sets;
1231   map<string, int> hfta_name_map;
1232 //  vector< vector<int> > process_sets;
1233 //  vector< set<int> > stream_node_sets;
1234   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.
1235                                                                                                                 // i.e. process leaves 1st.
1236   for(i=0;i<process_order.size();++i){
1237         if(qnodes[process_order[i]]->is_externally_visible == true){
1238 //printf("Visible.\n");
1239                 int root = process_order[i];
1240                 hfta_node *hnode = new hfta_node();
1241                 hnode->name = qnodes[root]-> name;
1242                 hnode->source_name = qnodes[root]-> name;
1243                 hnode->is_udop = qnodes[root]->is_udop;
1244                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1245
1246                 vector<int> proc_list;  proc_list.push_back(root);
1247 //                      Ensure that nodes are added only once.
1248                 set<int> proc_set;      proc_set.insert(root);
1249                 roots.clear();                  roots.insert(root);
1250                 candidates.clear();
1251                 while(roots.size()>0){
1252                         for(si=roots.begin();si!=roots.end();++si){
1253 //printf("Processing root %d\n",(*si));
1254                                 set<int>::iterator sir;
1255                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1256 //printf("reads fom %d\n",(*sir));
1257                                         if(qnodes[(*sir)]->is_externally_visible==false){
1258                                                 candidates.insert( (*sir) );
1259                                                 if(proc_set.count( (*sir) )==0){
1260                                                         proc_set.insert( (*sir) );
1261                                                         proc_list.push_back( (*sir) );
1262                                                 }
1263                                         }
1264                                 }
1265                         }
1266                         roots = candidates;
1267                         candidates.clear();
1268                 }
1269
1270                 reverse(proc_list.begin(), proc_list.end());
1271                 hnode->query_node_indices = proc_list;
1272                 hfta_name_map[hnode->name] = hfta_sets.size();
1273                 hfta_sets.push_back(hnode);
1274         }
1275   }
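//		(Each externally visible query node thus roots one hfta; its
//		query_node_indices holds that root plus every non-visible node reachable
//		from it over reads_from, reversed so that local sources come before the
//		nodes that consume them.)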
1276
1277 //              Compute the reads_from / sources_to graphs for the hftas.
1278
1279   for(i=0;i<hfta_sets.size();++i){
1280         hfta_node *hnode = hfta_sets[i];
1281         for(q=0;q<hnode->query_node_indices.size();q++){
1282                 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1283                 for(s=0;s<qnode->refd_tbls.size();++s){
1284                         if(hfta_name_map.count(qnode->refd_tbls[s])){
1285                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1286                                 hnode->reads_from.insert(other_hfta);
1287                                 hfta_sets[other_hfta]->sources_to.insert(i);
1288                         }
1289                 }
1290         }
1291   }
1292
1293 //              Compute a topological sort of the hfta_sets.
1294
1295   vector<int> hfta_topsort;
1296   workq.clear();
1297   int hnode_srcs[hfta_sets.size()];
1298   for(i=0;i<hfta_sets.size();++i){
1299         hnode_srcs[i] = 0;
1300         if(hfta_sets[i]->sources_to.size() == 0)
1301                 workq.push_back(i);
1302   }
1303
1304   while(! workq.empty()){
1305         int     node = workq.front();
1306         workq.pop_front();
1307         hfta_topsort.push_back(node);
1308         set<int>::iterator stsi;
1309         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1310                 int parent = (*stsi);
1311                 hnode_srcs[parent]++;
1312                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1313                         workq.push_back(parent);
1314                 }
1315         }
1316   }
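//		(An hfta is appended to hfta_topsort only after every hfta that consumes
//		its output has been appended, so the list runs from final consumers down
//		to sources.)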
1317
1318 //              Decorate hfta nodes with the level of parallelism given as input.
1319
1320   map<string, int>::iterator msii;
1321   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1322         string hfta_name = (*msii).first;
1323         int par = (*msii).second;
1324         if(hfta_name_map.count(hfta_name) > 0){
1325                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1326         }else{
1327 		fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but it is not an hfta.\n",hfta_name.c_str());
1328         }
1329   }
1330
1331 //		Propagate levels of parallelism: children should have a level of parallelism
1332 //		as large as any of their parents.  Adjust children upwards to compensate.
1333 //		Start at parents and adjust children; auto-propagation will occur.
1334
1335   for(i=hfta_sets.size()-1;i>=0;i--){
1336         set<int>::iterator stsi;
1337         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1338                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1339                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1340                 }
1341         }
1342   }
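//		(Example: if hfta A with n_parallel = 4 reads from hfta B with
//		n_parallel = 1, B is raised to 4.  Because hfta_sets was filled from the
//		reversed process_order, sources sit at lower indices than their
//		consumers, so walking from the end toward the start lets the adjustment
//		cascade through chains of hftas in a single pass.)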
1343
1344 //		Before all the name mangling, check if there are any output_spec.cfg
1345 //              or hfta_parallelism.cfg entries that do not have a matching query.
1346
1347         string dangling_ospecs = "";
1348         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1349                 string oq = (*msii).first;
1350                 if(hfta_name_map.count(oq) == 0){
1351                         dangling_ospecs += " "+(*msii).first;
1352                 }
1353         }
1354         if(dangling_ospecs!=""){
1355 		fprintf(stderr,"ERROR, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1356                 exit(1);
1357         }
1358
1359         string dangling_par = "";
1360         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1361                 string oq = (*msii).first;
1362                 if(hfta_name_map.count(oq) == 0){
1363                         dangling_par += " "+(*msii).first;
1364                 }
1365         }
1366         if(dangling_par!=""){
1367                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1368         }
1369
1370
1371
1372 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust
1373 //              FROM clauses: retarget any name which is an internal node, and
1374 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes
1375 //              when the source hfta has more parallelism than the target node.
1376 //		Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1377
1378
1379   int n_original_hfta_sets = hfta_sets.size();
1380   for(i=0;i<n_original_hfta_sets;++i){
1381         if(hfta_sets[i]->n_parallel > 1){
1382                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.
1383                 set<string> local_nodes;                // names of query nodes in the hfta.
1384                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1385                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1386                 }
1387
1388                 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1389                         string mangler = "__copy"+int_to_string(p);
1390                         hfta_node *par_hfta  = new hfta_node();
1391                         par_hfta->name = hfta_sets[i]->name + mangler;
1392                         par_hfta->source_name = hfta_sets[i]->name;
1393                         par_hfta->is_udop = hfta_sets[i]->is_udop;
1394                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1395                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1396                         par_hfta->parallel_idx = p;
1397
1398                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1399
1400 //      Is it a UDOP?
1401                         if(hfta_sets[i]->is_udop){
1402                                 int root = hfta_sets[i]->query_node_indices[0];
1403
1404                                 string unequal_par_sources;
1405                                 set<int>::iterator rfsii;
1406                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1407                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1408                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1409                                         }
1410                                 }
1411                                 if(unequal_par_sources != ""){
1412                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1413                                         exit(1);
1414                                 }
1415
1416                                 int rti;
1417                                 vector<string> new_sources;
1418                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1419                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1420                                 }
1421
1422                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1423                                 new_qn->name += mangler;
1424                                 new_qn->mangler = mangler;
1425                                 new_qn->refd_tbls = new_sources;
1426                                 par_hfta->query_node_indices.push_back(qnodes.size());
1427                                 par_qnode_map[new_qn->name] = qnodes.size();
1428                                 name_node_map[ new_qn->name ] = qnodes.size();
1429                                 qnodes.push_back(new_qn);
1430                         }else{
1431 //              regular query node
1432                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1433                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1434                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1435 //                                      rehome the from clause on mangled names.
1436 //                                      create merge nodes as needed for external sources.
1437                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1438                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1439                                                 dup_pt->fm->tlist[f]->schema_name += mangler;
1440                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1441 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
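//                      (Worked example: if this hfta has n_parallel = 2 and the source hfta has
//                      n_parallel = 4, then stride = 2 and copy p of this hfta merges source
//                      copies __copy(2p) and __copy(2p+1).  A stride of 0 would mean the consumer
//                      is more parallel than its source, which is rejected below.)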
1442                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1443                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1444                                                         dup_pt->fm->tlist[f]->schema_name += mangler;
1445                                                 }else{
1446                                                         vector<string> src_tbls;
1447                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1448                                                         if(stride == 0){
1449                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1450                                                                 exit(1);
1451                                                         }
1452                                                         for(s=0;s<stride;++s){
1453                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1454                                                                 src_tbls.push_back(ext_src_name);
1455                                                         }
1456                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1457                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1458                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1459 //                                      Make a qnode to represent the new merge node
1460                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1461                                                         qn_pt->refd_tbls = src_tbls;
1462                                                         qn_pt->is_udop  = false;
1463                                                         qn_pt->is_externally_visible = false;
1464                                                         qn_pt->inferred_visible_node  = false;
1465                                                         par_hfta->query_node_indices.push_back(qnodes.size());
1466                                                         par_qnode_map[merge_node_name] = qnodes.size();
1467                                                         name_node_map[ merge_node_name ] = qnodes.size();
1468                                                         qnodes.push_back(qn_pt);
1469                                                 }
1470                                         }
1471                                 }
1472                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1473                                 for(f=0;f<dup_pt->fm->tlist.size();++f){
1474                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1475                                 }
1476                                 new_qn->params = qnodes[hqn_idx]->params;
1477                                 new_qn->is_udop = false;
1478                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1479                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1480                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1481                                 par_qnode_map[new_qn->name] = qnodes.size();
1482                                 name_node_map[ new_qn->name ] = qnodes.size();
1483                                 qnodes.push_back(new_qn);
1484                           }
1485                         }
1486                         hfta_name_map[par_hfta->name] = hfta_sets.size();
1487                         hfta_sets.push_back(par_hfta);
1488                 }
1489         }else{
1490 //              This hfta isn't being parallelized, but add merge nodes for any parallelized
1491 //              hfta sources.
1492                 if(!hfta_sets[i]->is_udop){
1493                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1494                         int hqn_idx = hfta_sets[i]->query_node_indices[h];
1495                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1496                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1497 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if the level of parallelism of the two hftas is the same, rename by the mangler.  Else, there must be more sources, so create a merge node.
1498                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1499                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1500                                                 vector<string> src_tbls;
1501                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1502                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1503                                                         src_tbls.push_back(ext_src_name);
1504                                                 }
1505                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1506                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1507                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1508 //                                      Make a qnode to represent the new merge node
1509                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1510                                                 qn_pt->refd_tbls = src_tbls;
1511                                                 qn_pt->is_udop  = false;
1512                                                 qn_pt->is_externally_visible = false;
1513                                                 qn_pt->inferred_visible_node  = false;
1514                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1515                                                 name_node_map[ merge_node_name ] = qnodes.size();
1516                                                 qnodes.push_back(qn_pt);
1517                                         }
1518                                 }
1519                         }
1520                 }
1521           }
1522         }
1523   }
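//		(After this pass, every hfta with n_parallel = N > 1 exists as N copies
//		named <name>__copy0 .. __copy(N-1), with do_generation turned off on the
//		original, and every FROM clause either targets a copy directly or goes
//		through a generated merge node.  The dependency graphs are now stale and
//		are rebuilt below.)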
1524
1525 //                      Rebuild the reads_from / sources_to lists in the qnodes
1526   for(q=0;q<qnodes.size();++q){
1527         qnodes[q]->reads_from.clear();
1528         qnodes[q]->sources_to.clear();
1529   }
1530   for(q=0;q<qnodes.size();++q){
1531         for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1532                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1533                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1534                         qnodes[q]->reads_from.insert(rf);
1535                         qnodes[rf]->sources_to.insert(q);
1536                 }
1537         }
1538   }
1539
1540 //                      Rebuild the reads_from / sources_to lists in hfta_sets
1541   for(q=0;q<hfta_sets.size();++q){
1542         hfta_sets[q]->reads_from.clear();
1543         hfta_sets[q]->sources_to.clear();
1544   }
1545   for(q=0;q<hfta_sets.size();++q){
1546         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1547                 int node = hfta_sets[q]->query_node_indices[s];
1548                 set<int>::iterator rfsii;
1549                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1550                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1551                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1552                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1553                         }
1554                 }
1555         }
1556   }
1557
1558 /*
1559 for(q=0;q<qnodes.size();++q){
1560  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1561  set<int>::iterator rsii;
1562  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1563   printf(" %d",(*rsii));
1564   printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1565  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1566   printf(" %d",(*rsii));
1567  printf("\n");
1568 }
1569
1570 for(q=0;q<hfta_sets.size();++q){
1571  if(hfta_sets[q]->do_generation==false)
1572         continue;
1573  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1574  set<int>::iterator rsii;
1575  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1576   printf(" %d",(*rsii));
1577   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1578  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1579   printf(" %d",(*rsii));
1580  printf("\n");
1581 }
1582 */
1583
1584
1585
1586 //              Re-topo sort the hftas
1587   hfta_topsort.clear();
1588   workq.clear();
1589   int hnode_srcs_2[hfta_sets.size()];
1590   for(i=0;i<hfta_sets.size();++i){
1591         hnode_srcs_2[i] = 0;
1592         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1593                 workq.push_back(i);
1594         }
1595   }
1596
1597   while(workq.empty() == false){
1598         int     node = workq.front();
1599         workq.pop_front();
1600         hfta_topsort.push_back(node);
1601         set<int>::iterator stsii;
1602         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1603                 int child = (*stsii);
1604                 hnode_srcs_2[child]++;
1605                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1606                         workq.push_back(child);
1607                 }
1608         }
1609   }
1610
1611 //		Ensure that all of the query_node_indices in hfta_sets are topologically
1612 //		sorted; don't rely on the assumption that all transforms maintain some kind of order.
1613   for(i=0;i<hfta_sets.size();++i){
1614         if(hfta_sets[i]->do_generation){
1615                 map<int,int> n_accounted;
1616                 vector<int> new_order;
1617                 workq.clear();
1618                 vector<int>::iterator vii;
1619                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1620                         n_accounted[(*vii)]= 0;
1621                 }
1622                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1623                         set<int>::iterator rfsii;
1624                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1625                                 if(n_accounted.count((*rfsii)) == 0){
1626                                         n_accounted[(*vii)]++;
1627                                 }
1628                         }
1629                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1630                                 workq.push_back((*vii));
1631                         }
1632                 }
1633
1634                 while(workq.empty() == false){
1635                         int node = workq.front();
1636                         workq.pop_front();
1637                         new_order.push_back(node);
1638                         set<int>::iterator stsii;
1639                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1640                                 if(n_accounted.count((*stsii))){
1641                                         n_accounted[(*stsii)]++;
1642                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1643                                                 workq.push_back((*stsii));
1644                                         }
1645                                 }
1646                         }
1647                 }
1648                 hfta_sets[i]->query_node_indices = new_order;
1649         }
1650   }
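//		(new_order lists each query node after every node it reads from within
//		the same hfta, so local sources come first and the hfta's root query
//		ends up at the back of query_node_indices -- e.g., the UDOP branch below
//		uses curr_list.back() as the root.)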
1651
1652
1653
1654
1655
1656 ///                     Global checking is done; start the analysis and translation
1657 ///                     of the query parse trees in the order specified by process_order
1658
1659
1660 //                      Get a list of the LFTAs for global lfta optimization
1661 //                              TODO: separate building operators from splitting lftas,
1662 //                                      that will make optimizations such as predicate pushing easier.
1663         vector<stream_query *> lfta_list;
1664         stream_query *rootq;
1665     int qi,qj;
1666
1667         map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1668
1669         for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1670
1671         int hfta_id = hfta_topsort[qi];
1672     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1673
1674
1675
1676 //              Two possibilities: either it's a UDOP, or it's a collection of queries.
1677 //      if(qnodes[curr_list.back()]->is_udop)
1678         if(hfta_sets[hfta_id]->is_udop){
1679                 int node_id = curr_list.back();
1680                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1681                 opview_entry *opv = new opview_entry();
1682
1683 //                      Many of the UDOP properties aren't currently used.
1684                 opv->parent_qname = "no_parent";
1685                 opv->root_name = qnodes[node_id]->name;
1686                 opv->view_name = qnodes[node_id]->file;
1687                 opv->pos = qi;
1688                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1689                 opv->udop_alias = tmpstr;
1690                 opv->mangler = qnodes[node_id]->mangler;
1691
1692                 if(opv->mangler != ""){
1693                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1694                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1695                 }
1696
1697 //                      This piece of code makes each hfta which refers to the same udop
1698 //                      reference a distinct running udop.  Do this at query optimization time?
1699 //              fmtbl->set_udop_alias(opv->udop_alias);
1700
1701                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1702                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1703
1704                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1705                 int s,f,q;
1706                 for(s=0;s<subq.size();++s){
1707 //                              Validate that the fields match.
1708                         subquery_spec *sqs = subq[s];
1709                         string subq_name = sqs->name + opv->mangler;
1710                         vector<field_entry *> flds = Schema->get_fields(subq_name);
1711                         if(flds.size() == 0){
1712                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s was not found in the Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1713                                 return(1);
1714                         }
1715                         if(flds.size() < sqs->types.size()){
1716                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1717                                 return(1);
1718                         }
1719                         bool failed = false;
1720                         for(f=0;f<sqs->types.size();++f){
1721                                 data_type dte(sqs->types[f],sqs->modifiers[f]);
1722                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1723                                 if(! dte.subsumes_type(&dtf) ){
1724                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1725                                         failed = true;
1726                                 }
1727 /*
1728                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1729                                         string pstr = dte.get_temporal_string();
1730                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1731                                         failed = true;
1732                                 }
1733 */
1734                         }
1735                         if(failed)
1736                                 return(1);
1737 ///                             Validation done; verify that the subquery exists in the
1738 ///                             list of query names.
1739                         for(q=0;q<qnodes.size();++q)
1740                                 if(qnodes[q]->name == subq_name)
1741                                         break;
1742                         if(q==qnodes.size()){
1743                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s was not found in the list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1744                                 return(1);
1745                         }
1746
1747                 }
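//			(The checks above require that each subquery exposes at least as many
//			fields as the view declares and that every declared type subsumes the
//			corresponding field's actual type; the final loop just confirms that a
//			query with the (possibly mangled) subquery name was defined.)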
1748
1749 //                      Cross-link the FROM-clause entries of all queries this UDOP sources to.
1750                 set<int>::iterator sii;
1751                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1752 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1753                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1754                         int ii;
1755                         for(ii=0;ii<tblvars.size();++ii){
1756                                 if(tblvars[ii]->schema_name == opv->root_name){
1757                                         tblvars[ii]->set_opview_idx(opviews.size());
1758                                 }
1759
1760                         }
1761                 }
1762
1763                 opviews.append(opv);
1764         }else{
1765
1766 //                      Analyze the parse trees in this query,
1767 //                      put them in rootq
1768 //      vector<int> curr_list = process_sets[qi];
1769
1770
1771 ////////////////////////////////////////
1772
1773           rootq = NULL;
1774 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1775           for(qj=0;qj<curr_list.size();++qj){
1776                 i = curr_list[qj];
1777         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1778
1779 //                      Select the current query parse tree
1780                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1781
1782 //                      if hfta only, try to fetch any missing schemas
1783 //                      from the registry (using the print_schema program).
1784 //                      Here I use a hack to avoid analyzing the query -- all referenced
1785 //                      tables must be in the from clause.
1786 //                      If there is a problem loading any table, just issue a warning.
1787 //
1788                 tablevar_list_t *fm = fta_parse_tree->get_from();
1789                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);
1790 //                      iterate over all referenced tables
1791                 int t;
1792                 for(t=0;t<refd_tbls.size();++t){
1793                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1794
1795                   if(tbl_ref < 0){      // if this table is not in the Schema
1796
1797                         if(hfta_only){
1798                                 string cmd="print_schema "+refd_tbls[t];
1799                                 FILE *schema_in = popen(cmd.c_str(), "r");
1800                                 if(schema_in == NULL){
1801                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1802                                 }else{
1803                                   string schema_instr;
1804                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1805                                         schema_instr += tmpstr;
1806                                   }
1807                           fta_parse_result = new fta_parse_t();
1808                                   strcpy(tmp_schema_str,schema_instr.c_str());
1809                                   FtaParser_setstringinput(tmp_schema_str);
1810                           if(FtaParserparse()){
1811                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1812                           }else{
1813                                         if( fta_parse_result->tables != NULL){
1814                                                 int tl;
1815                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1816                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));
1817                                                 }
1818                                         }else{
1819                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1820                                         }
1821                                 }
1822                         }
1823                   }else{
1824                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1825                                 exit(1);
1826                   }
1827
1828                 }
1829           }
1830
1831
1832 //                              Analyze the query.
1833           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1834           if(qs == NULL){
1835                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1836                 exit(1);
1837           }
1838
1839           stream_query new_sq(qs, Schema);
1840           if(new_sq.error_code){
1841                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1842                         exit(1);
1843           }
1844
1845 //                      Add it to the Schema
1846           table_def *output_td = new_sq.get_output_tabledef();
1847           Schema->add_table(output_td);
1848
1849 //                      Create a query plan from the analyzed parse tree.
1850 //                      If it's a query referenced via FROM, add it to the stream query.
1851           if(rootq){
1852                 rootq->add_query(new_sq);
1853           }else{
1854                 rootq = new stream_query(new_sq);
1855 //                      have the stream query object inherit properties from the analyzed
1856 //                      hfta_node object.
1857                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1858                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1859           }
1860
1861
1862     }
1863
1864 //              This stream query has all its parts
1865 //              Build and optimize it.
1866 //printf("translate_fta: generating plan.\n");
1867         if(rootq->generate_plan(Schema)){
1868                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1869                 continue;
1870         }
1871
1872 //      We've found the query plan head, so now add the output operators
1873         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1874                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1875                 multimap<string, int>::iterator mmsi;
1876                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1877                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1878                         rootq->add_output_operator(output_specs[(*mmsi).second]);
1879                 }
1880         }
1881
1882
1883
1884 //                              Perform query splitting if necessary.
1885         bool hfta_returned;
1886     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1887
1888         int l;
1889 //for(l=0;l<split_queries.size();++l){
1890 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1891 //}
1892
1893
1894
1895
1896     if(split_queries.size() > 0){       // should be at least one component.
1897
1898 //                              Compute the number of LFTAs.
1899           int n_lfta = split_queries.size();
1900           if(hfta_returned) n_lfta--;
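//				(split_query returns the lfta components first; when an hfta component
//				is also produced it is the last element of split_queries, so it is
//				skipped in the loop below and pushed onto hfta_list afterwards.)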
1901 //                              Check if a schemaId constraint needs to be inserted.
1902
1903 //                              Process the LFTA components.
1904           for(l=0;l<n_lfta;++l){
1905            if(lfta_names.count(split_queries[l]->query_name) == 0){
1906 //                              Grab the lfta for global optimization.
1907                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();
1908                 string liface = "_local_";
1909 //              string lmach = "";
1910                 string lmach = hostname;
1911                 if(tvec.size()>0){
1912                         liface = tvec[0]->get_interface();      // iface queries have been resolved
1913                         if(tvec[0]->get_machine() != ""){
1914                                 lmach = tvec[0]->get_machine();
1915                         }else{
1916                                 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n",  split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
1917                         }
1918                 } // else{
1919                         interface_names.push_back(liface);
1920                         machine_names.push_back(lmach);
1921 //              }
1922
1923                 vector<predicate_t *> schemaid_preds;
1924                 for(int irv=0;irv<tvec.size();++irv){
1925
1926                         string schema_name = tvec[irv]->get_schema_name();
1927                         string rvar_name = tvec[irv]->get_var_name();
1928                         int schema_ref = tvec[irv]->get_schema_ref();
1929                         if (lmach == "")
1930                                 lmach = hostname;
1931 //                      interface_names.push_back(liface);
1932 //                      machine_names.push_back(lmach);
1933
1934 //printf("Machine is %s\n",lmach.c_str());
1935
1936 //                              Check if a schemaId constraint needs to be inserted.
1937                         if(schema_ref<0){ // can result from some kinds of splits
1938                                 schema_ref = Schema->get_table_ref(schema_name);
1939                         }
1940                         int schema_id = Schema->get_schema_id(schema_ref);  // id associated with PROTOCOL
1941                         int errnum = 0;
1942                         string if_error;
1943                         iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1944                         if(iface==NULL){
1945                                 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1946                                 exit(1);
1947                         }       
1948
1949
1950                         if(tvec[irv]->get_interface() != "_local_"){
1951                          if(iface->has_multiple_schemas()){
1952                                 if(schema_id<0){        // invalid schema_id
1953                                         fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1954                                         exit(1);
1955                                 }
1956                                 vector<string> iface_schemas = iface->get_property("Schemas");
1957                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1958                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1959                                         exit(1);
1960                                 }
1961 // Ensure that in liface, schema_id is used for only one schema
1962                                 if(schema_of_schemaid.count(liface)==0){
1963                                         map<int, string> empty_map;
1964                                         schema_of_schemaid[liface] = empty_map;
1965                                 }
1966                                 if(schema_of_schemaid[liface].count(schema_id)==0){
1967                                         schema_of_schemaid[liface][schema_id] = schema_name;
1968                                 }else{
1969                                         if(schema_of_schemaid[liface][schema_id] != schema_name){
1970                                                 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1971                                                 exit(1);
1972                                         }
1973                                 }
1974                          }else{ // single-schema interface
1975                                 schema_id = -1; // don't generate schema_id predicate
1976                                 vector<string> iface_schemas = iface->get_property("Schemas");
1977                                 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1978                                         fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1979                                         exit(1);
1980                                 }
1981                                 if(iface_schemas.size()>1){
1982                                         fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1983                                         exit(1);
1984                                 }
1985                          }                      
1986                         }else{
1987                                 schema_id = -1;
1988                         }
1989
1990 // If we need to check the schema_id, insert a predicate into the lfta.
1991 //       TODO not just schema_id, the full all_schema_ids set.
1992                         if(schema_id>=0){
1993                                 colref_t *schid_cr = new colref_t("schemaId");
1994                                 schid_cr->schema_ref = schema_ref;
1995                                 schid_cr->table_name = rvar_name;
1996                                 schid_cr->tablevar_ref = 0;
1997                                 schid_cr->default_table = false;
1998                                 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1999                                 data_type *schid_dt = new data_type("uint");
2000                                 schid_se->dt = schid_dt;
2001
2002                                 string schid_str = int_to_string(schema_id);
2003                                 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2004                                 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2005                                 lit_se->dt = schid_dt;
2006
2007                                 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2008                                 vector<cnf_elem *> clist;
2009                                 make_cnf_from_pr(schid_pr, clist);
2010                                 analyze_cnf(clist[0]);
2011                                 clist[0]->cost = 1;     // cheap one comparison
2012 // cnf built, now insert it.
2013                                 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
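//				(The conjunct built above is equivalent to appending
//				"AND <range var>.schemaId = <schema id>" to the lfta's WHERE clause,
//				so tuples carrying a different schema id on a multi-schema interface
//				are filtered out inside the lfta.)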
2014
2015 // Specialized processing 
2016 //              filter join, get two schemaid preds
2017                                 string node_type = split_queries[l]->query_plan[0]->node_type();
2018                                 if(node_type == "filter_join"){
2019                                         filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2020                                         if(irv==0){
2021                                                 fj->pred_t0.push_back(clist[0]);
2022                                         }else{
2023                                                 fj->pred_t1.push_back(clist[0]);
2024                                         }
2025                                         schemaid_preds.push_back(schid_pr);
2026                                 }
2027 //              watchlist join, get the first schemaid pred
2028                                 if(node_type == "watch_join"){
2029                                         watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2030                                         if(irv==0){
2031                                                 fj->pred_t0.push_back(clist[0]);
2032                                                 schemaid_preds.push_back(schid_pr);
2033                                         }
2034                                 }
2035                         }
2036                 }
2037 // Specialized processing, currently filter join.
2038                 if(schemaid_preds.size()>1){
2039                         string node_type = split_queries[l]->query_plan[0]->node_type();
2040                         if(node_type == "filter_join"){
2041                                 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2042                                 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2043                                 vector<cnf_elem *> clist;
2044                                 make_cnf_from_pr(filter_pr, clist);
2045                                 analyze_cnf(clist[0]);
2046                                 clist[0]->cost = 1;     // cheap one comparison
2047                                 fj->shared_pred.push_back(clist[0]);
2048                         }
2049                 }
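//			(When a filter join picked up schemaId predicates for both of its range
//			variables above, their disjunction is also recorded as a shared
//			predicate on the join node, in addition to the per-table predicates
//			already pushed onto pred_t0 and pred_t1.)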
2050                         
2051
2052
2053
2054
2055
2056
2057 //                      Set the ht size from the recommendation, if there is one in the rec file
2058                 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2059                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2060                 }
2061
2062
2063                 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2064                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id
2065                 lfta_list.push_back(split_queries[l]);
2066                 lfta_mach_lists[lmach].push_back(split_queries[l]);
2067
2068 //                      The following is a hack,
2069 //                      as I should be generating LFTA code through
2070 //                      the stream_query object.
2071
2072                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2073
2074 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2075
2076 /*
2077 //                              Create query description to embed in lfta.c
2078                 string lfta_schema_str = split_queries[l]->make_schema();
2079                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2080
2081 //                              get NIC capabilities.
2082                 int erri;
2083                 nic_property *nicprop = NULL;
2084                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2085                 if(iface_codegen_type.size()){
2086                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2087                         if(!nicprop){
2088                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2089                                         exit(1);
2090                         }
2091                 }
2092
2093                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2094 */
2095
2096                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2097
2098 // TODO NOTE : I'd like it to be the case that registration_query_names
2099 //      are the queries to be registered for subscription,
2100 //      but there is some complex bookkeeping here.
2101                 registration_query_names.push_back(split_queries[l]->query_name);
2102                 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2103 //                      NOTE: I will assume a 1-1 correspondence between
2104 //                      mach_query_names[lmach] and lfta_mach_lists[lmach],
2105 //                      where mach_query_names[lmach][i] contains the index into
2106 //                      registration_query_names, which names the lfta, and
2107 //                      lfta_mach_lists[lmach][i] is the stream_query * of the
2108 //                      corresponding lfta.
2109 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2110
2111
2112
2113                 // check if lfta is reusable
2114                 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2115
2116                 bool lfta_reusable = false;
2117                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2118                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2119                         lfta_reusable = true;
2120                 }
2121                 lfta_reuse_options.push_back(lfta_reusable);
2122
2123                 // The LFTA will inherit the liveness timeout specification from the containing query.
2124                 // This is overly conservative, since lftas are expected to spend less time per tuple
2125                 // than the full query.
2126
2127                 // extract liveness timeout from query definition
2128                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2129                 if (!liveness_timeout) {
2130 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2131 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2132                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2133                 }
2134                 lfta_liveness_timeouts.push_back(liveness_timeout);
2135
2136 //                      Add it to the schema
2137                 table_def *td = split_queries[l]->get_output_tabledef();
2138                 Schema->append_table(td);
2139 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2140
2141           }
2142          }
2143
2144 //                              If the output is lfta-only, dump out the query name.
2145       if(split_queries.size() == 1 && !hfta_returned){
2146         if(output_query_names ){
2147            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2148                 }
2149 /*
2150 else{
2151            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2152                 }
2153 */
2154
2155 /*
2156 //                              output schema summary
2157                 if(output_schema_summary){
2158                         dump_summary(split_queries[0]);
2159                 }
2160 */
2161       }
2162
2163
2164           if(hfta_returned){            // query also has an HFTA component
2165                 int hfta_nbr = split_queries.size()-1;
2166
2167                         hfta_list.push_back(split_queries[hfta_nbr]);
2168
2169 //                                      report on generated query names
2170         if(output_query_names){
2171                         string hfta_name =split_queries[hfta_nbr]->query_name;
2172                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2173                         for(l=0;l<hfta_nbr;++l){
2174                                 string lfta_name =split_queries[l]->query_name;
2175                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2176                         }
2177                 }
2178 //              else{
2179 //              fprintf(stderr,"query names are ");
2180 //                      for(l=0;l<hfta_nbr;++l){
2181 //                              if(l>0) fprintf(stderr,",");
2182 //                              string fta_name =split_queries[l]->query_name;
2183 //                      fprintf(stderr," %s",fta_name.c_str());
2184 //                      }
2185 //                      fprintf(stderr,"\n");
2186 //              }
2187           }
2188
2189   }else{
2190           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2191           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2192           exit(1);
2193   }
2194  }
2195 }
2196
2197
2198 //-----------------------------------------------------------------
2199 //              Compute and propagate the SEs (scalar expressions) used to compute PROTOCOL fields.
2200 //-----------------------------------------------------------------
2201
2202 for(i=0;i<lfta_list.size();i++){
2203         lfta_list[i]->generate_protocol_se(sq_map, Schema);
2204         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2205 }
2206 for(i=0;i<hfta_list.size();i++){
2207         hfta_list[i]->generate_protocol_se(sq_map, Schema);
2208         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2209 }
2210
2211
2212
2213 //------------------------------------------------------------------------
2214 //              Perform  individual FTA optimizations
2215 //-----------------------------------------------------------------------
2216
2217 if (partitioned_mode) {
2218
2219         // open partition definition file
2220         string part_fname = config_dir_path + "partition.txt";
2221
2222         FILE* partfd = fopen(part_fname.c_str(), "r");
2223         if (!partfd) {
2224                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2225                 exit(1);
2226         }
2227         PartnParser_setfileinput(partfd);
2228         if (PartnParserparse()) {
2229                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2230                 exit(1);
2231         }
2232         fclose(partfd);
2233 }
2234
2235
2236 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2237
2238 int num_hfta = hfta_list.size();
2239 for(i=0; i < hfta_list.size(); ++i){
2240         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2241 }
2242
2243 //                      Add all new hftas to schema
2244 for(i=num_hfta; i < hfta_list.size(); ++i){
2245                 table_def *td = hfta_list[i]->get_output_tabledef();
2246                 Schema->append_table(td);
2247 }
2248
2249 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2250
2251
2252
2253 //------------------------------------------------------------------------
2254 //              Do global (cross-fta) optimization
2255 //-----------------------------------------------------------------------
2256
2257
2258
2259
2260
2261
2262 set<string> extra_external_libs;
2263
2264 for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component
2265
2266                 if(! debug_only){
2267 //                                      build hfta file name, create output
2268            if(numeric_hfta_flname){
2269             sprintf(tmpstr,"hfta_%d",hfta_count);
2270                         hfta_names.push_back(tmpstr);
2271             sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2272          }else{
2273             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2274                         hfta_names.push_back(tmpstr);
2275             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2276           }
2277                   FILE *hfta_fl = fopen(tmpstr,"w");
2278                   if(hfta_fl == NULL){
2279                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2280                         exit(1);
2281                   }
2282                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2283
2284 //                      If there is a field verifier, warn about
2285 //                      lack of compatibility.
2286 //                              NOTE : this code assumes that visible non-lfta queries
2287 //                              are those at the root of a stream query.
2288                   string hfta_comment;
2289                   string hfta_title;
2290                   string hfta_namespace;
2291                   if(hfta_list[i]->defines.count("comment")>0)
2292                         hfta_comment = hfta_list[i]->defines["comment"];
2293                   if(hfta_list[i]->defines.count("Comment")>0)
2294                         hfta_comment = hfta_list[i]->defines["Comment"];
2295                   if(hfta_list[i]->defines.count("COMMENT")>0)
2296                         hfta_comment = hfta_list[i]->defines["COMMENT"];
2297                   if(hfta_list[i]->defines.count("title")>0)
2298                         hfta_title = hfta_list[i]->defines["title"];
2299                   if(hfta_list[i]->defines.count("Title")>0)
2300                         hfta_title = hfta_list[i]->defines["Title"];
2301                   if(hfta_list[i]->defines.count("TITLE")>0)
2302                         hfta_title = hfta_list[i]->defines["TITLE"];
2303                   if(hfta_list[i]->defines.count("namespace")>0)
2304                         hfta_namespace = hfta_list[i]->defines["namespace"];
2305                   if(hfta_list[i]->defines.count("Namespace")>0)
2306                         hfta_namespace = hfta_list[i]->defines["Namespace"];
2307                   if(hfta_list[i]->defines.count("NAMESPACE")>0)
2308                         hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2309
2310                   if(field_verifier != NULL){
2311                         string warning_str;
2312                         if(hfta_comment == "")
2313                                 warning_str += "\tcomment not found.\n";
2314
2315 // Obsolete stuff that Carsten wanted
2316 //                      if(hfta_title == "")
2317 //                              warning_str += "\ttitle not found.\n";
2318 //                      if(hfta_namespace == "")
2319 //                              warning_str += "\tnamespace not found.\n";
2320
2321 //      There is a get_tbl_keys method implemented for qp_nodes;
2322 //      integrate it into stream_query, then call it to find keys,
2323 //      and annotate fields with their key-ness.
2324 //      If there is a "keys" property in the defines block, override anything returned
2325 //      from the automated analysis.
2326
2327                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2328                         int fi;
2329                         for(fi=0;fi<flds.size();fi++){
2330                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2331                         }
2332                         if(warning_str != "")
2333                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2334                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2335                   }
2336
2337 // Get the fields in this query
2338                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2339
2340 // do key processing
2341                   string hfta_keys_s = "";
2342                   if(hfta_list[i]->defines.count("keys")>0)
2343                         hfta_keys_s = hfta_list[i]->defines["keys"];
2344                   if(hfta_list[i]->defines.count("Keys")>0)
2345                         hfta_keys_s = hfta_list[i]->defines["Keys"];
2346                   if(hfta_list[i]->defines.count("KEYS")>0)
2347                         hfta_keys_s = hfta_list[i]->defines["KEYS"];
2348                   string xtra_keys_s = "";
2349                   if(hfta_list[i]->defines.count("extra_keys")>0)
2350                         xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2351                   if(hfta_list[i]->defines.count("Extra_Keys")>0)
2352                         xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2353                   if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2354                         xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2355 // get the keys
2356                   vector<string> hfta_keys;
2357                   vector<string> partial_keys;
2358                   vector<string> xtra_keys;
2359                   if(hfta_keys_s==""){
2360                                 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2361                                 if(xtra_keys_s.size()>0){
2362                                         xtra_keys = split_string(xtra_keys_s, ',');
2363                                 }
2364                                 for(int xi=0;xi<xtra_keys.size();++xi){
2365                                         if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2366                                                 hfta_keys.push_back(xtra_keys[xi]);
2367                                         }
2368                                 }
2369                   }else{
2370                                 hfta_keys = split_string(hfta_keys_s, ',');
2371                   }
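//                An illustrative sketch of the key derivation above (field names and define
//                values are hypothetical):
//                  - a "keys" define of "time,srcIP"             ->  hfta_keys = {"time","srcIP"}
//                  - no "keys" define, get_tbl_keys() infers {"time"},
//                    plus an "extra_keys" define of "srcIP"      ->  hfta_keys = {"time","srcIP"}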
2372 // validate that all of the keys exist in the output.
2373 //      (exit on error, as it's a bad specification)
2374                   vector<string> missing_keys;
2375                   for(int ki=0;ki<hfta_keys.size(); ++ki){
2376                         int fi;
2377                         for(fi=0;fi<flds.size();++fi){
2378                                 if(hfta_keys[ki] == flds[fi]->get_name())
2379                                         break;
2380                         }
2381                         if(fi==flds.size())
2382                                 missing_keys.push_back(hfta_keys[ki]);
2383                   }
2384                   if(missing_keys.size()>0){
2385                         fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2386                         for(int hi=0; hi<missing_keys.size(); ++hi){
2387                                 fprintf(stderr," %s", missing_keys[hi].c_str());
2388                         }
2389                         fprintf(stderr,"\n");
2390                         exit(1);
2391                   }
2392
2393                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2394                   if(hfta_comment != "")
2395                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2396                   if(hfta_title != "")
2397                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2398                   if(hfta_namespace != "")
2399                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2400                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2401                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2402
2403 //                              write info about fields to qtree.xml
2404                   int fi;
2405                   for(fi=0;fi<flds.size();fi++){
2406                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2407                         if(flds[fi]->get_modifier_list()->size()){
2408                                 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2409                         }
2410                         fprintf(qtree_output," />\n");
2411                   }
2412 // info about keys
2413                   for(int hi=0;hi<hfta_keys.size();++hi){
2414                         fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2415                   }
2416                   for(int hi=0;hi<partial_keys.size();++hi){
2417                         fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2418                   }
2419                   for(int hi=0;hi<xtra_keys.size();++hi){
2420                         fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2421                   }
2422
2423
2424                   // extract liveness timeout from query definition
2425                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2426                   if (!liveness_timeout) {
2427 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2428 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2429                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2430                   }
2431                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2432
2433                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2434                   int itv;
2435                   for(itv=0;itv<tmp_tv.size();++itv){
2436                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2437                   }
2438                   string ifrs = hfta_list[i]->collect_refd_ifaces();
2439                   if(ifrs != ""){
2440                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2441                   }
2442                   fprintf(qtree_output,"\t</HFTA>\n");
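//                The qtree_output writes above emit one <HFTA> element per query; a hypothetical
//                fragment (names, types, and values are illustrative only) looks like:
//                  <HFTA name='example_hfta'>
//                      <Description value='example comment' />
//                      <FileName value='hfta_0.cc' />
//                      <Rate value='100' />
//                      <Field name='time' pos='0' type='uint' />
//                      <Key value='time'/>
//                      <LivenessTimeout value='5' />
//                      <ReadsFrom value='example_lfta' />
//                  </HFTA>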
2443
2444                   fclose(hfta_fl);
2445                 }else{
2446 //                                      debug only -- do code generation to catch generation-time errors.
2447                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2448                 }
2449
2450                 hfta_count++;   // for hfta file names with numeric suffixes
2451
2452                 hfta_list[i]->get_external_libs(extra_external_libs);
2453
2454           }
2455
2456 string ext_lib_string;
2457 set<string>::iterator ssi_el;
2458 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2459         ext_lib_string += (*ssi_el)+" ";
2460
2461
2462
2463 //                      Report on the set of operator views
2464   for(i=0;i<opviews.size();++i){
2465         opview_entry *opve = opviews.get_entry(i);
2466         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2467         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2468         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2469         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2470         fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2471
2472         if (!opve->liveness_timeout) {
2473 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2474 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2475                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2476         }
2477         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2478     int j;
2479         for(j=0;j<opve->subq_names.size();j++)
2480                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2481         fprintf(qtree_output,"\t</UDOP>\n");
2482   }
2483
2484
2485 //-----------------------------------------------------------------
2486
2487 //                      Create interface-specific meta code files.
2488 //                              first, open and parse the interface resources file.
2489         ifaces_db = new ifq_t();
2490     ierr = "";
2491         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2492                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2493                                 ifx_fname.c_str(), ierr.c_str());
2494                 exit(1);
2495         }
2496
2497         map<string, vector<stream_query *> >::iterator svsi;
2498         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2499                 string lmach = (*svsi).first;
2500
2501         //              For this machine, create a set of lftas per interface.
2502                 vector<stream_query *> mach_lftas = (*svsi).second;
2503                 map<string, vector<stream_query *> > lfta_iface_lists;
2504                 int li;
2505                 for(li=0;li<mach_lftas.size();++li){
2506                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2507                         string lfta_iface = "_local_";
2508                         if(tvec.size()>0){
2509                                 lfta_iface = tvec[0]->get_interface();  // assign, don't shadow the outer lfta_iface
2510                         }
2511                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2512                 }
2513
2514                 map<string, vector<stream_query *> >::iterator lsvsi;
2515                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2516                         int erri;
2517                         string liface = (*lsvsi).first;
2518                         vector<stream_query *> iface_lftas = (*lsvsi).second;
2519                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2520                         if(iface_codegen_type.size()){
2521                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2522                                 if(!nicprop){
2523                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2524                                         exit(1);
2525                                 }
2526                                 string mcs = generate_nic_code(iface_lftas, nicprop);
2527                                 string mcf_flnm;
2528                                 if(lmach != "")
2529                                   mcf_flnm = lmach + "_"+liface+".mcf";
2530                                 else
2531                                   mcf_flnm = hostname + "_"+liface+".mcf";
2532                                 FILE *mcf_fl ;
2533                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2534                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2535                                         exit(1);
2536                                 }
2537                                 fprintf(mcf_fl,"%s",mcs.c_str());
2538                                 fclose(mcf_fl);
2539 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2540 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2541                         }
2542                 }
2543
2544
2545         }
2546
2547
2548
2549 //-----------------------------------------------------------------
2550
2551
2552 //                      Find common filter predicates in the LFTAs.
2553 //                      in addition generate structs to store the temporal attributes unpacked by prefilter
2554         
2555         map<string, vector<stream_query *> >::iterator ssqi;
2556         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2557
2558                 string lmach = (*ssqi).first;
2559                 bool packed_return = false;
2560                 int li, erri;
2561
2562
2563 //      The LFTAs of this machine.
2564                 vector<stream_query *> mach_lftas = (*ssqi).second;
2565 //      break up on a per-interface basis.
2566                 map<string, vector<stream_query *> > lfta_iface_lists;
2567                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2568                                                         // for fta_init
2569                 for(li=0;li<mach_lftas.size();++li){
2570                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2571                         string lfta_iface = "_local_";
2572                         if(tvec.size()>0){
2573                                 lfta_iface = tvec[0]->get_interface();
2574                         }
2575                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2576                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2577                 }
2578
2579
2580 //      Are the return values "packed"?
2581 //      This should be done on a per-interface basis.
2582 //      But this is defunct code for gs-lite
2583                 for(li=0;li<mach_lftas.size();++li){
2584                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();
2585                   string liface = "_local_";
2586                   if(tvec.size()>0){
2587                          liface = tvec[0]->get_interface();
2588                   }
2589                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2590                   if(iface_codegen_type.size()){
2591                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2592                         packed_return = true;
2593                         }
2594                   }
2595                 }
2596
2597
2598 // Separate lftas by interface, collect results on a per-interface basis.
2599
2600                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter
2601                 map<string, vector<cnf_set *> > prefilter_preds;
2602                 set<unsigned int> pred_ids;     // this can be global for all interfaces
2603                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2604                         string liface = (*mvsi).first;
2605                         vector<cnf_set *> empty_list;
2606                         prefilter_preds[liface] = empty_list;
2607                         if(! packed_return){
2608                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2609                         }
2610
2611 //                              get NIC capabilities.  (Is this needed?)
2612                         nic_property *nicprop = NULL;
2613                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2614                         if(iface_codegen_type.size()){
2615                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2616                                 if(!nicprop){
2617                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2618                                         exit(1);
2619                                 }
2620                         }
2621                 }
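//              Note on pred_ids: as decoded in the PREFILTER_OK block below, each entry packs
//              the lfta index in the high 16 bits and the predicate index in the low 16 bits.
//              A hypothetical entry (values illustrative only):
//                      unsigned int pid = (3<<16) | 2;     // predicate 2 of the prefilter, subsumed by lfta 3
//                      // (pid>>16)==3 selects the lfta, (pid & 0xffff)==2 names the subsumed predicate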
2622
2623
2624 //              Now that we know the prefilter preds, generate the lfta code.
2625 //      Do this for all lftas in this machine.
2626                 for(li=0;li<mach_lftas.size();++li){
2627                         set<unsigned int> subsumed_preds;
2628                         set<unsigned int>::iterator sii;
2629 #ifdef PREFILTER_OK
2630                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2631                                 int pid = (*sii);
2632                                 if((pid>>16) == li){
2633                                         subsumed_preds.insert(pid & 0xffff);
2634                                 }
2635                         }
2636 #endif
2637                         string lfta_schema_str = mach_lftas[li]->make_schema();
2638                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2639                         nic_property *nicprop = NULL; // no NIC properties?
2640                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2641                 }
2642
2643
2644 //                      generate structs to store the temporal attributes
2645 //                      unpacked by prefilter
2646                 col_id_set temp_cids;
2647                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2648                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2649
2650 //                      Compute the lfta bit signatures and the lfta colrefs
2651 //      do this on a per-interface basis
2652 #ifdef PREFILTER_OK
2653                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2654 #endif
2655                 map<string, vector<long long int> > lfta_sigs; // used again later
2656                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2657                         string liface = (*mvsi).first;
2658                         vector<long long int> empty_list;
2659                         lfta_sigs[liface] = empty_list;
2660
2661                         vector<col_id_set> lfta_cols;
2662                         vector<int> lfta_snap_length;
2663                         for(li=0;li<lfta_iface_lists[liface].size();++li){
2664                                 unsigned long long int mask=0, bpos=1;
2665                                 int f_pos;
2666                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2667                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2668                                                 mask |= bpos;
2669                                         bpos = bpos << 1;
2670                                 }
2671                                 lfta_sigs[liface].push_back(mask);
2672                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2673                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2674                         }
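//                              Illustrative example of the signature computed above (predicate indices
//                              are hypothetical): if prefilter predicates 0 and 2 of this interface occur
//                              in lfta li, then mask = (1<<0) | (1<<2) = 0x5, i.e. bit f_pos marks predicate f_pos.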
2675
2676 //for(li=0;li<mach_lftas.size();++li){
2677 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2678 //col_id_set::iterator tcisi;
2679 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2680 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2681 //}
2682 //}
2683
2684
2685 //                      generate the prefilter
2686 //      Do this on a per-interface basis, except for the #define
2687 #ifdef PREFILTER_OK
2688 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2689                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2690 #else
2691                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2692
2693 #endif
2694                 }
2695
2696 //                      Generate interface parameter lookup function
2697           lfta_val[lmach] += "// lookup interface properties by name\n";
2698           lfta_val[lmach] += "// multiple values of a given property are returned in the same string, separated by commas\n";
2699           lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2700           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2701
2702 //        collect a list of interface names used by queries running on this host
2703           set<std::string> iface_names;
2704           for(i=0;i<mach_query_names[lmach].size();i++){
2705                 int mi = mach_query_names[lmach][i];
2706                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2707
2708                 if(interface_names[mi]=="")
2709                         iface_names.insert("DEFAULTDEV");
2710                 else
2711                         iface_names.insert(interface_names[mi]);
2712           }
2713
2714 //        generate interface property lookup code for every interface
2715           set<std::string>::iterator sir;
2716           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2717                 if (sir == iface_names.begin())
2718                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2719                 else
2720                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2721
2722                 // iterate through interface properties
2723                 vector<string> iface_properties;
2724                 if(*sir!="_local_"){    // dummy watchlist interface, don't process.
2725                         iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2726                 }
2727                 if (erri) {
2728                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2729                         exit(1);
2730                 }
2731                 if (iface_properties.empty())
2732                         lfta_val[lmach] += "\t\treturn NULL;\n";
2733                 else {
2734                         for (int i = 0; i < iface_properties.size(); ++i) {
2735                                 if (i == 0)
2736                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2737                                 else
2738                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2739
2740                                 // combine all values for the interface property using comma separator
2741                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2742                                 lfta_val[lmach] += "\t\t\treturn \"";
2743                                 for (int j = 0; j < vals.size(); ++j) {
2744                                         lfta_val[lmach] +=  vals[j];
2745                                         if (j != vals.size()-1)
2746                                                 lfta_val[lmach] += ",";
2747                                 }
2748                                 lfta_val[lmach] += "\";\n";
2749                         }
2750                         lfta_val[lmach] += "\t\t} else\n";
2751                         lfta_val[lmach] += "\t\t\treturn NULL;\n";
2752                 }
2753           }
2754           lfta_val[lmach] += "\t} else\n";
2755           lfta_val[lmach] += "\t\treturn NULL;\n";
2756           lfta_val[lmach] += "}\n\n";
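//        A hypothetical instance of the generated lookup function, for a single interface
//        "eth0" with one property "Command" (all names and values are illustrative, not
//        taken from a real ifres.xml):
//
//              gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {
//                      if (!strcmp(iface_name, "eth0")) {
//                              if (!strcmp(property_name, "Command")) {
//                                      return "example_cmd -a,example_cmd -b";
//                              } else
//                                      return NULL;
//                      } else
//                              return NULL;
//              }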
2757
2758
2759 //                      Generate a full list of FTAs for clearinghouse reference
2760           lfta_val[lmach] += "// list of FTAs which are expected to register themselves with the clearinghouse\n";
2761           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2762
2763           bool first = true;
2764           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2765                 string liface = (*mvsi).first;
2766                 if(liface != "_local_"){        // these don't register themselves
2767                         vector<stream_query *> lfta_list = (*mvsi).second;
2768                         for(i=0;i<lfta_list.size();i++){
2769                                 int mi = lfta_iface_qname_ix[liface][i];
2770                                 if(first) first = false;
2771                                 else      lfta_val[lmach] += ", ";
2772                                 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2773                         }
2774                 }
2775           }
2776 //        for (i = 0; i < registration_query_names.size(); ++i) {
2777 //                 if (i)
2778 //                        lfta_val[lmach] += ", ";
2779 //                 lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2780 //        }
2781
2782           for (i = 0; i < hfta_list.size(); ++i) {
2783                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2784           }
2785           lfta_val[lmach] += ", NULL};\n\n";
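//        The generated registration list then looks roughly like this (query names are
//        hypothetical placeholders):
//              const gs_sp_t fta_names[] = {"example_lfta_1", "example_lfta_2", "example_hfta", NULL};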
2786
2787
2788 //                      Add the initialization function to lfta.c
2789 //      Change to accept the interface name, and 
2790 //      set the prefilter function accordingly.
2791 //      see the example in demo/err2
2792           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2793           lfta_val[lmach] += "//        note: the last parameter in fta_register is the prefilter signature\n";
2794
2795 //        for(i=0;i<mach_query_names[lmach].size();i++)
2796 //              int mi = mach_query_names[lmach][i];
2797 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2798
2799           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2800                 string liface = (*mvsi).first;
2801                 vector<stream_query *> lfta_list = (*mvsi).second;
2802                 for(i=0;i<lfta_list.size();i++){
2803                         stream_query *lfta_sq = lfta_list[i];
2804                         int mi = lfta_iface_qname_ix[liface][i];
2805
2806                         if(liface == "_local_"){
2807 //  Don't register an init function, do the init code inline
2808                                 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2809                                 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2810                                 continue;
2811                         }
2812                 
2813                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2814
2815                         string this_iface = "DEFAULTDEV";
2816                         if(interface_names[mi]!="")
2817                                 this_iface = '"'+interface_names[mi]+'"';
2818                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2819                 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2820 //              if(interface_names[mi]=="")
2821 //                              lfta_val[lmach]+="DEFAULTDEV";
2822 //              else
2823 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';
2824                         lfta_val[lmach] += this_iface;
2825
2826
2827                 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2828                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2829                         +"\n#endif\n";
2830                                 sprintf(tmpstr,",%d",snap_lengths[mi]);
2831                         lfta_val[lmach] += tmpstr;
2832
2833 //                      unsigned long long int mask=0, bpos=1;
2834 //                      int f_pos;
2835 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2836 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))
2837 //                                      mask |= bpos;
2838 //                              bpos = bpos << 1;
2839 //                      }
2840
2841 #ifdef PREFILTER_OK
2842 //                      sprintf(tmpstr,",%lluull",mask);
2843                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2844                         lfta_val[lmach]+=tmpstr;
2845 #else
2846                         lfta_val[lmach] += ",0ull";
2847 #endif
2848
2849                         lfta_val[lmach] += ");\n";
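//                      A hypothetical instance of the registration call emitted above, for one lfta
//                      on interface "eth0" (the query name, allocator, schema-string identifier,
//                      snap length and signature are illustrative placeholders):
//
//                              if(!strcmp(device,"eth0"))
//                                      fta_register("example_lfta", 1, "eth0", <alloc fcn name>
//                              #ifndef LFTA_IN_NIC
//                                      ,<schema string name>
//                              #endif
//                              ,1518,0ull);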
2850
2851
2852
2853 //    End of lfta prefilter stuff
2854 // --------------------------------------------------
2855
2856 //                      If there is a field verifier, warn about
2857 //                      lack of compatibility.
2858                   string lfta_comment;
2859                   string lfta_title;
2860                   string lfta_namespace;
2861                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2862                   if(ldefs.count("comment")>0)
2863                         lfta_comment = lfta_sq->defines["comment"];
2864                   if(ldefs.count("Comment")>0)
2865                         lfta_comment = lfta_sq->defines["Comment"];
2866                   if(ldefs.count("COMMENT")>0)
2867                         lfta_comment = lfta_sq->defines["COMMENT"];
2868                   if(ldefs.count("title")>0)
2869                         lfta_title = lfta_sq->defines["title"];
2870                   if(ldefs.count("Title")>0)
2871                         lfta_title = lfta_sq->defines["Title"];
2872                   if(ldefs.count("TITLE")>0)
2873                         lfta_title = lfta_sq->defines["TITLE"];
2874                   if(ldefs.count("NAMESPACE")>0)
2875                         lfta_namespace = lfta_sq->defines["NAMESPACE"];
2876                   if(ldefs.count("Namespace")>0)
2877                         lfta_namespace = lfta_sq->defines["Namespace"];
2878                   if(ldefs.count("namespace")>0)
2879                         lfta_namespace = lfta_sq->defines["namespace"];
2880
2881                   string lfta_ht_size;
2882                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2883                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2884                   if(ldefs.count("aggregate_slots")>0){
2885                         lfta_ht_size = ldefs["aggregate_slots"];
2886                   }
2887
2888 //                      NOTE : I'm assuming that visible lftas do not start with an underscore
2889 //                              (e.g. "_fta_..."); this will fail for non-visible simple selection queries.
2890                 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2891                         string warning_str;
2892                         if(lfta_comment == "")
2893                                 warning_str += "\tcomment not found.\n";
2894 // Obsolete stuff that Carsten wanted
2895 //                      if(lfta_title == "")
2896 //                              warning_str += "\ttitle not found.\n";
2897 //                      if(lfta_namespace == "")
2898 //                              warning_str += "\tnamespace not found.\n";
2899
2900                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2901                         int fi;
2902                         for(fi=0;fi<flds.size();fi++){
2903                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2904                         }
2905                         if(warning_str != "")
2906                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2907                                         registration_query_names[mi].c_str(),warning_str.c_str());
2908                 }
2909
2910
2911 //                      Create qtree output
2912                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2913         if(lfta_comment != "")
2914               fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2915         if(lfta_title != "")
2916               fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2917         if(lfta_namespace != "")
2918               fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2919         if(lfta_ht_size != "")
2920               fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2921                 if(lmach != "")
2922                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2923                 else
2924                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2925                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());
2926                 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2927                 for(int t=0;t<itbls.size();++t){
2928                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2929                 }
2930 //              fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2931                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2932                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2933 //                              write info about fields to qtree.xml
2934                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2935                   int fi;
2936                   for(fi=0;fi<flds.size();fi++){
2937                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2938                         if(flds[fi]->get_modifier_list()->size()){
2939                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2940                         }
2941                         fprintf(qtree_output," />\n");
2942                   }
2943                 fprintf(qtree_output,"\t</LFTA>\n");
2944
2945
2946             }
2947           }
2948
2949           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2950                         string liface = (*mvsi).first;
2951                         lfta_val[lmach] += 
2952 "       if (!strcmp(device, \""+liface+"\")) \n"
2953 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2954 ;
2955                 }
2956                 lfta_val[lmach] += 
2957 "       if(lfta_prefilter == NULL){\n"
2958 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2959 "               exit(1);\n"
2960 "       }\n"
2961 ;
2962
2963
2964
2965           lfta_val[lmach] += "}\n\n";
2966
2967       if(!(debug_only || hfta_only) ){
2968                 string lfta_flnm;
2969                 if(lmach != "")
2970                   lfta_flnm = lmach + "_lfta.c";
2971                 else
2972                   lfta_flnm = hostname + "_lfta.c";
2973                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2974                         fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2975                         exit(1);
2976                 }
2977               fprintf(lfta_out,"%s",lfta_header.c_str());
2978               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2979               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2980                 fclose(lfta_out);
2981           }
2982         }
2983
2984 //              Report the external operators which must execute.
2985         if(opviews.size()>0)
2986                 fprintf(stderr,"The queries use the following external operators:\n");
2987         for(i=0;i<opviews.size();++i){
2988                 opview_entry *opv = opviews.get_entry(i);
2989                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2990         }
2991
2992         if(create_makefile)
2993                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2994                 machine_names, schema_file_name,
2995                 interface_names,
2996                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2997
2998
2999         fprintf(qtree_output,"</QueryNodes>\n");
3000
3001         return(0);
3002 }
3003
3004 ////////////////////////////////////////////////////////////
3005
3006 void generate_makefile(vector<string> &input_file_names, int nfiles,
3007                                            vector<string> &hfta_names, opview_set &opviews,
3008                                                 vector<string> &machine_names,
3009                                                 string schema_file_name,
3010                                                 vector<string> &interface_names,
3011                                                 ifq_t *ifdb, string &config_dir_path,
3012                                                 bool use_pads,
3013                                                 string extra_libs,
3014                                                 map<string, vector<int> > &rts_hload
3015                                          ){
3016         int i,j;
3017
3018         if(config_dir_path != ""){
3019                 config_dir_path = "-C "+config_dir_path;
3020         }
3021
3022         struct stat sb;
3023         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3024         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3025
3026 //      if(libz_exists && !libast_exists){
3027 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
3028 //              exit(1);
3029 //      }
3030
3031 //                      Get set of operator executable files to run
3032         set<string> op_fls;
3033         set<string>::iterator ssi;
3034         for(i=0;i<opviews.size();++i){
3035                 opview_entry *opv = opviews.get_entry(i);
3036                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3037         }
3038
3039         FILE *outfl = fopen("Makefile", "w");
3040         if(outfl==NULL){
3041                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
3042                 exit(0);
3043         }
3044
3045         fputs(
3046 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"
3047 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3048 ).c_str(), outfl
3049 );
3050         if(generate_stats)
3051                 fprintf(outfl,"  -DLFTA_STATS");
3052
3053 //              Gather the set of interfaces
3054 //              Also, gather "base interface names" for use in computing
3055 //              the hash splitting to virtual interfaces.
3056 //              TODO : must update to handle machines
3057         set<string> ifaces;
3058         set<string> base_vifaces;       // base interfaces of virtual interfaces
3059         map<string, string> ifmachines;
3060         map<string, string> ifattrs;
3061         for(i=0;i<interface_names.size();++i){
3062                 ifaces.insert(interface_names[i]);
3063                 ifmachines[interface_names[i]] = machine_names[i];
3064
3065                 size_t Xpos = interface_names[i].find_last_of("X");
3066                 if(Xpos!=string::npos){
3067                         string iface = interface_names[i].substr(0,Xpos);
3068                         base_vifaces.insert(iface);
3069                 }
3070                 // get interface attributes and add them to the list
3071         }
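//              Illustrative example of the split above (interface name is hypothetical): a virtual
//              interface named "eth0X3" has base interface "eth0" -- the substring before the last
//              'X' is what gets recorded in base_vifaces.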
3072
3073 //              Do we need to include protobuf libraries?
3074 //              TODO Move to the interface library: get the libraries to include 
3075 //              for an interface type
3076
3077         bool use_proto = false;
3078         bool use_bsa = false;
3079         bool use_kafka = false;
3080         int erri;
3081         string err_str;
3082         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3083                 string ifnm = (*ssi);
3084                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3085                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3086                         if(ift[ift_i]=="PROTO"){
3087 #ifdef PROTO_ENABLED                            
3088                                 use_proto = true;
3089 #else
3090                                 fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
3091                                 exit(0);
3092 #endif
3093                         }
3094                 }
3095                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3096                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3097                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3098 #ifdef BSA_ENABLED                              
3099                                 use_bsa = true;
3100 #else
3101                                 fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
3102                                 exit(0);
3103 #endif                                  
3104                         }
3105                 }
3106                 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3107                 for(int ift_i=0;ift_i<ift.size();ift_i++){
3108                         if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){       
3109 #ifdef KAFKA_ENABLED                            
3110                                 use_kafka = true;
3111 #else
3112                                 fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
3113                                 exit(0);
3114 #endif  
3115                         }
3116                 }
3117         }
3118
3119         fprintf(outfl,
3120 "\n"
3121 "\n"
3122 "all: rts");
3123         for(i=0;i<hfta_names.size();++i)
3124                 fprintf(outfl," %s",hfta_names[i].c_str());
3125         fputs(
3126 ("\n"
3127 "\n"
3128 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"
3129 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3130         if(use_pads)
3131                 fprintf(outfl,"-L. ");
3132         fputs(
3133 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3134         if(use_pads)
3135                 fprintf(outfl,"-lgscppads -lpads ");
3136         fprintf(outfl,
3137 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3138         if(use_pads)
3139                 fprintf(outfl, " -lpz -lz -lbz ");
3140         if(libz_exists && libast_exists)
3141                 fprintf(outfl," -last ");
3142         if(use_pads)
3143                 fprintf(outfl, " -ldll -ldl ");
3144
3145 #ifdef PROTO_ENABLED    
3146         fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3147 #endif
3148 #ifdef BSA_ENABLED      
3149         fprintf(outfl, " -lbsa_stream ");
3150 #endif
3151 #ifdef KAFKA_ENABLED    
3152         fprintf(outfl, " -lrdkafka ");
3153 #endif
3154         fprintf(outfl," -lgscpaux");
3155 #ifdef GCOV
3156         fprintf(outfl," -fprofile-arcs");
3157 #endif
3158         fprintf(outfl,
3159 "\n"
3160 "\n"
3161 "lfta.o: %s_lfta.c\n"
3162 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3163 "\n"
3164 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3165         for(i=0;i<nfiles;++i)
3166                 fprintf(outfl," %s",input_file_names[i].c_str());
3167         if(hostname == ""){
3168                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3169         }else{
3170                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3171         }
3172         for(i=0;i<nfiles;++i)
3173                 fprintf(outfl," %s",input_file_names[i].c_str());
3174         fprintf(outfl,"\n");
3175
3176         for(i=0;i<hfta_names.size();++i)
3177                 fprintf(outfl,
3178 ("%s: %s.o\n"
3179 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3180 "\n"
3181 "%s.o: %s.cc\n"
3182 "\t$(CPP) -o %s.o -c %s.cc\n"
3183 "\n"
3184 "\n").c_str(),
3185     hfta_names[i].c_str(), hfta_names[i].c_str(),
3186         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3187         hfta_names[i].c_str(), hfta_names[i].c_str(),
3188         hfta_names[i].c_str(), hfta_names[i].c_str()
3189                 );
3190
3191         fprintf(outfl,
3192 ("\n"
3193 "packet_schema.txt:\n"
3194 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3195 "\n"
3196 "external_fcns.def:\n"
3197 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3198 "\n"
3199 "clean:\n"
3200 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3201         for(i=0;i<hfta_names.size();++i)
3202                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3203         fprintf(outfl,"\n");
3204
3205         fclose(outfl);
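//              The generated Makefile has roughly the following target structure (host name,
//              hfta names, and paths are illustrative):
//                      all: rts hfta_0 hfta_1 ...
//                      rts: lfta.o <gscp libraries>           # links the lfta runtime
//                      lfta.o: examplehost_lfta.c             # compiled with $(CC)
//                      examplehost_lfta.c: external_fcns.def <schema file> <query files>
//                              <root>/bin/translate_fta ...   # regenerates the lfta source
//                      hfta_N: hfta_N.o                       # each hfta linked with $(CPP)
//                      clean: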
3206
3207
3208
3209 //              Gather the set of interfaces
3210 //              TODO : must update to handle machines
3211 //              TODO : look up interface attributes and add them as a parameter to the rts process
3212         outfl = fopen("runit", "w");
3213         if(outfl==NULL){
3214                 fprintf(stderr,"Can't open runit for write, exiting.\n");
3215                 exit(0);
3216         }
3217
3218
3219         fputs(
3220 ("#!/bin/sh\n"
3221 "./stopit\n"
3222 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3223 "sleep 5\n"
3224 "if [ ! -f gshub.log ]\n"
3225 "then\n"
3226 "\techo \"Failed to start bin/gshub.py\"\n"
3227 "\texit -1\n"
3228 "fi\n"
3229 "ADDR=`cat gshub.log`\n"
3230 "ps opgid= $! >> gs.pids\n"
3231 "./rts $ADDR default ").c_str(), outfl);
3232 //      int erri;
3233 //      string err_str;
3234         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3235                 string ifnm = (*ssi);
3236                 // suppress internal _local_ interface
3237                 if (ifnm == "_local_")
3238                         continue;
3239                 fprintf(outfl, "%s ",ifnm.c_str());
3240                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3241                 for(j=0;j<ifv.size();++j)
3242                         fprintf(outfl, "%s ",ifv[j].c_str());
3243         }
3244         fprintf(outfl, " &\n");
3245         fprintf(outfl, "echo $! >> gs.pids\n");
3246         for(i=0;i<hfta_names.size();++i)
3247                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3248
3249         for(j=0;j<opviews.opview_list.size();++j){
3250                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3251         }
3252
3253         fclose(outfl);
3254         system("chmod +x runit");
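//		For reference, a sketch of the runit script generated above, assuming one
//		monitored interface "eth0" with no Command attributes, two hftas "hfta_0"
//		and "hfta_1", and no output view processes (all names hypothetical):
//
//			#!/bin/sh
//			./stopit
//			<root_path>/bin/gshub.py > /dev/null 2>&1 &
//			sleep 5
//			if [ ! -f gshub.log ]
//			then
//				echo "Failed to start bin/gshub.py"
//				exit 1
//			fi
//			ADDR=`cat gshub.log`
//			ps opgid= $! >> gs.pids
//			./rts $ADDR default eth0  &
//			echo $! >> gs.pids
//			./hfta_0 $ADDR default &
//			./hfta_1 $ADDR default &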
3255
3256         outfl = fopen("stopit", "w");
3257         if(outfl==NULL){
3258                 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3259                 exit(1);
3260         }
3261
3262         fprintf(outfl,"#!/bin/sh\n"
3263 "rm -f gshub.log\n"
3264 "if [ ! -f gs.pids ]\n"
3265 "then\n"
3266 "exit\n"
3267 "fi\n"
3268 "for pgid in `cat gs.pids`\n"
3269 "do\n"
3270 "kill -TERM -$pgid\n"
3271 "done\n"
3272 "sleep 1\n"
3273 "for pgid in `cat gs.pids`\n"
3274 "do\n"
3275 "kill -9 -$pgid\n"
3276 "done\n"
3277 "rm gs.pids\n");
3278
3279         fclose(outfl);
3280         system("chmod +x stopit");
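//		Note: the generated stopit script sends SIGTERM to each process group
//		recorded in gs.pids, allows one second for orderly shutdown, follows up
//		with SIGKILL, and finally removes gs.pids.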
3281
3282 //-----------------------------------------------
3283
3284 /* For now, support for virtual interfaces is disabled
3285         outfl = fopen("set_vinterface_hash.bat", "w");
3286         if(outfl==NULL){
3287                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3288                 exit(0);
3289         }
3290
3291 //		The format should be determined by an entry in the ifres.xml file,
3292 //		but for now we hard-code the only example available.
3293         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3294                 if(rts_hload.count((*ssi))){
3295                         string iface_name = (*ssi);
3296                         string iface_number = "";
3297                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3298                                 if(isdigit(iface_name[j])){
3299                                         iface_number = iface_name[j];
3300                                         if(j>0 && isdigit(iface_name[j-1]))
3301                                                 iface_number = iface_name[j-1] + iface_number;
3302                                 }
3303                         }
3304
3305                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3306                         vector<int> halloc = rts_hload[iface_name];
3307                         int prev_limit = 0;
3308                         for(j=0;j<halloc.size();++j){
3309                                 if(j>0)
3310                                         fprintf(outfl,":");
3311                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3312                                 prev_limit = halloc[j];
3313                         }
3314                         fprintf(outfl,"\n");
3315                 }
3316         }
3317         fclose(outfl);
3318         system("chmod +x set_vinterface_hash.bat");
3319 */
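//		If re-enabled, each line of set_vinterface_hash.bat would take the form
//		below (hypothetical interface name ending in "0", with a hash allocation
//		of {16, 32}):
//
//			dagconfig -d0 -S hat_range=0-16:16-32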
3320 }
3321
3322 //              Code for implementing a local schema
3323 /*
3324                 table_list qpSchema;
3325
3326 //                              Load the schemas of any LFTAs.
3327                 int l;
3328                 for(l=0;l<hfta_nbr;++l){
3329                         stream_query *sq0 = split_queries[l];
3330                         table_def *td = sq0->get_output_tabledef();
3331                         qpSchema.append_table(td);
3332                 }
3333 //				Load the schemas of any other referenced tables
3334 //                              (e.g., hftas)
3335                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3336                 int ti;
3337                 for(ti=0;ti<input_tbl_names.size();++ti){
3338                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3339                         if(tbl_ref < 0){
3340                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3341                                 if(tbl_ref < 0){
3342                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3343                                         exit(1);
3344                                 }
3345                                 qpSchema.append_table(Schema->get_table(tbl_ref));
3346                         }
3347                 }
3348 */
3349
3350 //              Functions related to parsing.
3351
3352 /*
3353 static int split_string(char *instr,char sep, char **words,int max_words){
3354    char *loc;
3355    char *str;
3356    int nwords = 0;
3357
3358    str = instr;
3359    words[nwords++] = str;
3360    while( (loc = strchr(str,sep)) != NULL){
3361         *loc = '\0';
3362         str = loc+1;
3363         if(nwords >= max_words){
3364                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3365                 nwords = max_words-1;
3366         }
3367         words[nwords++] = str;
3368    }
3369
3370    return(nwords);
3371 }
3372
3373 */
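//		Hypothetical usage of split_string, were it re-enabled (it splits the
//		input in place and, if more than max_words pieces are found, keeps
//		reusing the last slot of words[] while reporting the overflow):
//
//			char buf[] = "eth0,eth1,eth2";
//			char *words[8];
//			int n = split_string(buf, ',', words, 8);   // n == 3, words[1] == "eth1"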
3374