Added quantiling UDAFs
[com/gs-lite.git] / src / ftacmp / translate_fta.cc
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #include<unistd.h>              // for gethostname\r
17 \r
18 #include <string>\r
19 #include "parse_fta.h"\r
20 #include "parse_schema.h"\r
21 #include "parse_ext_fcns.h"\r
22 #include"analyze_fta.h"\r
23 #include"query_plan.h"\r
24 #include"generate_lfta_code.h"\r
25 #include"stream_query.h"\r
26 #include"generate_utils.h"\r
27 #include"nic_def.h"\r
28 #include"generate_nic_code.h"\r
29 \r
30 #include <stdlib.h>\r
31 #include <stdio.h>\r
32 #include<ctype.h>\r
33 #include<glob.h>\r
34 #include<string.h>\r
35 \r
36 #include<list>\r
37 \r
38 //              for the scandir\r
39      #include <sys/types.h>\r
40      #include <dirent.h>\r
41 \r
42 \r
43 #include<errno.h>\r
44 \r
45 //              to verify that some files exist.\r
46      #include <sys/types.h>\r
47      #include <sys/stat.h>\r
48 \r
49 #include "parse_partn.h"\r
50 \r
51 #include "print_plan.h"\r
52 \r
53 //              Interface to the xml parser\r
54 \r
55 #include"xml_t.h"\r
56 #include"field_list.h"\r
57 \r
58 extern int xmlParserparse(void);\r
59 extern FILE *xmlParserin;\r
60 extern int xmlParserdebug;\r
61 \r
62 std::vector<std::string> xml_attr_vec;\r
63 std::vector<std::string> xml_val_vec;\r
64 std::string xml_a, xml_v;\r
65 xml_t *xml_leaves = NULL;\r
66 \r
67 //      Interface to the field list verifier\r
68 field_list *field_verifier = NULL;\r
69 \r
70 #define TMPSTRLEN 1000\r
71 \r
72 #ifndef PATH_DELIM\r
73   #define PATH_DELIM '/'\r
74 #endif\r
75 \r
76 char tmp_schema_str[10000];\r
77 \r
78 // maximum delay between two hearbeats produced\r
79 // by UDOP. Used when its not explicity\r
80 // provided in udop definition\r
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5\r
82 \r
83 //              Default lfta hash table size, must be power of 2.\r
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;\r
85 \r
86 //              Interface to FTA definition lexer and parser ...\r
87 \r
88 extern int FtaParserparse(void);\r
89 extern FILE *FtaParserin;\r
90 extern int FtaParserdebug;\r
91 \r
92 fta_parse_t *fta_parse_result;\r
93 var_defs_t *fta_parse_defines;\r
94 \r
95 \r
96 \r
97 //              Interface to external function lexer and parser ...\r
98 \r
99 extern int Ext_fcnsParserparse(void);\r
100 extern FILE *Ext_fcnsParserin;\r
101 extern int Ext_fcnsParserdebug;\r
102 \r
103 ext_fcn_list *Ext_fcns;\r
104 \r
105 \r
106 //              Interface to partition definition parser\r
107 extern int PartnParserparse();\r
108 partn_def_list_t *partn_parse_result = NULL;\r
109 \r
110 \r
111 \r
112 using namespace std;\r
113 //extern int errno;\r
114 \r
115 \r
116 //              forward delcaration of local utility function\r
117 void generate_makefile(vector<string> &input_file_names, int nfiles,\r
118                                            vector<string> &hfta_names, opview_set &opviews,\r
119                                                 vector<string> &machine_names,\r
120                                                 string schema_file_name,\r
121                                                 vector<string> &interface_names,\r
122                                                 ifq_t *ifdb, string &config_dir_path,\r
123                                                 bool use_pads,\r
124                                                 string extra_libs,\r
125                                                 map<string, vector<int> > &rts_hload\r
126                                         );\r
127 \r
128 //static int split_string(char *instr,char sep, char **words,int max_words);\r
129 #define MAXFLDS 100\r
130 \r
131   FILE *schema_summary_output = NULL;           // query names\r
132 \r
133 //                      Dump schema summary\r
134 void dump_summary(stream_query *str){\r
135         fprintf(schema_summary_output,"%s\n",str->query_name.c_str());\r
136 \r
137         table_def *sch = str->get_output_tabledef();\r
138 \r
139         vector<field_entry *> flds = sch->get_fields();\r
140         int f;\r
141         for(f=0;f<flds.size();++f){\r
142                 if(f>0) fprintf(schema_summary_output,"|");\r
143                 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());\r
144         }\r
145         fprintf(schema_summary_output,"\n");\r
146         for(f=0;f<flds.size();++f){\r
147                 if(f>0) fprintf(schema_summary_output,"|");\r
148                 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());\r
149         }\r
150         fprintf(schema_summary_output,"\n");\r
151 }\r
152 \r
//		Globals shared by main() and the code generators.
string root_path = "../..";	// root of the GS-lite tree; -R overrides it
bool generate_stats = false;	// -S: enable LFTA statistics (alters the Makefile)
string hostname;		// target host; -h overrides the gethostname() default

//	Length of the host name.
//	NOTE(review): main() assigns strlen(tmpstr) here unconditionally, but
//	tmpstr is only filled by gethostname() when -h was NOT given; with -h
//	it reads an uninitialized buffer.  Probably should be hostname.size().
int hostname_len;
158 \r
159 \r
160 int main(int argc, char **argv){\r
161   char tmpstr[TMPSTRLEN];\r
162   string err_str;\r
163   int q,s,h,f;\r
164 \r
165   set<int>::iterator si;\r
166 \r
167   vector<string> query_names;                   // for lfta.c registration\r
168   map<string, vector<int> > mach_query_names;   // list queries of machine\r
169   vector<int> snap_lengths;                             // for lfta.c registration\r
170   vector<string> interface_names;                       // for lfta.c registration\r
171   vector<string> machine_names;                 // machine of interface\r
172   vector<bool> lfta_reuse_options;                      // for lfta.c registration\r
173   vector<int> lfta_liveness_timeouts;           // fot qtree.xml generation\r
174   vector<string> hfta_names;                    // hfta cource code names, for\r
175                                                                                 // creating make file.\r
176   vector<string> qnames;                                // ensure unique names\r
177   map<string, int> lfta_names;                  // keep track of unique lftas.\r
178 \r
179 \r
180 //                              set these to 1 to debug the parser\r
181   FtaParserdebug = 0;\r
182   Ext_fcnsParserdebug = 0;\r
183 \r
184   FILE *lfta_out;                               // lfta.c output.\r
185   FILE *fta_in;                                 // input file\r
186   FILE *table_schemas_in;               // source tables definition file\r
187   FILE *query_name_output;              // query names\r
188   FILE *qtree_output;                   // interconnections of query nodes\r
189 \r
190   // -------------------------------\r
191   // Handling of Input Arguments\r
192   // -------------------------------\r
193     char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";\r
194         const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"\r
195                 "\t[-B] : debug only (don't create output files)\n"\r
196                 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"\r
197                 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"\r
198                 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"\r
199                 "\t[-C] : use <config directory> for definition files\n"\r
200                 "\t[-l] : use <library directory> for library queries\n"\r
201                 "\t[-N] : output query names in query_names.txt\n"\r
202                 "\t[-H] : create HFTA only (no schema_file)\n"\r
203                 "\t[-Q] : use query name for hfta suffix\n"\r
204                 "\t[-M] : generate make file and runit, stopit scripts\n"\r
205                 "\t[-S] : enable LFTA statistics (alters Makefile).\n"\r
206                 "\t[-f] : Output schema summary to schema_summary.txt\n"\r
207                 "\t[-P] : link with PADS\n"\r
208                 "\t[-h] : override host name.\n"\r
209                 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"\r
210                 "\t[-R] : path to root of GS-lite\n"\r
211 ;\r
212 \r
213 //              parameters gathered from command line processing\r
214         string external_fcns_path;\r
215 //      string internal_fcn_path;\r
216         string config_dir_path;\r
217         string library_path = "./";\r
218         vector<string> input_file_names;\r
219         string schema_file_name;\r
220         bool debug_only = false;\r
221         bool hfta_only = false;\r
222         bool output_query_names = false;\r
223         bool output_schema_summary=false;\r
224         bool numeric_hfta_flname = true;\r
225         bool create_makefile = false;\r
226         bool distributed_mode = false;\r
227         bool partitioned_mode = false;\r
228         bool use_live_hosts_file = false;\r
229         bool use_pads = false;\r
230         bool clean_make = false;\r
231         int n_virtual_interfaces = 1;\r
232 \r
233    char chopt;\r
234    while((chopt = getopt(argc,argv,optstr)) != -1){\r
235                 switch(chopt){\r
236                 case 'B':\r
237                         debug_only = true;\r
238                         break;\r
239                 case 'D':\r
240                         distributed_mode = true;\r
241                         break;\r
242                 case 'p':\r
243                         partitioned_mode = true;\r
244                         break;\r
245                 case 'L':\r
246                         use_live_hosts_file = true;\r
247                         break;\r
248                 case 'C':\r
249                                 if(optarg != NULL)\r
250                                  config_dir_path = string(optarg) + string("/");\r
251                         break;\r
252                 case 'l':\r
253                                 if(optarg != NULL)\r
254                                  library_path = string(optarg) + string("/");\r
255                         break;\r
256                 case 'N':\r
257                         output_query_names = true;\r
258                         break;\r
259                 case 'Q':\r
260                         numeric_hfta_flname = false;\r
261                         break;\r
262                 case 'H':\r
263                         if(schema_file_name == ""){\r
264                                 hfta_only = true;\r
265                         }\r
266                         break;\r
267                 case 'f':\r
268                         output_schema_summary=true;\r
269                         break;\r
270                 case 'M':\r
271                         create_makefile=true;\r
272                         break;\r
273                 case 'S':\r
274                         generate_stats=true;\r
275                         break;\r
276                 case 'P':\r
277                         use_pads = true;\r
278                         break;\r
279                 case 'c':\r
280                         clean_make = true;\r
281                         break;\r
282                 case 'h':\r
283                         if(optarg != NULL)\r
284                                 hostname = optarg;\r
285                         break;\r
286                 case 'R':\r
287                         if(optarg != NULL)\r
288                                 root_path = optarg;\r
289                         break;\r
290                 case 'n':\r
291                         if(optarg != NULL){\r
292                                 n_virtual_interfaces = atoi(optarg);\r
293                                 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){\r
294                                         fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);\r
295                                         n_virtual_interfaces = 1;\r
296                                 }\r
297                         }\r
298                         break;\r
299                 case '?':\r
300                         fprintf(stderr,"Error, argument %c not recognized.\n",optopt);\r
301                         fprintf(stderr,"%s\n", usage_str);\r
302                         exit(1);\r
303                 default:\r
304                         fprintf(stderr, "Argument was %c\n", optopt);\r
305                         fprintf(stderr,"Invalid arguments\n");\r
306                         fprintf(stderr,"%s\n", usage_str);\r
307                         exit(1);\r
308                 }\r
309         }\r
310         argc -= optind;\r
311         argv += optind;\r
312         for (int i = 0; i < argc; ++i) {\r
313                 if((schema_file_name == "") && !hfta_only){\r
314                         schema_file_name = argv[i];\r
315                 }else{\r
316                         input_file_names.push_back(argv[i]);\r
317                 }\r
318         }\r
319 \r
320         if(input_file_names.size() == 0){\r
321                 fprintf(stderr,"%s\n", usage_str);\r
322                 exit(1);\r
323         }\r
324 \r
325         if(clean_make){\r
326                 string clean_cmd = "rm Makefile hfta_*.cc";\r
327                 int clean_ret = system(clean_cmd.c_str());\r
328                 if(clean_ret){\r
329                         fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);\r
330                 }\r
331         }\r
332 \r
333 \r
334         nic_prop_db *npdb = new nic_prop_db(config_dir_path);\r
335 \r
336 //                      Open globally used file names.\r
337 \r
338         // prepend config directory to schema file\r
339         schema_file_name = config_dir_path + schema_file_name;\r
340         external_fcns_path = config_dir_path + string("external_fcns.def");\r
341     string ifx_fname = config_dir_path + string("ifres.xml");\r
342 \r
343 //              Find interface query file(s).\r
344         if(hostname == ""){\r
345                 gethostname(tmpstr,TMPSTRLEN);\r
346                 hostname = tmpstr;\r
347         }\r
348         hostname_len = strlen(tmpstr);\r
349     string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");\r
350         vector<string> ifq_fls;\r
351 \r
352                 ifq_fls.push_back(ifq_fname);\r
353 \r
354 \r
355 //                      Get the field list, if it exists\r
356         string flist_fl = config_dir_path + "field_list.xml";\r
357         FILE *flf_in = NULL;\r
358         if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {\r
359                 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());\r
360                 xml_leaves = new xml_t();\r
361                 xmlParser_setfileinput(flf_in);\r
362                 if(xmlParserparse()){\r
363                         fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());\r
364                 }else{\r
365                         field_verifier = new field_list(xml_leaves);\r
366                 }\r
367         }\r
368 \r
369         if(!hfta_only){\r
370           if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {\r
371                 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));\r
372                 exit(1);\r
373           }\r
374         }\r
375 \r
376 /*\r
377         if(!(debug_only || hfta_only)){\r
378           if((lfta_out = fopen("lfta.c","w")) == NULL){\r
379                 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));\r
380                 exit(1);\r
381           }\r
382         }\r
383 */\r
384 \r
385 //              Get the output specification file.\r
386 //              format is\r
387 //                      query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions\r
388         string ospec_fl = "output_spec.cfg";\r
389         FILE *osp_in = NULL;\r
390         vector<ospec_str *> output_specs;\r
391         multimap<string, int> qname_to_ospec;\r
392         if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {\r
393                 char *flds[MAXFLDS];\r
394                 int o_lineno = 0;\r
395                 while(fgets(tmpstr,TMPSTRLEN,osp_in)){\r
396                         o_lineno++;\r
397                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
398                         if(nflds == 7){\r
399 //              make operator type lowercase\r
400                                 char *tmpc;\r
401                                 for(tmpc=flds[1];*tmpc!='\0';++tmpc)\r
402                                         *tmpc = tolower(*tmpc);\r
403 \r
404                                 ospec_str *tmp_ospec = new ospec_str();\r
405                                 tmp_ospec->query = flds[0];\r
406                                 tmp_ospec->operator_type = flds[1];\r
407                                 tmp_ospec->operator_param = flds[2];\r
408                                 tmp_ospec->output_directory = flds[3];\r
409                                 tmp_ospec->bucketwidth = atoi(flds[4]);\r
410                                 tmp_ospec->partitioning_flds = flds[5];\r
411                                 tmp_ospec->n_partitions = atoi(flds[6]);\r
412                                 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));\r
413                                 output_specs.push_back(tmp_ospec);\r
414                         }else{\r
415                                 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);\r
416                         }\r
417                 }\r
418                 fclose(osp_in);\r
419         }else{\r
420                 fprintf(stderr,"output_spec.cfg not found.  The query set has no output.  exiting.\n");\r
421                 exit(1);\r
422         }\r
423 \r
424 //              hfta parallelism\r
425         string pspec_fl = "hfta_parallelism.cfg";\r
426         FILE *psp_in = NULL;\r
427         map<string, int> hfta_parallelism;\r
428         if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){\r
429                 char *flds[MAXFLDS];\r
430                 int o_lineno = 0;\r
431                 while(fgets(tmpstr,TMPSTRLEN,psp_in)){\r
432                         bool good_entry = true;\r
433                         o_lineno++;\r
434                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
435                         if(nflds == 2){\r
436                                 string hname = flds[0];\r
437                                 int par = atoi(flds[1]);\r
438                                 if(par <= 0 || par > n_virtual_interfaces){\r
439                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);\r
440                                         good_entry = false;\r
441                                 }\r
442                                 if(good_entry && n_virtual_interfaces % par != 0){\r
443                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);\r
444                                         good_entry = false;\r
445                                 }\r
446                                 if(good_entry)\r
447                                         hfta_parallelism[hname] = par;\r
448                         }\r
449                 }\r
450         }else{\r
451                 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());\r
452         }\r
453 \r
454 \r
455 //              LFTA hash table sizes\r
456         string htspec_fl = "lfta_htsize.cfg";\r
457         FILE *htsp_in = NULL;\r
458         map<string, int> lfta_htsize;\r
459         if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){\r
460                 char *flds[MAXFLDS];\r
461                 int o_lineno = 0;\r
462                 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){\r
463                         bool good_entry = true;\r
464                         o_lineno++;\r
465                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
466                         if(nflds == 2){\r
467                                 string lfta_name = flds[0];\r
468                                 int htsz = atoi(flds[1]);\r
469                                 if(htsz>0){\r
470                                         lfta_htsize[lfta_name] = htsz;\r
471                                 }else{\r
472                                         fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);\r
473                                 }\r
474                         }\r
475                 }\r
476         }else{\r
477                 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());\r
478         }\r
479 \r
480 //              LFTA vitual interface hash split\r
481         string rtlspec_fl = "rts_load.cfg";\r
482         FILE *rtl_in = NULL;\r
483         map<string, vector<int> > rts_hload;\r
484         if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){\r
485                 char *flds[MAXFLDS];\r
486                 int r_lineno = 0;\r
487                 string iface_name;\r
488                 vector<int> hload;\r
489                 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){\r
490                         bool good_entry = true;\r
491                         r_lineno++;\r
492                         iface_name = "";\r
493                         hload.clear();\r
494                         int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
495                         if(nflds >1){\r
496                                 iface_name = flds[0];\r
497                                 int cumm_h = 0;\r
498                                 int j;\r
499                                 for(j=1;j<nflds;++j){\r
500                                         int h = atoi(flds[j]);\r
501                                         if(h<=0)\r
502                                                 good_entry = false;\r
503                                         cumm_h += h;\r
504                                         hload.push_back(cumm_h);\r
505                                 }\r
506                         }else{\r
507                                 good_entry = false;\r
508                         }\r
509                         if(good_entry){\r
510                                 rts_hload[iface_name] = hload;\r
511                         }else{\r
512                                 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());\r
513                         }\r
514                 }\r
515         }\r
516 \r
517 \r
518 \r
519         if(output_query_names){\r
520           if((query_name_output = fopen("query_names.txt","w")) == NULL){\r
521                 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));\r
522                 exit(1);\r
523           }\r
524         }\r
525 \r
526         if(output_schema_summary){\r
527           if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){\r
528                 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));\r
529                 exit(1);\r
530           }\r
531         }\r
532 \r
533         if((qtree_output = fopen("qtree.xml","w")) == NULL){\r
534                 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));\r
535                 exit(1);\r
536         }\r
537         fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");\r
538         fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");\r
539         fprintf(qtree_output,"<QueryNodes>\n");\r
540 \r
541 \r
542 //                      Get an initial Schema\r
543         table_list *Schema;\r
544         if(!hfta_only){\r
545 //                      Parse the table schema definitions.\r
546           fta_parse_result = new fta_parse_t();\r
547           FtaParser_setfileinput(table_schemas_in);\r
548           if(FtaParserparse()){\r
549                 fprintf(stderr,"Table schema parse failed.\n");\r
550                 exit(1);\r
551           }\r
552           if(fta_parse_result->parse_type != TABLE_PARSE){\r
553                 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());\r
554                 exit(1);\r
555           }\r
556           Schema = fta_parse_result->tables;\r
557 \r
558 //                      Process schema field inheritance\r
559           int retval;\r
560           retval = Schema->unroll_tables(err_str);\r
561           if(retval){\r
562                 fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );\r
563                 exit(1);\r
564           }\r
565         }else{\r
566 //                      hfta only => we will try to fetch schemas from the registry.\r
567 //                      therefore, start off with an empty schema.\r
568           Schema = new table_list();\r
569         }\r
570 \r
571 \r
572 //                      Open and parse the external functions file.\r
573         Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");\r
574         if(Ext_fcnsParserin == NULL){\r
575                 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");\r
576                 Ext_fcns = new ext_fcn_list();\r
577         }else{\r
578                 if(Ext_fcnsParserparse()){\r
579                         fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");\r
580                         Ext_fcns = new ext_fcn_list();\r
581                 }\r
582         }\r
583         if(Ext_fcns->validate_fcns(err_str)){\r
584                 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());\r
585                 exit(1);\r
586         }\r
587 \r
588 //              Open and parse the interface resources file.\r
589 //      ifq_t *ifaces_db = new ifq_t();\r
590 //   string ierr;\r
591 //      if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){\r
592 //              fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",\r
593 //                              ifx_fname.c_str(), ierr.c_str());\r
594 //              exit(1);\r
595 //      }\r
596 //      if(ifaces_db->load_ifqs(ifq_fname, ierr)){\r
597 //              fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",\r
598 //                              ifq_fname.c_str(), ierr.c_str());\r
599 //              exit(1);\r
600 //      }\r
601 \r
602 \r
603 //                      The LFTA code string.\r
604 //                      Put the standard preamble here.\r
605 //                      NOTE: the hash macros, fcns should go into the run time\r
606   map<string, string> lfta_val;\r
607   map<string, string> lfta_prefilter_val;\r
608 \r
609   string lfta_header =\r
610 "#include <limits.h>\n\n"\r
611 "#include \"rts.h\"\n"\r
612 "#include \"fta.h\"\n"\r
613 "#include \"lapp.h\"\n"\r
614 "#include \"rts_udaf.h\"\n\n"\r
615 ;\r
616 // Get any locally defined parsing headers\r
617     glob_t glob_result;\r
618     memset(&glob_result, 0, sizeof(glob_result));\r
619 \r
620     // do the glob operation\r
621     int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);\r
622         if(return_value == 0){\r
623         for(size_t i = 0; i < glob_result.gl_pathc; ++i) {\r
624                         char *flds[1000];\r
625                         int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);\r
626                         lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";\r
627         }\r
628         }else{\r
629                 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");\r
630         }\r
631 \r
632 /*\r
633 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"\r
634 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"\r
635 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"\r
636 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"\r
637 */\r
638 \r
639         lfta_header += \r
640 "\n"\r
641 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"\r
642 "\n"\r
643 "#define SLOT_FILLED 0x04\n"\r
644 "#define SLOT_GEN_BITS 0x03\n"\r
645 "#define SLOT_HASH_BITS 0xfffffff8\n"\r
646 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"\r
647 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"\r
648 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"\r
649 "\n\n"\r
650 \r
651 "#define lfta_BOOL_to_hash(x) (x)\n"\r
652 "#define lfta_USHORT_to_hash(x) (x)\n"\r
653 "#define lfta_UINT_to_hash(x) (x)\n"\r
654 "#define lfta_IP_to_hash(x) (x)\n"\r
655 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"\r
656 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"\r
657 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"\r
658 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"\r
659 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"\r
660 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"\r
661 "       gs_uint32_t i,ret=0,tmp_sum = 0;\n"\r
662 "       for(i=0;i<x.length;++i){\n"\r
663 "               tmp_sum |= (x.data[i]) << (8*(i%4));\n"\r
664 "               if((i%4) == 3){\n"\r
665 "                       ret ^= tmp_sum;\n"\r
666 "                       tmp_sum = 0;\n"\r
667 "               }\n"\r
668 "       }\n"\r
669 "       if((i%4)!=0) ret ^=tmp_sum;\n"\r
670 "       return(ret);\n"\r
671 "}\n\n\n";\r
672 \r
673 \r
674 \r
675 //////////////////////////////////////////////////////////////////\r
676 /////                   Get all of the query parse trees\r
677 \r
678 \r
679   int i,p;\r
680   int hfta_count = 0;           // for numeric suffixes to hfta .cc files\r
681 \r
682 //---------------------------\r
683 //              Global info needed for post processing.\r
684 \r
685 //                      Set of operator views ref'd in the query set.\r
686         opview_set opviews;\r
687 //                      lftas on a per-machine basis.\r
688         map<string, vector<stream_query *> > lfta_mach_lists;\r
689         int nfiles = input_file_names.size();\r
690         vector<stream_query *> hfta_list;               // list of hftas.\r
691         map<string, stream_query *> sq_map;             // map from query name to stream query.\r
692 \r
693 \r
694 //////////////////////////////////////////\r
695 \r
696 //              Open and parse the interface resources file.\r
697         ifq_t *ifaces_db = new ifq_t();\r
698     string ierr;\r
699         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){\r
700                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",\r
701                                 ifx_fname.c_str(), ierr.c_str());\r
702                 exit(1);\r
703         }\r
704         if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){\r
705                 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",\r
706                                 ifq_fls[0].c_str(), ierr.c_str());\r
707                 exit(1);\r
708         }\r
709 \r
710   map<string, string> qname_to_flname;  // for detecting duplicate query names\r
711 \r
712 \r
713 \r
714 //                      Parse the files to create a vector of parse trees.\r
715 //                      Load qnodes with information to perform a topo sort\r
716 //                      based on query dependencies.\r
717   vector<query_node *> qnodes;                          // for topo sort.\r
718   map<string,int> name_node_map;                        // map query name to qnodes entry\r
719   for(i=0;i<input_file_names.size();i++){\r
720 \r
721           if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {\r
722                   fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));\r
723                   continue;\r
724           }\r
725 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());\r
726 \r
727 //                      Parse the FTA query\r
728           fta_parse_result = new fta_parse_t();\r
729           FtaParser_setfileinput(fta_in);\r
730           if(FtaParserparse()){\r
731                 fprintf(stderr,"FTA parse failed.\n");\r
732                 exit(1);\r
733           }\r
734           if(fta_parse_result->parse_type != QUERY_PARSE){\r
735                 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());\r
736                 exit(1);\r
737           }\r
738 \r
739 //                      returns a list of parse trees\r
740           vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;\r
741           for(p=0;p<qlist.size();++p){\r
742             table_exp_t *fta_parse_tree = qlist[p];\r
743 //              query_parse_trees.push_back(fta_parse_tree);\r
744 \r
745 //                      compute the default name -- extract from query name\r
746                 strcpy(tmpstr,input_file_names[i].c_str());\r
747                 char *qname = strrchr(tmpstr,PATH_DELIM);\r
748                 if(qname == NULL)\r
749                         qname = tmpstr;\r
750                 else\r
751                         qname++;\r
752                 char *qname_end = strchr(qname,'.');\r
753                 if(qname_end != NULL) *qname_end = '\0';\r
754                 string qname_str = qname;\r
755                 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);\r
756 \r
757 //                      Deternmine visibility.  Should I be attaching all of the output methods?\r
758                 if(qname_to_ospec.count(imputed_qname)>0)\r
759                         fta_parse_tree->set_visible(true);\r
760                 else\r
761                         fta_parse_tree->set_visible(false);\r
762 \r
763 \r
764 //                              Create a manipulable repesentation of the parse tree.\r
765 //                              the qnode inherits the visibility assigned to the parse tree.\r
766             int pos = qnodes.size();\r
767                 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));\r
768                 name_node_map[ qnodes[pos]->name ] = pos;\r
769 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);\r
770 //              qnames.push_back(impute_query_name(fta_parse_tree, qname_str));\r
771 //              qfiles.push_back(i);\r
772 \r
773 //                      Check for duplicate query names\r
774 //                                      NOTE : in hfta-only generation, I should\r
775 //                                      also check with the names of the registered queries.\r
776                 if(qname_to_flname.count(qnodes[pos]->name) > 0){\r
777                         fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",\r
778                                 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());\r
779                         exit(1);\r
780                 }\r
781                 if(Schema->find_tbl(qnodes[pos]->name) >= 0){\r
782                         fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",\r
783                                 qnodes[pos]->name.c_str(), input_file_names[i].c_str());\r
784                         exit(1);\r
785                 }\r
786                 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();\r
787 \r
788 \r
789         }\r
790   }\r
791 \r
792 //              Add the library queries\r
793 \r
794   int pos;\r
795   for(pos=0;pos<qnodes.size();++pos){\r
796         int fi;\r
797         for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){\r
798                 string src_tbl = qnodes[pos]->refd_tbls[fi];\r
799                 if(qname_to_flname.count(src_tbl) == 0){\r
800                         int last_sep = src_tbl.find_last_of('/');\r
801                         if(last_sep != string::npos){\r
802 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());\r
803                                 string target_qname = src_tbl.substr(last_sep+1);\r
804                                 string qpathname = library_path + src_tbl + ".gsql";\r
805                                 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {\r
806                                         fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));\r
807                                         exit(1);\r
808                                         fprintf(stderr,"After exit\n");\r
809                                 }\r
810 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());\r
811 //                      Parse the FTA query\r
812                                 fta_parse_result = new fta_parse_t();\r
813                                 FtaParser_setfileinput(fta_in);\r
814                                 if(FtaParserparse()){\r
815                                         fprintf(stderr,"FTA parse failed.\n");\r
816                                         exit(1);\r
817                                 }\r
818                                 if(fta_parse_result->parse_type != QUERY_PARSE){\r
819                                         fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());\r
820                                         exit(1);\r
821                                 }\r
822 \r
823                                 map<string, int> local_query_map;\r
824                                 vector<string> local_query_names;\r
825                                 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;\r
826                                 for(p=0;p<qlist.size();++p){\r
827                                 table_exp_t *fta_parse_tree = qlist[p];\r
828                                         fta_parse_tree->set_visible(false);             // assumed to not produce output\r
829                                         string imputed_qname = impute_query_name(fta_parse_tree, target_qname);\r
830                                         if(imputed_qname == target_qname)\r
831                                                 imputed_qname = src_tbl;\r
832                                         if(local_query_map.count(imputed_qname)>0){\r
833                                                 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());\r
834                                                 exit(1);\r
835                                         }\r
836                                         local_query_map[ imputed_qname ] = p;\r
837                                         local_query_names.push_back(imputed_qname);\r
838                                 }\r
839 \r
840                                 if(local_query_map.count(src_tbl)==0){\r
841                                         fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());\r
842                                         exit(1);\r
843                                 }\r
844 \r
845                                 vector<int> worklist;\r
846                                 set<int> added_queries;\r
847                                 vector<query_node *> new_qnodes;\r
848                                 worklist.push_back(local_query_map[target_qname]);\r
849                                 added_queries.insert(local_query_map[target_qname]);\r
850                                 int qq;\r
851                                 int qpos = qnodes.size();\r
852                                 for(qq=0;qq<worklist.size();++qq){\r
853                                         int q_id = worklist[qq];\r
854                                         query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );\r
855                                         new_qnodes.push_back( new_qnode);\r
856                                         vector<string> refd_tbls =  new_qnode->refd_tbls;\r
857                                         int ff;\r
858                                         for(ff = 0;ff<refd_tbls.size();++ff){\r
859                                                 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){\r
860 \r
861                                                         if(name_node_map.count(refd_tbls[ff])>0){\r
862                                                                 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s,  and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );\r
863                                                                 exit(1);\r
864                                                         }else{\r
865                                                                 worklist.push_back(local_query_map[refd_tbls[ff]]);\r
866                                                         }\r
867                                                 }\r
868                                         }\r
869                                 }\r
870 \r
871                                 for(qq=0;qq<new_qnodes.size();++qq){\r
872                                         int qpos = qnodes.size();\r
873                                         qnodes.push_back(new_qnodes[qq]);\r
874                                         name_node_map[qnodes[qpos]->name ] = qpos;\r
875                                         qname_to_flname[qnodes[qpos]->name ] = qpathname;\r
876                                 }\r
877                         }\r
878                 }\r
879         }\r
880   }\r
881 \r
882 \r
883 \r
884 \r
885 \r
886 \r
887 \r
888 \r
889 //---------------------------------------\r
890 \r
891 \r
892 //              Add the UDOPS.\r
893 \r
894   string udop_missing_sources;\r
895   for(i=0;i<qnodes.size();++i){\r
896         int fi;\r
897         for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){\r
898                 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);\r
899                 if(sid >= 0){\r
900                         if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){\r
901                                 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){\r
902                                 int pos = qnodes.size();\r
903                                         qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));\r
904                                         name_node_map[ qnodes[pos]->name ] = pos;\r
905                                         qnodes[pos]->is_externally_visible = false;   // its visible\r
906         //                                      Need to mark the source queries as visible.\r
907                                         int si;\r
908                                         string missing_sources = "";\r
909                                         for(si=0;si<qnodes[pos]->refd_tbls.size();++si){\r
910                                                 string src_tbl = qnodes[pos]->refd_tbls[si];\r
911                                                 if(name_node_map.count(src_tbl)==0){\r
912                                                         missing_sources += src_tbl + " ";\r
913                                                 }\r
914                                         }\r
915                                         if(missing_sources != ""){\r
916                                                 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";\r
917                                         }\r
918                                 }\r
919                         }\r
920                 }\r
921         }\r
922   }\r
923   if(udop_missing_sources != ""){\r
924         fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());\r
925         exit(1);\r
926   }\r
927 \r
928 \r
929 \r
930 ////////////////////////////////////////////////////////////////////\r
931 ///                             Check parse trees to verify that some\r
932 ///                             global properties are met :\r
933 ///                             if q1 reads from q2, then\r
934 ///                               q2 is processed before q1\r
935 ///                               q1 can supply q2's parameters\r
936 ///                             Verify there is no cycle in the reads-from graph.\r
937 \r
938 //                      Compute an order in which to process the\r
939 //                      queries.\r
940 \r
941 //                      Start by building the reads-from lists.\r
942 //\r
943 \r
944   for(i=0;i<qnodes.size();++i){\r
945         int qi, fi;\r
946         vector<string> refd_tbls =  qnodes[i]->refd_tbls;\r
947         for(fi = 0;fi<refd_tbls.size();++fi){\r
948                 if(name_node_map.count(refd_tbls[fi])>0){\r
949 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);\r
950                         (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);\r
951                 }\r
952         }\r
953   }\r
954 \r
955 \r
956 //              If one query reads the result of another,\r
957 //              check for parameter compatibility.  Currently it must\r
958 //              be an exact match.  I will move to requiring\r
959 //              containment after re-ordering, but will require\r
960 //              some analysis for code generation which is not\r
961 //              yet in place.\r
962 //printf("There are %d query nodes.\n",qnodes.size());\r
963 \r
964 \r
965   for(i=0;i<qnodes.size();++i){\r
966         vector<var_pair_t *> target_params  = qnodes[i]->params;\r
967         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){\r
968                 vector<var_pair_t *> source_params  = qnodes[(*si)]->params;\r
969                 if(target_params.size() != source_params.size()){\r
970                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());\r
971                         exit(1);\r
972                 }\r
973                 int p;\r
974                 for(p=0;p<target_params.size();++p){\r
975                         if(! (target_params[p]->name == source_params[p]->name &&\r
976                               target_params[p]->val == source_params[p]->val ) ){\r
977                         fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());\r
978                                 exit(1);\r
979                         }\r
980                 }\r
981         }\r
982   }\r
983 \r
984 \r
985 //              Find the roots.\r
986 //              Start by counting inedges.\r
987   for(i=0;i<qnodes.size();++i){\r
988         for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){\r
989                 qnodes[(*si)]->n_consumers++;\r
990         }\r
991   }\r
992 \r
993 //              The roots are the nodes with indegree zero.\r
994   set<int> roots;\r
995   for(i=0;i<qnodes.size();++i){\r
996         if(qnodes[i]->n_consumers == 0){\r
997                 if(qnodes[i]->is_externally_visible == false){\r
998                         fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible.  Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());\r
999                 }\r
1000                 roots.insert(i);\r
1001         }\r
1002   }\r
1003 \r
1004 //              Remove the parts of the subtree that produce no output.\r
1005   set<int> valid_roots;\r
1006   set<int> discarded_nodes;\r
1007   set<int> candidates;\r
1008   while(roots.size() >0){\r
1009         for(si=roots.begin();si!=roots.end();++si){\r
1010                 if(qnodes[(*si)]->is_externally_visible){\r
1011                         valid_roots.insert((*si));\r
1012                 }else{\r
1013                         discarded_nodes.insert((*si));\r
1014                         set<int>::iterator sir;\r
1015                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
1016                                 qnodes[(*sir)]->n_consumers--;\r
1017                                 if(qnodes[(*sir)]->n_consumers == 0)\r
1018                                         candidates.insert( (*sir));\r
1019                         }\r
1020                 }\r
1021         }\r
1022         roots = candidates;\r
1023         candidates.clear();\r
1024   }\r
1025   roots = valid_roots;\r
1026   if(discarded_nodes.size()>0){\r
1027         fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");\r
1028         int di = 0;\r
1029         for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){\r
1030                 if(di>0 && (di%8)==0) fprintf(stderr,"\n");\r
1031                 di++;\r
1032                 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());\r
1033         }\r
1034         fprintf(stderr,"\n");\r
1035   }\r
1036 \r
1037 //              Compute the sources_to set, ignoring discarded nodes.\r
1038   for(i=0;i<qnodes.size();++i){\r
1039         if(discarded_nodes.count(i)==0)\r
1040                 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){\r
1041                         qnodes[(*si)]->sources_to.insert(i);\r
1042         }\r
1043   }\r
1044 \r
1045 \r
1046 //              Find the nodes that are shared by multiple visible subtrees.\r
1047 //              THe roots become inferred visible nodes.\r
1048 \r
1049 //              Find the visible nodes.\r
1050         vector<int> visible_nodes;\r
1051         for(i=0;i<qnodes.size();i++){\r
1052                 if(qnodes[i]->is_externally_visible){\r
1053                         visible_nodes.push_back(i);\r
1054                 }\r
1055         }\r
1056 \r
1057 //              Find UDOPs referenced by visible nodes.\r
1058   list<int> workq;\r
1059   for(i=0;i<visible_nodes.size();++i){\r
1060         workq.push_back(visible_nodes[i]);\r
1061   }\r
1062   while(!workq.empty()){\r
1063         int node = workq.front();\r
1064         workq.pop_front();\r
1065         set<int>::iterator children;\r
1066         if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){\r
1067                 qnodes[node]->is_externally_visible = true;\r
1068                 visible_nodes.push_back(node);\r
1069                 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){\r
1070                         if(qnodes[(*children)]->is_externally_visible == false){\r
1071                                 qnodes[(*children)]->is_externally_visible = true;\r
1072                                 visible_nodes.push_back((*children));\r
1073                         }\r
1074                 }\r
1075         }\r
1076         for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){\r
1077                 workq.push_back((*children));\r
1078         }\r
1079   }\r
1080 \r
1081         bool done = false;\r
1082         while(!done){\r
1083 //      reset the nodes\r
1084                 for(i=0;i<qnodes.size();i++){\r
1085                         qnodes[i]->subtree_roots.clear();\r
1086                 }\r
1087 \r
1088 //              Walk the tree defined by a visible node, not descending into\r
1089 //              subtrees rooted by a visible node.  Mark the node visited with\r
1090 //              the visible node ID.\r
1091                 for(i=0;i<visible_nodes.size();++i){\r
1092                         set<int> vroots;\r
1093                         vroots.insert(visible_nodes[i]);\r
1094                         while(vroots.size()>0){\r
1095                                 for(si=vroots.begin();si!=vroots.end();++si){\r
1096                                         qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);\r
1097 \r
1098                                         set<int>::iterator sir;\r
1099                                         for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
1100                                                 if(! qnodes[(*sir)]->is_externally_visible){\r
1101                                                         candidates.insert( (*sir));\r
1102                                                 }\r
1103                                         }\r
1104                                 }\r
1105                                 vroots = candidates;\r
1106                                 candidates.clear();\r
1107                         }\r
1108                 }\r
1109 //              Find the nodes in multiple visible node subtrees, but with no parent\r
1110 //              that has is in multile visible node subtrees.  Mark these as inferred visible nodes.\r
1111                 done = true;    // until proven otherwise\r
1112                 for(i=0;i<qnodes.size();i++){\r
1113                         if(qnodes[i]->subtree_roots.size()>1){\r
1114                                 bool is_new_root = true;\r
1115                                 set<int>::iterator sir;\r
1116                                 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){\r
1117                                         if(qnodes[(*sir)]->subtree_roots.size()>1)\r
1118                                                 is_new_root = false;\r
1119                                 }\r
1120                                 if(is_new_root){\r
1121                                         qnodes[i]->is_externally_visible = true;\r
1122                                         qnodes[i]->inferred_visible_node = true;\r
1123                                         visible_nodes.push_back(i);\r
1124                                         done = false;\r
1125                                 }\r
1126                         }\r
1127                 }\r
1128         }\r
1129 \r
1130 \r
1131 \r
1132 \r
1133 \r
1134 //              get visible nodes in topo ordering.\r
1135 //  for(i=0;i<qnodes.size();i++){\r
1136 //              qnodes[i]->n_consumers = qnodes[i]->sources_to.size();\r
1137 //  }\r
1138   vector<int> process_order;\r
1139   while(roots.size() >0){\r
1140         for(si=roots.begin();si!=roots.end();++si){\r
1141                 if(discarded_nodes.count((*si))==0){\r
1142                         process_order.push_back( (*si) );\r
1143                 }\r
1144                 set<int>::iterator sir;\r
1145                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
1146                         qnodes[(*sir)]->n_consumers--;\r
1147                         if(qnodes[(*sir)]->n_consumers == 0)\r
1148                                 candidates.insert( (*sir));\r
1149                 }\r
1150         }\r
1151         roots = candidates;\r
1152         candidates.clear();\r
1153   }\r
1154 \r
1155 \r
1156 //printf("process_order.size() =%d\n",process_order.size());\r
1157 \r
1158 //              Search for cyclic dependencies\r
1159   string found_dep;\r
1160   for(i=0;i<qnodes.size();++i){\r
1161         if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){\r
1162                 if(found_dep.size() != 0) found_dep += ", ";\r
1163                 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";\r
1164         }\r
1165   }\r
1166   if(found_dep.size()>0){\r
1167         fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());\r
1168         exit(1);\r
1169   }\r
1170 \r
1171 //              Get a list of query sets, in the order to be processed.\r
1172 //              Start at visible root and do bfs.\r
1173 //              The query set includes queries referenced indirectly,\r
1174 //              as sources for user-defined operators.  These are needed\r
1175 //              to ensure that they are added to the schema, but are not part\r
1176 //              of the query tree.\r
1177 \r
1178 //              stream_node_sets contains queries reachable only through the\r
1179 //              FROM clause, so I can tell which queries to add to the stream\r
1180 //              query. (DISABLED, UDOPS are integrated, does this cause problems?)\r
1181 \r
1182 //                      NOTE: this code works because in order for data to be\r
1183 //                      read by multiple hftas, the node must be externally visible.\r
1184 //                      But visible nodes define roots of process sets.\r
1185 //                      internally visible nodes can feed data only\r
1186 //                      to other nodes in the same query file.\r
1187 //                      Therefore, any access can be restricted to a file,\r
1188 //                      hfta output sharing is done only on roots\r
1189 //                      never on interior nodes.\r
1190 \r
1191 \r
1192 \r
1193 \r
1194 //              Conpute the base collection of hftas.\r
1195   vector<hfta_node *> hfta_sets;\r
1196   map<string, int> hfta_name_map;\r
1197 //  vector< vector<int> > process_sets;\r
1198 //  vector< set<int> > stream_node_sets;\r
1199   reverse(process_order.begin(), process_order.end());  // get listing in reverse order.\r
1200                                                                                                                 // i.e. process leaves 1st.\r
1201   for(i=0;i<process_order.size();++i){\r
1202         if(qnodes[process_order[i]]->is_externally_visible == true){\r
1203 //printf("Visible.\n");\r
1204                 int root = process_order[i];\r
1205                 hfta_node *hnode = new hfta_node();\r
1206                 hnode->name = qnodes[root]-> name;\r
1207                 hnode->source_name = qnodes[root]-> name;\r
1208                 hnode->is_udop = qnodes[root]->is_udop;\r
1209                 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;\r
1210 \r
1211                 vector<int> proc_list;  proc_list.push_back(root);\r
1212 //                      Ensure that nodes are added only once.\r
1213                 set<int> proc_set;      proc_set.insert(root);\r
1214                 roots.clear();                  roots.insert(root);\r
1215                 candidates.clear();\r
1216                 while(roots.size()>0){\r
1217                         for(si=roots.begin();si!=roots.end();++si){\r
1218 //printf("Processing root %d\n",(*si));\r
1219                                 set<int>::iterator sir;\r
1220                                 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
1221 //printf("reads fom %d\n",(*sir));\r
1222                                         if(qnodes[(*sir)]->is_externally_visible==false){\r
1223                                                 candidates.insert( (*sir) );\r
1224                                                 if(proc_set.count( (*sir) )==0){\r
1225                                                         proc_set.insert( (*sir) );\r
1226                                                         proc_list.push_back( (*sir) );\r
1227                                                 }\r
1228                                         }\r
1229                                 }\r
1230                         }\r
1231                         roots = candidates;\r
1232                         candidates.clear();\r
1233                 }\r
1234 \r
1235                 reverse(proc_list.begin(), proc_list.end());\r
1236                 hnode->query_node_indices = proc_list;\r
1237                 hfta_name_map[hnode->name] = hfta_sets.size();\r
1238                 hfta_sets.push_back(hnode);\r
1239         }\r
1240   }\r
1241 \r
1242 //              Compute the reads_from / sources_to graphs for the hftas.\r
1243 \r
1244   for(i=0;i<hfta_sets.size();++i){\r
1245         hfta_node *hnode = hfta_sets[i];\r
1246         for(q=0;q<hnode->query_node_indices.size();q++){\r
1247                 query_node *qnode = qnodes[hnode->query_node_indices[q]];\r
1248                 for(s=0;s<qnode->refd_tbls.size();++s){\r
1249                         if(hfta_name_map.count(qnode->refd_tbls[s])){\r
1250                                 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];\r
1251                                 hnode->reads_from.insert(other_hfta);\r
1252                                 hfta_sets[other_hfta]->sources_to.insert(i);\r
1253                         }\r
1254                 }\r
1255         }\r
1256   }\r
1257 \r
1258 //              Compute a topological sort of the hfta_sets.\r
1259 \r
1260   vector<int> hfta_topsort;\r
1261   workq.clear();\r
1262   int hnode_srcs[hfta_sets.size()];\r
1263   for(i=0;i<hfta_sets.size();++i){\r
1264         hnode_srcs[i] = 0;\r
1265         if(hfta_sets[i]->sources_to.size() == 0)\r
1266                 workq.push_back(i);\r
1267   }\r
1268 \r
1269   while(! workq.empty()){\r
1270         int     node = workq.front();\r
1271         workq.pop_front();\r
1272         hfta_topsort.push_back(node);\r
1273         set<int>::iterator stsi;\r
1274         for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){\r
1275                 int parent = (*stsi);\r
1276                 hnode_srcs[parent]++;\r
1277                 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){\r
1278                         workq.push_back(parent);\r
1279                 }\r
1280         }\r
1281   }\r
1282 \r
1283 //              Decorate hfta nodes with the level of parallelism given as input.\r
1284 \r
1285   map<string, int>::iterator msii;\r
1286   for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){\r
1287         string hfta_name = (*msii).first;\r
1288         int par = (*msii).second;\r
1289         if(hfta_name_map.count(hfta_name) > 0){\r
1290                 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;\r
1291         }else{\r
1292                 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());\r
1293         }\r
1294   }\r
1295 \r
1296 //              Propagate levels of parallelism: children should have a level of parallelism\r
1297 //              as large as any of its parents.  Adjust children upwards to compensate.\r
1298 //              Start at parents and adjust children, auto-propagation will occur.\r
1299 \r
1300   for(i=hfta_sets.size()-1;i>=0;i--){\r
1301         set<int>::iterator stsi;\r
1302         for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){\r
1303                 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){\r
1304                         hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;\r
1305                 }\r
1306         }\r
1307   }\r
1308 \r
1309 //              Before all the name mangling, check if therey are any output_spec.cfg\r
1310 //              or hfta_parallelism.cfg entries that do not have a matching query.\r
1311 \r
1312         string dangling_ospecs = "";\r
1313         for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){\r
1314                 string oq = (*msii).first;\r
1315                 if(hfta_name_map.count(oq) == 0){\r
1316                         dangling_ospecs += " "+(*msii).first;\r
1317                 }\r
1318         }\r
1319         if(dangling_ospecs!=""){\r
1320                 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());\r
1321         }\r
1322 \r
1323         string dangling_par = "";\r
1324         for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){\r
1325                 string oq = (*msii).first;\r
1326                 if(hfta_name_map.count(oq) == 0){\r
1327                         dangling_par += " "+(*msii).first;\r
1328                 }\r
1329         }\r
1330         if(dangling_par!=""){\r
1331                 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());\r
1332         }\r
1333 \r
1334 \r
1335 \r
1336 //              Replicate parallelized hftas.  Do __copyX name mangling.  Adjust\r
1337 //              FROM clauses: retarget any name which is an internal node, and\r
1338 //              any which is in hfta_sets (and which is parallelized). Add Merge nodes\r
1339 //              when the source hfta has more parallelism than the target node.\r
1340 //              Add new nodes to qnodes and hfta_nodes wth the appropriate linkages.\r
1341 \r
1342 \r
1343   int n_original_hfta_sets = hfta_sets.size();\r
1344   for(i=0;i<n_original_hfta_sets;++i){\r
1345         if(hfta_sets[i]->n_parallel > 1){\r
1346                 hfta_sets[i]->do_generation =false;     // set the deletion flag for this entry.\r
1347                 set<string> local_nodes;                // names of query nodes in the hfta.\r
1348                 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){\r
1349                         local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);\r
1350                 }\r
1351 \r
1352                 for(p=0;p<hfta_sets[i]->n_parallel;++p){\r
1353                         string mangler = "__copy"+int_to_string(p);\r
1354                         hfta_node *par_hfta  = new hfta_node();\r
1355                         par_hfta->name = hfta_sets[i]->name + mangler;\r
1356                         par_hfta->source_name = hfta_sets[i]->name;\r
1357                         par_hfta->is_udop = hfta_sets[i]->is_udop;\r
1358                         par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;\r
1359                         par_hfta->n_parallel = hfta_sets[i]->n_parallel;\r
1360                         par_hfta->parallel_idx = p;\r
1361 \r
1362                         map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.\r
1363 \r
1364 //      Is it a UDOP?\r
1365                         if(hfta_sets[i]->is_udop){\r
1366                                 int root = hfta_sets[i]->query_node_indices[0];\r
1367 \r
1368                                 string unequal_par_sources;\r
1369                                 set<int>::iterator rfsii;\r
1370                                 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){\r
1371                                         if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){\r
1372                                                 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";\r
1373                                         }\r
1374                                 }\r
1375                                 if(unequal_par_sources != ""){\r
1376                                         fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());\r
1377                                         exit(1);\r
1378                                 }\r
1379 \r
1380                                 int rti;\r
1381                                 vector<string> new_sources;\r
1382                                 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){\r
1383                                         new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);\r
1384                                 }\r
1385 \r
1386                                 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);\r
1387                                 new_qn->name += mangler;\r
1388                                 new_qn->mangler = mangler;\r
1389                                 new_qn->refd_tbls = new_sources;\r
1390                                 par_hfta->query_node_indices.push_back(qnodes.size());\r
1391                                 par_qnode_map[new_qn->name] = qnodes.size();\r
1392                                 name_node_map[ new_qn->name ] = qnodes.size();\r
1393                                 qnodes.push_back(new_qn);\r
1394                         }else{\r
1395 //              regular query node\r
1396                           for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){\r
1397                                 int hqn_idx = hfta_sets[i]->query_node_indices[h];\r
1398                                 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);\r
1399 //                                      rehome the from clause on mangled names.\r
1400 //                                      create merge nodes as needed for external sources.\r
1401                                 for(f=0;f<dup_pt->fm->tlist.size();++f){\r
1402                                         if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){\r
1403                                                 dup_pt->fm->tlist[f]->schema_name += mangler;\r
1404                                         }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){\r
1405 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if level of parallelism of the two hftas is the same, rename by the mangler.  Else, there mnust be more sources, so create a merge node.\r
1406                                                 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];\r
1407                                                 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){\r
1408                                                         dup_pt->fm->tlist[f]->schema_name += mangler;\r
1409                                                 }else{\r
1410                                                         vector<string> src_tbls;\r
1411                                                         int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;\r
1412                                                         if(stride == 0){\r
1413                                                                 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());\r
1414                                                                 exit(1);\r
1415                                                         }\r
1416                                                         for(s=0;s<stride;++s){\r
1417                                                                 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);\r
1418                                                                 src_tbls.push_back(ext_src_name);\r
1419                                                         }\r
1420                                                         table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);\r
1421                                                         string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);\r
1422                                                         dup_pt->fm->tlist[f]->schema_name = merge_node_name;\r
1423 //                                      Make a qnode to represent the new merge node\r
1424                                                         query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);\r
1425                                                         qn_pt->refd_tbls = src_tbls;\r
1426                                                         qn_pt->is_udop  = false;\r
1427                                                         qn_pt->is_externally_visible = false;\r
1428                                                         qn_pt->inferred_visible_node  = false;\r
1429                                                         par_hfta->query_node_indices.push_back(qnodes.size());\r
1430                                                         par_qnode_map[merge_node_name] = qnodes.size();\r
1431                                                         name_node_map[ merge_node_name ] = qnodes.size();\r
1432                                                         qnodes.push_back(qn_pt);\r
1433                                                 }\r
1434                                         }\r
1435                                 }\r
1436                                 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);\r
1437                                 for(f=0;f<dup_pt->fm->tlist.size();++f){\r
1438                                         new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);\r
1439                                 }\r
1440                                 new_qn->params = qnodes[hqn_idx]->params;\r
1441                                 new_qn->is_udop = false;\r
1442                                 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;\r
1443                                 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;\r
1444                                 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());\r
1445                                 par_qnode_map[new_qn->name] = qnodes.size();\r
1446                                 name_node_map[ new_qn->name ] = qnodes.size();\r
1447                                 qnodes.push_back(new_qn);\r
1448                           }\r
1449                         }\r
1450                         hfta_name_map[par_hfta->name] = hfta_sets.size();\r
1451                         hfta_sets.push_back(par_hfta);\r
1452                 }\r
1453         }else{\r
1454 //              This hfta isn't being parallelized, but add merge nodes for any parallelized\r
1455 //              hfta sources.\r
1456                 if(!hfta_sets[i]->is_udop){\r
1457                   for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){\r
1458                         int hqn_idx = hfta_sets[i]->query_node_indices[h];\r
1459                         for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){\r
1460                                 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){\r
1461 //                      Ref's an external HFTA.  No parallelism => leave as is.  Else, if level of parallelism of the two hftas is the same, rename by the mangler.  Else, there mnust be more sources, so create a merge node.\r
1462                                         int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];\r
1463                                         if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){\r
1464                                                 vector<string> src_tbls;\r
1465                                                 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){\r
1466                                                         string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);\r
1467                                                         src_tbls.push_back(ext_src_name);\r
1468                                                 }\r
1469                                                 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);\r
1470                                                 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);\r
1471                                                 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;\r
1472 //                                      Make a qnode to represent the new merge node\r
1473                                                 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);\r
1474                                                 qn_pt->refd_tbls = src_tbls;\r
1475                                                 qn_pt->is_udop  = false;\r
1476                                                 qn_pt->is_externally_visible = false;\r
1477                                                 qn_pt->inferred_visible_node  = false;\r
1478                                                 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());\r
1479                                                 name_node_map[ merge_node_name ] = qnodes.size();\r
1480                                                 qnodes.push_back(qn_pt);\r
1481                                         }\r
1482                                 }\r
1483                         }\r
1484                 }\r
1485           }\r
1486         }\r
1487   }\r
1488 \r
1489 //                      Rebuild the reads_from / sources_to lists in the qnodes\r
1490   for(q=0;q<qnodes.size();++q){\r
1491         qnodes[q]->reads_from.clear();\r
1492         qnodes[q]->sources_to.clear();\r
1493   }\r
1494   for(q=0;q<qnodes.size();++q){\r
1495         for(s=0;s<qnodes[q]->refd_tbls.size();++s){\r
1496                 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){\r
1497                         int rf = name_node_map[qnodes[q]->refd_tbls[s]];\r
1498                         qnodes[q]->reads_from.insert(rf);\r
1499                         qnodes[rf]->sources_to.insert(q);\r
1500                 }\r
1501         }\r
1502   }\r
1503 \r
1504 //                      Rebuild the reads_from / sources_to lists in hfta_sets\r
1505   for(q=0;q<hfta_sets.size();++q){\r
1506         hfta_sets[q]->reads_from.clear();\r
1507         hfta_sets[q]->sources_to.clear();\r
1508   }\r
1509   for(q=0;q<hfta_sets.size();++q){\r
1510         for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){\r
1511                 int node = hfta_sets[q]->query_node_indices[s];\r
1512                 set<int>::iterator rfsii;\r
1513                 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){\r
1514                         if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){\r
1515                                 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);\r
1516                                 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);\r
1517                         }\r
1518                 }\r
1519         }\r
1520   }\r
1521 \r
1522 /*\r
1523 for(q=0;q<qnodes.size();++q){\r
1524  printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());\r
1525  set<int>::iterator rsii;\r
1526  for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)\r
1527   printf(" %d",(*rsii));\r
1528   printf(", and sources-to %d:",qnodes[q]->sources_to.size());\r
1529  for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)\r
1530   printf(" %d",(*rsii));\r
1531  printf("\n");\r
1532 }\r
1533 \r
1534 for(q=0;q<hfta_sets.size();++q){\r
1535  if(hfta_sets[q]->do_generation==false)\r
1536         continue;\r
1537  printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());\r
1538  set<int>::iterator rsii;\r
1539  for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)\r
1540   printf(" %d",(*rsii));\r
1541   printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());\r
1542  for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)\r
1543   printf(" %d",(*rsii));\r
1544  printf("\n");\r
1545 }\r
1546 */\r
1547 \r
1548 \r
1549 \r
1550 //              Re-topo sort the hftas\r
1551   hfta_topsort.clear();\r
1552   workq.clear();\r
1553   int hnode_srcs_2[hfta_sets.size()];\r
1554   for(i=0;i<hfta_sets.size();++i){\r
1555         hnode_srcs_2[i] = 0;\r
1556         if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){\r
1557                 workq.push_back(i);\r
1558         }\r
1559   }\r
1560 \r
1561   while(workq.empty() == false){\r
1562         int     node = workq.front();\r
1563         workq.pop_front();\r
1564         hfta_topsort.push_back(node);\r
1565         set<int>::iterator stsii;\r
1566         for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){\r
1567                 int child = (*stsii);\r
1568                 hnode_srcs_2[child]++;\r
1569                 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){\r
1570                         workq.push_back(child);\r
1571                 }\r
1572         }\r
1573   }\r
1574 \r
1575 //              Ensure that all of the query_node_indices in hfta_sets are topologically\r
1576 //              sorted, don't rely on assumptions that all transforms maintain some kind of order.\r
1577   for(i=0;i<hfta_sets.size();++i){\r
1578         if(hfta_sets[i]->do_generation){\r
1579                 map<int,int> n_accounted;\r
1580                 vector<int> new_order;\r
1581                 workq.clear();\r
1582                 vector<int>::iterator vii;\r
1583                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){\r
1584                         n_accounted[(*vii)]= 0;\r
1585                 }\r
1586                 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){\r
1587                         set<int>::iterator rfsii;\r
1588                         for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){\r
1589                                 if(n_accounted.count((*rfsii)) == 0){\r
1590                                         n_accounted[(*vii)]++;\r
1591                                 }\r
1592                         }\r
1593                         if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){\r
1594                                 workq.push_back((*vii));\r
1595                         }\r
1596                 }\r
1597 \r
1598                 while(workq.empty() == false){\r
1599                         int node = workq.front();\r
1600                         workq.pop_front();\r
1601                         new_order.push_back(node);\r
1602                         set<int>::iterator stsii;\r
1603                         for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){\r
1604                                 if(n_accounted.count((*stsii))){\r
1605                                         n_accounted[(*stsii)]++;\r
1606                                         if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){\r
1607                                                 workq.push_back((*stsii));\r
1608                                         }\r
1609                                 }\r
1610                         }\r
1611                 }\r
1612                 hfta_sets[i]->query_node_indices = new_order;\r
1613         }\r
1614   }\r
1615 \r
1616 \r
1617 \r
1618 \r
1619 \r
1620 ///                     Global checkng is done, start the analysis and translation\r
1621 ///                     of the query parse tree in the order specified by process_order\r
1622 \r
1623 \r
1624 //                      Get a list of the LFTAs for global lfta optimization\r
1625 //                              TODO: separate building operators from spliting lftas,\r
1626 //                                      that will make optimizations such as predicate pushing easier.\r
1627         vector<stream_query *> lfta_list;\r
1628 \r
1629         stream_query *rootq;\r
1630 \r
1631     int qi,qj;\r
1632 \r
1633         for(qi=hfta_topsort.size()-1;qi>=0;--qi){\r
1634 \r
1635         int hfta_id = hfta_topsort[qi];\r
1636     vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;\r
1637 \r
1638 \r
1639 \r
1640 //              Two possibilities, either its a UDOP, or its a collection of queries.\r
1641 //      if(qnodes[curr_list.back()]->is_udop)\r
1642         if(hfta_sets[hfta_id]->is_udop){\r
1643                 int node_id = curr_list.back();\r
1644                 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);\r
1645                 opview_entry *opv = new opview_entry();\r
1646 \r
1647 //                      Many of the UDOP properties aren't currently used.\r
1648                 opv->parent_qname = "no_parent";\r
1649                 opv->root_name = qnodes[node_id]->name;\r
1650                 opv->view_name = qnodes[node_id]->file;\r
1651                 opv->pos = qi;\r
1652                 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());\r
1653                 opv->udop_alias = tmpstr;\r
1654                 opv->mangler = qnodes[node_id]->mangler;\r
1655 \r
1656                 if(opv->mangler != ""){\r
1657                         int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);\r
1658                         Schema->mangle_subq_names(new_udop_schref,opv->mangler);\r
1659                 }\r
1660 \r
1661 //                      This piece of code makes each hfta which referes to the same udop\r
1662 //                      reference a distinct running udop.  Do this at query optimization time?\r
1663 //              fmtbl->set_udop_alias(opv->udop_alias);\r
1664 \r
1665                 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));\r
1666                 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());\r
1667 \r
1668                 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);\r
1669                 int s,f,q;\r
1670                 for(s=0;s<subq.size();++s){\r
1671 //                              Validate that the fields match.\r
1672                         subquery_spec *sqs = subq[s];\r
1673                         string subq_name = sqs->name + opv->mangler;\r
1674                         vector<field_entry *> flds = Schema->get_fields(subq_name);\r
1675                         if(flds.size() == 0){\r
1676                                 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());\r
1677                                 return(1);\r
1678                         }\r
1679                         if(flds.size() < sqs->types.size()){\r
1680                                 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());\r
1681                                 return(1);\r
1682                         }\r
1683                         bool failed = false;\r
1684                         for(f=0;f<sqs->types.size();++f){\r
1685                                 data_type dte(sqs->types[f],sqs->modifiers[f]);\r
1686                                 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());\r
1687                                 if(! dte.subsumes_type(&dtf) ){\r
1688                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());\r
1689                                         failed = true;\r
1690                                 }\r
1691 /*\r
1692                                 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){\r
1693                                         string pstr = dte.get_temporal_string();\r
1694                                         fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);\r
1695                                         failed = true;\r
1696                                 }\r
1697 */\r
1698                         }\r
1699                         if(failed)\r
1700                                 return(1);\r
1701 ///                             Validation done, find the subquery, make a copy of the\r
1702 ///                             parse tree, and add it to the return list.\r
1703                         for(q=0;q<qnodes.size();++q)\r
1704                                 if(qnodes[q]->name == subq_name)\r
1705                                         break;\r
1706                         if(q==qnodes.size()){\r
1707                                 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());\r
1708                                 return(1);\r
1709                         }\r
1710 \r
1711                 }\r
1712 \r
1713 //                      Cross-link to from entry(s) in all sourced-to tables.\r
1714                 set<int>::iterator sii;\r
1715                 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){\r
1716 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());\r
1717                         vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();\r
1718                         int ii;\r
1719                         for(ii=0;ii<tblvars.size();++ii){\r
1720                                 if(tblvars[ii]->schema_name == opv->root_name){\r
1721                                         tblvars[ii]->set_opview_idx(opviews.size());\r
1722                                 }\r
1723 \r
1724                         }\r
1725                 }\r
1726 \r
1727                 opviews.append(opv);\r
1728         }else{\r
1729 \r
1730 //                      Analyze the parse trees in this query,\r
1731 //                      put them in rootq\r
1732 //      vector<int> curr_list = process_sets[qi];\r
1733 \r
1734 \r
1735 ////////////////////////////////////////\r
1736 \r
1737           rootq = NULL;\r
1738 //printf("Process set %d, has %d queries\n",qi,curr_list.size());\r
1739           for(qj=0;qj<curr_list.size();++qj){\r
1740                 i = curr_list[qj];\r
1741         fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);\r
1742 \r
1743 //                      Select the current query parse tree\r
1744                 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;\r
1745 \r
1746 //                      if hfta only, try to fetch any missing schemas\r
1747 //                      from the registry (using the print_schema program).\r
1748 //                      Here I use a hack to avoid analyzing the query -- all referenced\r
1749 //                      tables must be in the from clause\r
1750 //                      If there is a problem loading any table, just issue a warning,\r
1751 //\r
1752                 tablevar_list_t *fm = fta_parse_tree->get_from();\r
1753                 vector<string> refd_tbls =  fm->get_src_tbls(Schema);\r
1754 //                      iterate over all referenced tables\r
1755                 int t;\r
1756                 for(t=0;t<refd_tbls.size();++t){\r
1757                   int tbl_ref = Schema->get_table_ref(refd_tbls[t]);\r
1758 \r
1759                   if(tbl_ref < 0){      // if this table is not in the Schema\r
1760 \r
1761                         if(hfta_only){\r
1762                                 string cmd="print_schema "+refd_tbls[t];\r
1763                                 FILE *schema_in = popen(cmd.c_str(), "r");\r
1764                                 if(schema_in == NULL){\r
1765                                   fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());\r
1766                                 }else{\r
1767                                   string schema_instr;\r
1768                                   while(fgets(tmpstr,TMPSTRLEN,schema_in)){\r
1769                                         schema_instr += tmpstr;\r
1770                                   }\r
1771                           fta_parse_result = new fta_parse_t();\r
1772                                   strcpy(tmp_schema_str,schema_instr.c_str());\r
1773                                   FtaParser_setstringinput(tmp_schema_str);\r
1774                           if(FtaParserparse()){\r
1775                                 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());\r
1776                           }else{\r
1777                                         if( fta_parse_result->tables != NULL){\r
1778                                                 int tl;\r
1779                                                 for(tl=0;tl<fta_parse_result->tables->size();++tl){\r
1780                                                         Schema->add_table(fta_parse_result->tables->get_table(tl));\r
1781                                                 }\r
1782                                         }else{\r
1783                                                 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());\r
1784                                         }\r
1785                                 }\r
1786                         }\r
1787                   }else{\r
1788                                 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());\r
1789                                 exit(1);\r
1790                   }\r
1791 \r
1792                 }\r
1793           }\r
1794 \r
1795 \r
1796 //                              Analyze the query.\r
1797           query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);\r
1798           if(qs == NULL){\r
1799                 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());\r
1800                 exit(1);\r
1801           }\r
1802 \r
1803           stream_query new_sq(qs, Schema);\r
1804           if(new_sq.error_code){\r
1805                         fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());\r
1806                         exit(1);\r
1807           }\r
1808 \r
1809 //                      Add it to the Schema\r
1810           table_def *output_td = new_sq.get_output_tabledef();\r
1811           Schema->add_table(output_td);\r
1812 \r
1813 //                      Create a query plan from the analyzed parse tree.\r
1814 //                      If its a query referneced via FROM, add it to the stream query.\r
1815           if(rootq){\r
1816                 rootq->add_query(new_sq);\r
1817           }else{\r
1818                 rootq = new stream_query(new_sq);\r
1819 //                      have the stream query object inherit properties form the analyzed\r
1820 //                      hfta_node object.\r
1821                 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);\r
1822                 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();\r
1823           }\r
1824 \r
1825 \r
1826     }\r
1827 \r
1828 //              This stream query has all its parts\r
1829 //              Build and optimize it.\r
1830 //printf("translate_fta: generating plan.\n");\r
1831         if(rootq->generate_plan(Schema)){\r
1832                 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());\r
1833                 continue;\r
1834         }\r
1835 \r
1836 //      If we've found the query plan head, so now add the output operators\r
1837         if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){\r
1838                 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;\r
1839                 multimap<string, int>::iterator mmsi;\r
1840                 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);\r
1841                 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){\r
1842                         rootq->add_output_operator(output_specs[(*mmsi).second]);\r
1843                 }\r
1844         }\r
1845 \r
1846 \r
1847 \r
1848 //                              Perform query splitting if necessary.\r
1849         bool hfta_returned;\r
1850     vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);\r
1851 \r
1852         int l;\r
1853 //for(l=0;l<split_queries.size();++l){\r
1854 //printf("split query %d is %s\n",l,split_queries[l]->q`uery_name.c_str());\r
1855 //}\r
1856 \r
1857 \r
1858 \r
1859 \r
1860     if(split_queries.size() > 0){       // should be at least one component.\r
1861 \r
1862 //                              Compute the number of LFTAs.\r
1863           int n_lfta = split_queries.size();\r
1864           if(hfta_returned) n_lfta--;\r
1865 \r
1866 \r
1867 //                              Process the LFTA components.\r
1868           for(l=0;l<n_lfta;++l){\r
1869            if(lfta_names.count(split_queries[l]->query_name) == 0){\r
1870 //                              Grab the lfta for global optimization.\r
1871                 vector<tablevar_t *> tvec =  split_queries[l]->query_plan[0]->get_input_tbls();\r
1872                 string liface = tvec[0]->get_interface();\r
1873                 string lmach = tvec[0]->get_machine();\r
1874                 if (lmach == "")\r
1875                         lmach = hostname;\r
1876                 interface_names.push_back(liface);\r
1877                 machine_names.push_back(lmach);\r
1878 //printf("Machine is %s\n",lmach.c_str());\r
1879 \r
1880 //                      Set the ht size from the recommendation, if there is one in the rec file\r
1881                 if(lfta_htsize.count(split_queries[l]->query_name)>0){\r
1882                         split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));\r
1883                 }\r
1884 \r
1885 \r
1886                 lfta_names[split_queries[l]->query_name] = lfta_list.size();\r
1887                 split_queries[l]->set_gid(lfta_list.size());  // set lfta global id\r
1888                 lfta_list.push_back(split_queries[l]);\r
1889                 lfta_mach_lists[lmach].push_back(split_queries[l]);\r
1890 \r
1891 //                      THe following is a hack,\r
1892 //                      as I should be generating LFTA code through\r
1893 //                      the stream_query object.\r
1894                 split_queries[l]->query_plan[0]->bind_to_schema(Schema);\r
1895 //              split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;\r
1896 \r
1897 /*\r
1898 //                              Create query description to embed in lfta.c\r
1899                 string lfta_schema_str = split_queries[l]->make_schema();\r
1900                 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);\r
1901 \r
1902 //                              get NIC capabilities.\r
1903                 int erri;\r
1904                 nic_property *nicprop = NULL;\r
1905                 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
1906                 if(iface_codegen_type.size()){\r
1907                         nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);\r
1908                         if(!nicprop){\r
1909                                 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());\r
1910                                         exit(1);\r
1911                         }\r
1912                 }\r
1913 \r
1914                 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);\r
1915 */\r
1916 \r
1917                 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));\r
1918                 query_names.push_back(split_queries[l]->query_name);\r
1919                 mach_query_names[lmach].push_back(query_names.size()-1);\r
1920 //                      NOTE: I will assume a 1-1 correspondance between\r
1921 //                      mach_query_names[lmach] and lfta_mach_lists[lmach]\r
1922 //                      where mach_query_names[lmach][i] contains the index into\r
1923 //                      query_names, which names the lfta, and\r
1924 //                      mach_query_names[lmach][i] is the stream_query * of the\r
1925 //                      corresponding lfta.\r
1926 //                      Later, lfta_iface_qnames are the query names matching lfta_iface_lists\r
1927 \r
1928 \r
1929 \r
1930                 // check if lfta is reusable\r
1931                 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters\r
1932 \r
1933                 bool lfta_reusable = false;\r
1934                 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||\r
1935                         split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {\r
1936                         lfta_reusable = true;\r
1937                 }\r
1938                 lfta_reuse_options.push_back(lfta_reusable);\r
1939 \r
1940                 // LFTA will inherit the liveness timeout specification from the containing query\r
1941                 // it is too conservative as lfta are expected to spend less time per tuple\r
1942                 // then full query\r
1943 \r
1944                 // extract liveness timeout from query definition\r
1945                 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());\r
1946                 if (!liveness_timeout) {\r
1947 //                  fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",\r
1948 //                    split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);\r
1949                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;\r
1950                 }\r
1951                 lfta_liveness_timeouts.push_back(liveness_timeout);\r
1952 \r
1953 //                      Add it to the schema\r
1954                 table_def *td = split_queries[l]->get_output_tabledef();\r
1955                 Schema->append_table(td);\r
1956 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());\r
1957 \r
1958           }\r
1959          }\r
1960 \r
1961 //                              If the output is lfta-only, dump out the query name.\r
1962       if(split_queries.size() == 1 && !hfta_returned){\r
1963         if(output_query_names ){\r
1964            fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());\r
1965                 }\r
1966 /*\r
1967 else{\r
1968            fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());\r
1969                 }\r
1970 */\r
1971 \r
1972 /*\r
1973 //                              output schema summary\r
1974                 if(output_schema_summary){\r
1975                         dump_summary(split_queries[0]);\r
1976                 }\r
1977 */\r
1978       }\r
1979 \r
1980 \r
1981           if(hfta_returned){            // query also has an HFTA component\r
1982                 int hfta_nbr = split_queries.size()-1;\r
1983 \r
1984                         hfta_list.push_back(split_queries[hfta_nbr]);\r
1985 \r
1986 //                                      report on generated query names\r
1987         if(output_query_names){\r
1988                         string hfta_name =split_queries[hfta_nbr]->query_name;\r
1989                 fprintf(query_name_output,"%s H\n",hfta_name.c_str());\r
1990                         for(l=0;l<hfta_nbr;++l){\r
1991                                 string lfta_name =split_queries[l]->query_name;\r
1992                         fprintf(query_name_output,"%s L\n",lfta_name.c_str());\r
1993                         }\r
1994                 }\r
1995 //              else{\r
1996 //              fprintf(stderr,"query names are ");\r
1997 //                      for(l=0;l<hfta_nbr;++l){\r
1998 //                              if(l>0) fprintf(stderr,",");\r
1999 //                              string fta_name =split_queries[l]->query_name;\r
2000 //                      fprintf(stderr," %s",fta_name.c_str());\r
2001 //                      }\r
2002 //                      fprintf(stderr,"\n");\r
2003 //              }\r
2004           }\r
2005 \r
2006   }else{\r
2007           fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());\r
2008           fprintf(stderr,"%s\n",rootq->get_error_str().c_str());\r
2009           exit(1);\r
2010   }\r
2011  }\r
2012 }\r
2013 \r
2014 \r
2015 //-----------------------------------------------------------------\r
2016 //              Compute and propagate the SE in PROTOCOL fields compute a field.\r
2017 //-----------------------------------------------------------------\r
2018 \r
2019 for(i=0;i<lfta_list.size();i++){\r
2020         lfta_list[i]->generate_protocol_se(sq_map, Schema);\r
2021         sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];\r
2022 }\r
2023 for(i=0;i<hfta_list.size();i++){\r
2024         hfta_list[i]->generate_protocol_se(sq_map, Schema);\r
2025         sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];\r
2026 }\r
2027 \r
2028 \r
2029 \r
2030 //------------------------------------------------------------------------\r
2031 //              Perform  individual FTA optimizations\r
2032 //-----------------------------------------------------------------------\r
2033 \r
2034 if (partitioned_mode) {\r
2035 \r
2036         // open partition definition file\r
2037         string part_fname = config_dir_path + "partition.txt";\r
2038 \r
2039         FILE* partfd = fopen(part_fname.c_str(), "r");\r
2040         if (!partfd) {\r
2041                 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());\r
2042                 exit(1);\r
2043         }\r
2044         PartnParser_setfileinput(partfd);\r
2045         if (PartnParserparse()) {\r
2046                 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());\r
2047                 exit(1);\r
2048         }\r
2049         fclose(partfd);\r
2050 }\r
2051 \r
2052 \r
2053 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);\r
2054 \r
2055 int num_hfta = hfta_list.size();\r
2056 for(i=0; i < hfta_list.size(); ++i){\r
2057         hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);\r
2058 }\r
2059 \r
2060 //                      Add all new hftas to schema\r
2061 for(i=num_hfta; i < hfta_list.size(); ++i){\r
2062                 table_def *td = hfta_list[i]->get_output_tabledef();\r
2063                 Schema->append_table(td);\r
2064 }\r
2065 \r
2066 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);\r
2067 \r
2068 \r
2069 \r
2070 //------------------------------------------------------------------------\r
2071 //              Do global (cross-fta) optimization\r
2072 //-----------------------------------------------------------------------\r
2073 \r
2074 \r
2075 \r
2076 \r
2077 \r
2078 \r
2079 set<string> extra_external_libs;\r
2080 \r
2081 for(i=0;i<hfta_list.size();++i){                // query also has an HFTA component\r
2082 \r
2083                 if(! debug_only){\r
2084 //                                      build hfta file name, create output\r
2085            if(numeric_hfta_flname){\r
2086             sprintf(tmpstr,"hfta_%d",hfta_count);\r
2087                         hfta_names.push_back(tmpstr);\r
2088             sprintf(tmpstr,"hfta_%d.cc",hfta_count);\r
2089          }else{\r
2090             sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());\r
2091                         hfta_names.push_back(tmpstr);\r
2092             sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());\r
2093           }\r
2094                   FILE *hfta_fl = fopen(tmpstr,"w");\r
2095                   if(hfta_fl == NULL){\r
2096                         fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);\r
2097                         exit(1);\r
2098                   }\r
2099                   fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());\r
2100 \r
2101 //                      If there is a field verifier, warn about\r
2102 //                      lack of compatability\r
2103 //                              NOTE : this code assumes that visible non-lfta queries\r
2104 //                              are those at the root of a stream query.\r
2105                   string hfta_comment;\r
2106                   string hfta_title;\r
2107                   string hfta_namespace;\r
2108                   if(hfta_list[i]->defines.count("comment")>0)\r
2109                         hfta_comment = hfta_list[i]->defines["comment"];\r
2110                   if(hfta_list[i]->defines.count("Comment")>0)\r
2111                         hfta_comment = hfta_list[i]->defines["Comment"];\r
2112                   if(hfta_list[i]->defines.count("COMMENT")>0)\r
2113                         hfta_comment = hfta_list[i]->defines["COMMENT"];\r
2114                   if(hfta_list[i]->defines.count("title")>0)\r
2115                         hfta_title = hfta_list[i]->defines["title"];\r
2116                   if(hfta_list[i]->defines.count("Title")>0)\r
2117                         hfta_title = hfta_list[i]->defines["Title"];\r
2118                   if(hfta_list[i]->defines.count("TITLE")>0)\r
2119                         hfta_title = hfta_list[i]->defines["TITLE"];\r
2120                   if(hfta_list[i]->defines.count("namespace")>0)\r
2121                         hfta_namespace = hfta_list[i]->defines["namespace"];\r
2122                   if(hfta_list[i]->defines.count("Namespace")>0)\r
2123                         hfta_namespace = hfta_list[i]->defines["Namespace"];\r
2124                   if(hfta_list[i]->defines.count("Namespace")>0)\r
2125                         hfta_namespace = hfta_list[i]->defines["Namespace"];\r
2126 \r
2127                   if(field_verifier != NULL){\r
2128                         string warning_str;\r
2129                         if(hfta_comment == "")\r
2130                                 warning_str += "\tcomment not found.\n";\r
2131                         if(hfta_title == "")\r
2132                                 warning_str += "\ttitle not found.\n";\r
2133                         if(hfta_namespace == "")\r
2134                                 warning_str += "\tnamespace not found.\n";\r
2135 \r
2136                         vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();\r
2137                         int fi;\r
2138                         for(fi=0;fi<flds.size();fi++){\r
2139                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);\r
2140                         }\r
2141                         if(warning_str != "")\r
2142                                 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",\r
2143                                         hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());\r
2144                   }\r
2145 \r
2146                   fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());\r
2147                   if(hfta_comment != "")\r
2148                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());\r
2149                   if(hfta_title != "")\r
2150                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());\r
2151                   if(hfta_namespace != "")\r
2152                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());\r
2153                   fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);\r
2154                   fprintf(qtree_output,"\t\t<Rate value='100' />\n");\r
2155 \r
2156 //                              write info about fields to qtree.xml\r
2157                   vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();\r
2158                   int fi;\r
2159                   for(fi=0;fi<flds.size();fi++){\r
2160                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());\r
2161                         if(flds[fi]->get_modifier_list()->size()){\r
2162                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());\r
2163                         }\r
2164                         fprintf(qtree_output," />\n");\r
2165                   }\r
2166 \r
2167                   // extract liveness timeout from query definition\r
2168                   int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());\r
2169                   if (!liveness_timeout) {\r
2170 //                  fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",\r
2171 //                    hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);\r
2172                     liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;\r
2173                   }\r
2174                   fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);\r
2175 \r
2176                   vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();\r
2177                   int itv;\r
2178                   for(itv=0;itv<tmp_tv.size();++itv){\r
2179                         fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());\r
2180                   }\r
2181                   string ifrs = hfta_list[i]->collect_refd_ifaces();\r
2182                   if(ifrs != ""){\r
2183                         fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());\r
2184                   }\r
2185                   fprintf(qtree_output,"\t</HFTA>\n");\r
2186 \r
2187                   fclose(hfta_fl);\r
2188                 }else{\r
2189 //                                      debug only -- do code generation to catch generation-time errors.\r
2190                   hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);\r
2191                 }\r
2192 \r
2193                 hfta_count++;   // for hfta file names with numeric suffixes\r
2194 \r
2195                 hfta_list[i]->get_external_libs(extra_external_libs);\r
2196 \r
2197           }\r
2198 \r
2199 string ext_lib_string;\r
2200 set<string>::iterator ssi_el;\r
2201 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)\r
2202         ext_lib_string += (*ssi_el)+" ";\r
2203 \r
2204 \r
2205 \r
2206 //                      Report on the set of operator views\r
2207   for(i=0;i<opviews.size();++i){\r
2208         opview_entry *opve = opviews.get_entry(i);\r
2209         fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());\r
2210         fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());\r
2211         fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());\r
2212         fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());\r
2213         fprintf(qtree_output,"\t\t<Rate value='100' />\n");\r
2214 \r
2215         if (!opve->liveness_timeout) {\r
2216 //              fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",\r
2217 //                      opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);\r
2218                 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;\r
2219         }\r
2220         fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);\r
2221     int j;\r
2222         for(j=0;j<opve->subq_names.size();j++)\r
2223                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());\r
2224         fprintf(qtree_output,"\t</UDOP>\n");\r
2225   }\r
2226 \r
2227 \r
2228 //-----------------------------------------------------------------\r
2229 \r
2230 //                      Create interface-specific meta code files.\r
2231 //                              first, open and parse the interface resources file.\r
2232         ifaces_db = new ifq_t();\r
2233     ierr = "";\r
2234         if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){\r
2235                 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",\r
2236                                 ifx_fname.c_str(), ierr.c_str());\r
2237                 exit(1);\r
2238         }\r
2239 \r
2240         map<string, vector<stream_query *> >::iterator svsi;\r
2241         for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){\r
2242                 string lmach = (*svsi).first;\r
2243 \r
2244         //              For this machine, create a set of lftas per interface.\r
2245                 vector<stream_query *> mach_lftas = (*svsi).second;\r
2246                 map<string, vector<stream_query *> > lfta_iface_lists;\r
2247                 int li;\r
2248                 for(li=0;li<mach_lftas.size();++li){\r
2249                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();\r
2250                         string lfta_iface = tvec[0]->get_interface();\r
2251                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);\r
2252                 }\r
2253 \r
2254                 map<string, vector<stream_query *> >::iterator lsvsi;\r
2255                 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){\r
2256                         int erri;\r
2257                         string liface = (*lsvsi).first;\r
2258                         vector<stream_query *> iface_lftas = (*lsvsi).second;\r
2259                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
2260                         if(iface_codegen_type.size()){\r
2261                                 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);\r
2262                                 if(!nicprop){\r
2263                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());\r
2264                                         exit(1);\r
2265                                 }\r
2266                                 string mcs = generate_nic_code(iface_lftas, nicprop);\r
2267                                 string mcf_flnm;\r
2268                                 if(lmach != "")\r
2269                                   mcf_flnm = lmach + "_"+liface+".mcf";\r
2270                                 else\r
2271                                   mcf_flnm = hostname + "_"+liface+".mcf";\r
2272                                 FILE *mcf_fl ;\r
2273                                 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){\r
2274                                         fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));\r
2275                                         exit(1);\r
2276                                 }\r
2277                                 fprintf(mcf_fl,"%s",mcs.c_str());\r
2278                                 fclose(mcf_fl);\r
2279 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",\r
2280 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());\r
2281                         }\r
2282                 }\r
2283 \r
2284 \r
2285         }\r
2286 \r
2287 \r
2288 \r
2289 //-----------------------------------------------------------------\r
2290 \r
2291 \r
2292 //                      Find common filter predicates in the LFTAs.\r
2293 //                      in addition generate structs to store the temporal attributes unpacked by prefilter\r
2294         \r
2295         map<string, vector<stream_query *> >::iterator ssqi;\r
2296         for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){\r
2297 \r
2298                 string lmach = (*ssqi).first;\r
2299                 bool packed_return = false;\r
2300                 int li, erri;\r
2301 \r
2302 \r
2303 //      The LFTAs of this machine.\r
2304                 vector<stream_query *> mach_lftas = (*ssqi).second;\r
2305 //      break up on a per-interface basis.\r
2306                 map<string, vector<stream_query *> > lfta_iface_lists;\r
2307                 map<string, vector<int> > lfta_iface_qname_ix; // need the query name\r
2308                                                         // for fta_init\r
2309                 for(li=0;li<mach_lftas.size();++li){\r
2310                         vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();\r
2311                         string lfta_iface = tvec[0]->get_interface();\r
2312                         lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);\r
2313                         lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);\r
2314                 }\r
2315 \r
2316 \r
2317 //      Are the return values "packed"?\r
2318 //      This should be done on a per-interface basis.\r
2319 //      But this is defunct code for gs-lite\r
2320                 for(li=0;li<mach_lftas.size();++li){\r
2321                   vector<tablevar_t *> tvec =  mach_lftas[li]->query_plan[0]->get_input_tbls();\r
2322                   string liface = tvec[0]->get_interface();\r
2323                   vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
2324                   if(iface_codegen_type.size()){\r
2325                         if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){\r
2326                           packed_return = true;\r
2327                         }\r
2328                   }\r
2329                 }\r
2330 \r
2331 \r
2332 // Separate lftas by interface, collect results on a per-interface basis.\r
2333 \r
2334                 vector<cnf_set *> no_preds;     // fallback if there is no prefilter\r
2335                 map<string, vector<cnf_set *> > prefilter_preds;\r
2336                 set<unsigned int> pred_ids;     // this can be global for all interfaces\r
2337                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
2338                         string liface = (*mvsi).first;\r
2339                         vector<cnf_set *> empty_list;\r
2340                         prefilter_preds[liface] = empty_list;\r
2341                         if(! packed_return){\r
2342                                 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);\r
2343                         }\r
2344 \r
2345 //                              get NIC capabilities.  (Is this needed?)\r
2346                         nic_property *nicprop = NULL;\r
2347                         vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
2348                         if(iface_codegen_type.size()){\r
2349                                 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);\r
2350                                 if(!nicprop){\r
2351                                         fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());\r
2352                                         exit(1);\r
2353                                 }\r
2354                         }\r
2355                 }\r
2356 \r
2357 \r
2358 //              Now that we know the prefilter preds, generate the lfta code.\r
2359 //      Do this for all lftas in this machine.\r
2360                 for(li=0;li<mach_lftas.size();++li){\r
2361                         set<unsigned int> subsumed_preds;\r
2362                         set<unsigned int>::iterator sii;\r
2363 #ifdef PREFILTER_OK\r
2364                         for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){\r
2365                                 int pid = (*sii);\r
2366                                 if((pid>>16) == li){\r
2367                                         subsumed_preds.insert(pid & 0xffff);\r
2368                                 }\r
2369                         }\r
2370 #endif\r
2371                         string lfta_schema_str = mach_lftas[li]->make_schema();\r
2372                         string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);\r
2373                         nic_property *nicprop = NULL; // no NIC properties?\r
2374                         lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);\r
2375                 }\r
2376 \r
2377 \r
2378 //                      generate structs to store the temporal attributes\r
2379 //                      unpacked by prefilter\r
2380                 col_id_set temp_cids;\r
2381                 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);\r
2382                 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);\r
2383 \r
2384 //                      Compute the lfta bit signatures and the lfta colrefs\r
2385 //      do this on a per-interface basis\r
2386 #ifdef PREFILTER_OK\r
2387                         lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";\r
2388 #endif\r
2389                 map<string, vector<long long int> > lfta_sigs; // used again later\r
2390                 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
2391                         string liface = (*mvsi).first;\r
2392                         vector<long long int> empty_list;\r
2393                         lfta_sigs[liface] = empty_list;\r
2394 \r
2395                         vector<col_id_set> lfta_cols;\r
2396                         vector<int> lfta_snap_length;\r
2397                         for(li=0;li<lfta_iface_lists[liface].size();++li){\r
2398                                 unsigned long long int mask=0, bpos=1;\r
2399                                 int f_pos;\r
2400                                 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){\r
2401                                         if(prefilter_preds[liface][f_pos]->lfta_id.count(li))\r
2402                                                 mask |= bpos;\r
2403                                         bpos = bpos << 1;\r
2404                                 }\r
2405                                 lfta_sigs[liface].push_back(mask);\r
2406                                 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));\r
2407                                 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));\r
2408                         }\r
2409 \r
2410 //for(li=0;li<mach_lftas.size();++li){\r
2411 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);\r
2412 //col_id_set::iterator tcisi;\r
2413 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){\r
2414 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);\r
2415 //}\r
2416 //}\r
2417 \r
2418 \r
2419 //                      generate the prefilter\r
2420 //      Do this on a per-interface basis, except for the #define\r
2421 #ifdef PREFILTER_OK\r
2422 //                      lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";\r
2423                         lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);\r
2424 #else\r
2425                         lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns,  lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);\r
2426 \r
2427 #endif\r
2428                 }\r
2429 \r
2430 //                      Generate interface parameter lookup function\r
2431           lfta_val[lmach] += "// lookup interface properties by name\n";\r
2432           lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";\r
2433           lfta_val[lmach] += "// returns NULL if given property does not exist\n";\r
2434           lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";\r
2435 \r
2436 //        collect a lit of interface names used by queries running on this host\r
2437           set<std::string> iface_names;\r
2438           for(i=0;i<mach_query_names[lmach].size();i++){\r
2439                 int mi = mach_query_names[lmach][i];\r
2440                 stream_query *lfta_sq = lfta_mach_lists[lmach][i];\r
2441 \r
2442                 if(interface_names[mi]=="")\r
2443                         iface_names.insert("DEFAULTDEV");\r
2444                 else\r
2445                         iface_names.insert(interface_names[mi]);\r
2446           }\r
2447 \r
2448 //        generate interface property lookup code for every interface\r
2449           set<std::string>::iterator sir;\r
2450           for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {\r
2451                 if (sir == iface_names.begin())\r
2452                         lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";\r
2453                 else\r
2454                         lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";\r
2455 \r
2456                 // iterate through interface properties\r
2457                 vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);\r
2458                 if (erri) {\r
2459                         fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());\r
2460                         exit(1);\r
2461                 }\r
2462                 if (iface_properties.empty())\r
2463                         lfta_val[lmach] += "\t\treturn NULL;\n";\r
2464                 else {\r
2465                         for (int i = 0; i < iface_properties.size(); ++i) {\r
2466                                 if (i == 0)\r
2467                                         lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";\r
2468                                 else\r
2469                                         lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";\r
2470 \r
2471                                 // combine all values for the interface property using comma separator\r
2472                                 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);\r
2473                                 for (int j = 0; j < vals.size(); ++j) {\r
2474                                         lfta_val[lmach] += "\t\t\treturn \"" + vals[j];\r
2475                                         if (j != vals.size()-1)\r
2476                                                 lfta_val[lmach] += ",";\r
2477                                 }\r
2478                                 lfta_val[lmach] += "\";\n";\r
2479                         }\r
2480                         lfta_val[lmach] += "\t\t} else\n";\r
2481                         lfta_val[lmach] += "\t\t\treturn NULL;\n";\r
2482                 }\r
2483           }\r
2484           lfta_val[lmach] += "\t} else\n";\r
2485           lfta_val[lmach] += "\t\treturn NULL;\n";\r
2486           lfta_val[lmach] += "}\n\n";\r
2487 \r
2488 \r
2489 //                      Generate a full list of FTAs for clearinghouse reference\r
2490           lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";\r
2491           lfta_val[lmach] += "const gs_sp_t fta_names[] = {";\r
2492 \r
2493           for (i = 0; i < query_names.size(); ++i) {\r
2494                    if (i)\r
2495                           lfta_val[lmach] += ", ";\r
2496                    lfta_val[lmach] += "\"" + query_names[i] + "\"";\r
2497           }\r
2498           for (i = 0; i < hfta_list.size(); ++i) {\r
2499                    lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";\r
2500           }\r
2501           lfta_val[lmach] += ", NULL};\n\n";\r
2502 \r
2503 \r
2504 //                      Add the initialization function to lfta.c\r
2505 //      Change to accept the interface name, and \r
2506 //      set the prefilter function accordingly.\r
2507 //      see the example in demo/err2\r
2508           lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";\r
2509 \r
2510 //        for(i=0;i<mach_query_names[lmach].size();i++)\r
2511 //              int mi = mach_query_names[lmach][i];\r
2512 //              stream_query *lfta_sq = lfta_mach_lists[lmach][i];\r
2513 \r
2514           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
2515                 string liface = (*mvsi).first;\r
2516                 vector<stream_query *> lfta_list = (*mvsi).second;\r
2517                 for(i=0;i<lfta_list.size();i++){\r
2518                         stream_query *lfta_sq = lfta_list[i];\r
2519                         int mi = lfta_iface_qname_ix[liface][i];\r
2520                 \r
2521                         fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);\r
2522 \r
2523                         string this_iface = "DEFAULTDEV";\r
2524                         if(interface_names[mi]!="")\r
2525                                 this_iface = '"'+interface_names[mi]+'"';\r
2526                         lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";\r
2527                 lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";\r
2528 //              if(interface_names[mi]=="")\r
2529 //                              lfta_val[lmach]+="DEFAULTDEV";\r
2530 //              else\r
2531 //                              lfta_val[lmach]+='"'+interface_names[mi]+'"';\r
2532                         lfta_val[lmach] += this_iface;\r
2533 \r
2534 \r
2535                 lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])\r
2536                         +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])\r
2537                         +"\n#endif\n";\r
2538                                 sprintf(tmpstr,",%d",snap_lengths[mi]);\r
2539                         lfta_val[lmach] += tmpstr;\r
2540 \r
2541 //                      unsigned long long int mask=0, bpos=1;\r
2542 //                      int f_pos;\r
2543 //                      for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){\r
2544 //                              if(prefilter_preds[f_pos]->lfta_id.count(i))\r
2545 //                                      mask |= bpos;\r
2546 //                              bpos = bpos << 1;\r
2547 //                      }\r
2548 \r
2549 #ifdef PREFILTER_OK\r
2550 //                      sprintf(tmpstr,",%lluull",mask);\r
2551                         sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);\r
2552                         lfta_val[lmach]+=tmpstr;\r
2553 #else\r
2554                         lfta_val[lmach] += ",0ull";\r
2555 #endif\r
2556 \r
2557                         lfta_val[lmach] += ");\n";\r
2558 \r
2559 \r
2560 \r
2561 //    End of lfta prefilter stuff\r
2562 // --------------------------------------------------\r
2563 \r
2564 //                      If there is a field verifier, warn about\r
2565 //                      lack of compatability\r
2566                   string lfta_comment;\r
2567                   string lfta_title;\r
2568                   string lfta_namespace;\r
2569                   map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();\r
2570                   if(ldefs.count("comment")>0)\r
2571                         lfta_comment = lfta_sq->defines["comment"];\r
2572                   if(ldefs.count("Comment")>0)\r
2573                         lfta_comment = lfta_sq->defines["Comment"];\r
2574                   if(ldefs.count("COMMENT")>0)\r
2575                         lfta_comment = lfta_sq->defines["COMMENT"];\r
2576                   if(ldefs.count("title")>0)\r
2577                         lfta_title = lfta_sq->defines["title"];\r
2578                   if(ldefs.count("Title")>0)\r
2579                         lfta_title = lfta_sq->defines["Title"];\r
2580                   if(ldefs.count("TITLE")>0)\r
2581                         lfta_title = lfta_sq->defines["TITLE"];\r
2582                   if(ldefs.count("NAMESPACE")>0)\r
2583                         lfta_namespace = lfta_sq->defines["NAMESPACE"];\r
2584                   if(ldefs.count("Namespace")>0)\r
2585                         lfta_namespace = lfta_sq->defines["Namespace"];\r
2586                   if(ldefs.count("namespace")>0)\r
2587                         lfta_namespace = lfta_sq->defines["namespace"];\r
2588 \r
2589                   string lfta_ht_size;\r
2590                   if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")\r
2591                         lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);\r
2592                   if(ldefs.count("aggregate_slots")>0){\r
2593                         lfta_ht_size = ldefs["aggregate_slots"];\r
2594                   }\r
2595 \r
2596 //                      NOTE : I'm assuming that visible lftas do not start with _fta.\r
2597 //                              -- will fail for non-visible simple selection queries.\r
2598                 if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){\r
2599                         string warning_str;\r
2600                         if(lfta_comment == "")\r
2601                                 warning_str += "\tcomment not found.\n";\r
2602                         if(lfta_title == "")\r
2603                                 warning_str += "\ttitle not found.\n";\r
2604                         if(lfta_namespace == "")\r
2605                                 warning_str += "\tnamespace not found.\n";\r
2606 \r
2607                         vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();\r
2608                         int fi;\r
2609                         for(fi=0;fi<flds.size();fi++){\r
2610                                 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);\r
2611                         }\r
2612                         if(warning_str != "")\r
2613                                 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",\r
2614                                         query_names[mi].c_str(),warning_str.c_str());\r
2615                 }\r
2616 \r
2617 \r
2618 //                      Create qtree output\r
2619                 fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());\r
2620                   if(lfta_comment != "")\r
2621                         fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());\r
2622                   if(lfta_title != "")\r
2623                         fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());\r
2624                   if(lfta_namespace != "")\r
2625                         fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());\r
2626                   if(lfta_ht_size != "")\r
2627                         fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());\r
2628                 if(lmach != "")\r
2629                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());\r
2630                 else\r
2631                   fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());\r
2632                 fprintf(qtree_output,"\t\t<Interface  value='%s' />\n",interface_names[mi].c_str());\r
2633                 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());\r
2634                 fprintf(qtree_output,"\t\t<Rate value='100' />\n");\r
2635                 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);\r
2636 //                              write info about fields to qtree.xml\r
2637                   vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();\r
2638                   int fi;\r
2639                   for(fi=0;fi<flds.size();fi++){\r
2640                         fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());\r
2641                         if(flds[fi]->get_modifier_list()->size()){\r
2642                                 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());\r
2643                         }\r
2644                         fprintf(qtree_output," />\n");\r
2645                   }\r
2646                 fprintf(qtree_output,"\t</LFTA>\n");\r
2647 \r
2648 \r
2649             }\r
2650           }\r
2651 \r
2652           for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
2653                         string liface = (*mvsi).first;\r
2654                         lfta_val[lmach] += \r
2655 "       if (!strcmp(device, \""+liface+"\")) \n"\r
2656 "               lfta_prefilter = &lfta_prefilter_"+liface+"; \n"\r
2657 ;\r
2658                 }\r
2659                 lfta_val[lmach] += \r
2660 "       if(lfta_prefilter == NULL){\n"\r
2661 "               fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"\r
2662 "               exit(1);\n"\r
2663 "       }\n"\r
2664 ;\r
2665 \r
2666 \r
2667 \r
2668           lfta_val[lmach] += "}\n\n";\r
2669 \r
2670       if(!(debug_only || hfta_only) ){\r
2671                 string lfta_flnm;\r
2672                 if(lmach != "")\r
2673                   lfta_flnm = lmach + "_lfta.c";\r
2674                 else\r
2675                   lfta_flnm = hostname + "_lfta.c";\r
2676                 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){\r
2677                         fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));\r
2678                         exit(1);\r
2679                 }\r
2680               fprintf(lfta_out,"%s",lfta_header.c_str());\r
2681               fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());\r
2682               fprintf(lfta_out,"%s",lfta_val[lmach].c_str());\r
2683                 fclose(lfta_out);\r
2684           }\r
2685         }\r
2686 \r
2687 //              Say what are the operators which must execute\r
2688         if(opviews.size()>0)\r
2689                 fprintf(stderr,"The queries use the following external operators:\n");\r
2690         for(i=0;i<opviews.size();++i){\r
2691                 opview_entry *opv = opviews.get_entry(i);\r
2692                 fprintf(stderr,"\t%s\n",opv->view_name.c_str());\r
2693         }\r
2694 \r
2695         if(create_makefile)\r
2696                 generate_makefile(input_file_names, nfiles, hfta_names, opviews,\r
2697                 machine_names, schema_file_name,\r
2698                 interface_names,\r
2699                 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);\r
2700 \r
2701 \r
2702         fprintf(qtree_output,"</QueryNodes>\n");\r
2703 \r
2704         return(0);\r
2705 }\r
2706 \r
2707 ////////////////////////////////////////////////////////////\r
2708 \r
2709 void generate_makefile(vector<string> &input_file_names, int nfiles,\r
2710                                            vector<string> &hfta_names, opview_set &opviews,\r
2711                                                 vector<string> &machine_names,\r
2712                                                 string schema_file_name,\r
2713                                                 vector<string> &interface_names,\r
2714                                                 ifq_t *ifdb, string &config_dir_path,\r
2715                                                 bool use_pads,\r
2716                                                 string extra_libs,\r
2717                                                 map<string, vector<int> > &rts_hload\r
2718                                          ){\r
2719         int i,j;\r
2720 \r
2721         if(config_dir_path != ""){\r
2722                 config_dir_path = "-C "+config_dir_path;\r
2723         }\r
2724 \r
2725         struct stat sb;\r
2726         bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;\r
2727         bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;\r
2728 \r
2729 //      if(libz_exists && !libast_exists){\r
2730 //              fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");\r
2731 //              exit(1);\r
2732 //      }\r
2733 \r
2734 //                      Get set of operator executable files to run\r
2735         set<string> op_fls;\r
2736         set<string>::iterator ssi;\r
2737         for(i=0;i<opviews.size();++i){\r
2738                 opview_entry *opv = opviews.get_entry(i);\r
2739                 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);\r
2740         }\r
2741 \r
2742         FILE *outfl = fopen("Makefile", "w");\r
2743         if(outfl==NULL){\r
2744                 fprintf(stderr,"Can't open Makefile for write, exiting.\n");\r
2745                 exit(0);\r
2746         }\r
2747 \r
2748         fputs(\r
2749 ("CPP= g++ -O3 -g -I "+root_path+"/include  -I "+root_path+"/include/hfta\n"\r
2750 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"\r
2751 ).c_str(), outfl\r
2752 );\r
2753         if(generate_stats)\r
2754                 fprintf(outfl,"  -DLFTA_STATS");\r
2755 \r
2756 //              Gather the set of interfaces\r
2757 //              Also, gather "base interface names" for use in computing\r
2758 //              the hash splitting to virtual interfaces.\r
2759 //              TODO : must update to hanndle machines\r
2760         set<string> ifaces;\r
2761         set<string> base_vifaces;       // base interfaces of virtual interfaces\r
2762         map<string, string> ifmachines;\r
2763         map<string, string> ifattrs;\r
2764         for(i=0;i<interface_names.size();++i){\r
2765                 ifaces.insert(interface_names[i]);\r
2766                 ifmachines[interface_names[i]] = machine_names[i];\r
2767 \r
2768                 size_t Xpos = interface_names[i].find_last_of("X");\r
2769                 if(Xpos!=string::npos){\r
2770                         string iface = interface_names[i].substr(0,Xpos);\r
2771                         base_vifaces.insert(iface);\r
2772                 }\r
2773                 // get interface attributes and add them to the list\r
2774         }\r
2775 \r
2776 //              Do we need to include protobuf libraries?\r
2777         bool use_proto = false;\r
2778         int erri;\r
2779         string err_str;\r
2780         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){\r
2781                 string ifnm = (*ssi);\r
2782                 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);\r
2783                 for(int ift_i=0;ift_i<ift.size();ift_i++){\r
2784                         if(ift[ift_i]=="PROTO"){\r
2785                                 use_proto = true;\r
2786                         }\r
2787                 }\r
2788         }\r
2789 \r
2790         fprintf(outfl,\r
2791 "\n"\r
2792 "\n"\r
2793 "all: rts");\r
2794         for(i=0;i<hfta_names.size();++i)\r
2795                 fprintf(outfl," %s",hfta_names[i].c_str());\r
2796         fputs(\r
2797 ("\n"\r
2798 "\n"\r
2799 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a  "+root_path+"/lib/libclearinghouse.a\n"\r
2800 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);\r
2801         if(use_pads)\r
2802                 fprintf(outfl,"-L. ");\r
2803         fputs(\r
2804 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);\r
2805         if(use_pads)\r
2806                 fprintf(outfl,"-lgscppads -lpads ");\r
2807         fprintf(outfl,\r
2808 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux  -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");\r
2809         if(use_pads)\r
2810                 fprintf(outfl, " -lpz -lz -lbz ");\r
2811         if(libz_exists && libast_exists)\r
2812                 fprintf(outfl," -last ");\r
2813         if(use_pads)\r
2814                 fprintf(outfl, " -ldll -ldl ");\r
2815         if(use_proto)\r
2816                 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");\r
2817         fprintf(outfl," -lgscpaux");\r
2818 #ifdef GCOV\r
2819         fprintf(outfl," -fprofile-arcs");\r
2820 #endif\r
2821         fprintf(outfl,\r
2822 "\n"\r
2823 "\n"\r
2824 "lfta.o: %s_lfta.c\n"\r
2825 "\t$(CC) -o lfta.o -c %s_lfta.c\n"\r
2826 "\n"\r
2827 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());\r
2828         for(i=0;i<nfiles;++i)\r
2829                 fprintf(outfl," %s",input_file_names[i].c_str());\r
2830         if(hostname == ""){\r
2831                 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());\r
2832         }else{\r
2833                 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());\r
2834         }\r
2835         for(i=0;i<nfiles;++i)\r
2836                 fprintf(outfl," %s",input_file_names[i].c_str());\r
2837         fprintf(outfl,"\n");\r
2838 \r
2839         for(i=0;i<hfta_names.size();++i)\r
2840                 fprintf(outfl,\r
2841 ("%s: %s.o\n"\r
2842 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"\r
2843 "\n"\r
2844 "%s.o: %s.cc\n"\r
2845 "\t$(CPP) -o %s.o -c %s.cc\n"\r
2846 "\n"\r
2847 "\n").c_str(),\r
2848     hfta_names[i].c_str(), hfta_names[i].c_str(),\r
2849         hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),\r
2850         hfta_names[i].c_str(), hfta_names[i].c_str(),\r
2851         hfta_names[i].c_str(), hfta_names[i].c_str()\r
2852                 );\r
2853 \r
2854         fprintf(outfl,\r
2855 ("\n"\r
2856 "packet_schema.txt:\n"\r
2857 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"\r
2858 "\n"\r
2859 "external_fcns.def:\n"\r
2860 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"\r
2861 "\n"\r
2862 "clean:\n"\r
2863 "\trm -rf core rts *.o  %s_lfta.c  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());\r
2864         for(i=0;i<hfta_names.size();++i)\r
2865                 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());\r
2866         fprintf(outfl,"\n");\r
2867 \r
2868         fclose(outfl);\r
2869 \r
2870 \r
2871 \r
2872 //              Gather the set of interfaces\r
2873 //              TODO : must update to hanndle machines\r
2874 //              TODO : lookup interface attributes and add them as a parameter to rts process\r
2875         outfl = fopen("runit", "w");\r
2876         if(outfl==NULL){\r
2877                 fprintf(stderr,"Can't open runit for write, exiting.\n");\r
2878                 exit(0);\r
2879         }\r
2880 \r
2881 \r
2882         fputs(\r
2883 ("#!/bin/sh\n"\r
2884 "./stopit\n"\r
2885 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"\r
2886 "sleep 5\n"\r
2887 "if [ ! -f gshub.log ]\n"\r
2888 "then\n"\r
2889 "\techo \"Failed to start bin/gshub.py\"\n"\r
2890 "\texit -1\n"\r
2891 "fi\n"\r
2892 "ADDR=`cat gshub.log`\n"\r
2893 "ps opgid= $! >> gs.pids\n"\r
2894 "./rts $ADDR default ").c_str(), outfl);\r
2895 //      int erri;\r
2896 //      string err_str;\r
2897         for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){\r
2898                 string ifnm = (*ssi);\r
2899                 fprintf(outfl, "%s ",ifnm.c_str());\r
2900                 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);\r
2901                 for(j=0;j<ifv.size();++j)\r
2902                         fprintf(outfl, "%s ",ifv[j].c_str());\r
2903         }\r
2904         fprintf(outfl, " &\n");\r
2905         fprintf(outfl, "echo $! >> gs.pids\n");\r
2906         for(i=0;i<hfta_names.size();++i)\r
2907                 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());\r
2908 \r
2909         for(j=0;j<opviews.opview_list.size();++j){\r
2910                 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());\r
2911         }\r
2912 \r
2913         fclose(outfl);\r
2914         system("chmod +x runit");\r
2915 \r
2916         outfl = fopen("stopit", "w");\r
2917         if(outfl==NULL){\r
2918                 fprintf(stderr,"Can't open stopit for write, exiting.\n");\r
2919                 exit(0);\r
2920         }\r
2921 \r
2922         fprintf(outfl,"#!/bin/sh\n"\r
2923 "rm -f gshub.log\n"\r
2924 "if [ ! -f gs.pids ]\n"\r
2925 "then\n"\r
2926 "exit\n"\r
2927 "fi\n"\r
2928 "for pgid in `cat gs.pids`\n"\r
2929 "do\n"\r
2930 "kill -TERM -$pgid\n"\r
2931 "done\n"\r
2932 "sleep 1\n"\r
2933 "for pgid in `cat gs.pids`\n"\r
2934 "do\n"\r
2935 "kill -9 -$pgid\n"\r
2936 "done\n"\r
2937 "rm gs.pids\n");\r
2938 \r
2939         fclose(outfl);\r
2940         system("chmod +x stopit");\r
2941 \r
2942 //-----------------------------------------------\r
2943 \r
2944 /* For now disable support for virtual interfaces\r
2945         outfl = fopen("set_vinterface_hash.bat", "w");\r
2946         if(outfl==NULL){\r
2947                 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");\r
2948                 exit(0);\r
2949         }\r
2950 \r
2951 //              The format should be determined by an entry in the ifres.xml file,\r
2952 //              but for now hardcode the only example I have.\r
2953         for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){\r
2954                 if(rts_hload.count((*ssi))){\r
2955                         string iface_name = (*ssi);\r
2956                         string iface_number = "";\r
2957                         for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){\r
2958                                 if(isdigit(iface_name[j])){\r
2959                                         iface_number = iface_name[j];\r
2960                                         if(j>0 && isdigit(iface_name[j-1]))\r
2961                                                 iface_number = iface_name[j-1] + iface_number;\r
2962                                 }\r
2963                         }\r
2964 \r
2965                         fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());\r
2966                         vector<int> halloc = rts_hload[iface_name];\r
2967                         int prev_limit = 0;\r
2968                         for(j=0;j<halloc.size();++j){\r
2969                                 if(j>0)\r
2970                                         fprintf(outfl,":");\r
2971                                 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);\r
2972                                 prev_limit = halloc[j];\r
2973                         }\r
2974                         fprintf(outfl,"\n");\r
2975                 }\r
2976         }\r
2977         fclose(outfl);\r
2978         system("chmod +x set_vinterface_hash.bat");\r
2979 */\r
2980 }\r
2981 \r
2982 //              Code for implementing a local schema\r
2983 /*\r
2984                 table_list qpSchema;\r
2985 \r
2986 //                              Load the schemas of any LFTAs.\r
2987                 int l;\r
2988                 for(l=0;l<hfta_nbr;++l){\r
2989                         stream_query *sq0 = split_queries[l];\r
2990                         table_def *td = sq0->get_output_tabledef();\r
2991                         qpSchema.append_table(td);\r
2992                 }\r
2993 //                              load the schemas of any other ref'd tables.\r
2994 //                              (e.g., hftas)\r
2995                 vector<tablevar_t *>  input_tbl_names = split_queries[hfta_nbr]->get_input_tables();\r
2996                 int ti;\r
2997                 for(ti=0;ti<input_tbl_names.size();++ti){\r
2998                         int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());\r
2999                         if(tbl_ref < 0){\r
3000                                 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());\r
3001                                 if(tbl_ref < 0){\r
3002                                         fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());\r
3003                                         exit(1);\r
3004                                 }\r
3005                                 qpSchema.append_table(Schema->get_table(tbl_ref));\r
3006                         }\r
3007                 }\r
3008 */\r
3009 \r
3010 //              Functions related to parsing.\r
3011 \r
3012 /*\r
3013 static int split_string(char *instr,char sep, char **words,int max_words){\r
3014    char *loc;\r
3015    char *str;\r
3016    int nwords = 0;\r
3017 \r
3018    str = instr;\r
3019    words[nwords++] = str;\r
3020    while( (loc = strchr(str,sep)) != NULL){\r
3021         *loc = '\0';\r
3022         str = loc+1;\r
3023         if(nwords >= max_words){\r
3024                 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);\r
3025                 nwords = max_words-1;\r
3026         }\r
3027         words[nwords++] = str;\r
3028    }\r
3029 \r
3030    return(nwords);\r
3031 }\r
3032 \r
3033 */\r
3034 \r