1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
// ---- Interface to the xml parser (generated lexer/parser globals) ----
60 extern int xmlParserparse(void);
61 extern FILE *xmlParserin;
62 extern int xmlParserdebug;
// Scratch vectors/strings the xml parser actions fill in, and the
// parsed leaf structure (NULL until field_list.xml is found and parsed).
64 std::vector<std::string> xml_attr_vec;
65 std::vector<std::string> xml_val_vec;
66 std::string xml_a, xml_v;
67 xml_t *xml_leaves = NULL;
69 // Interface to the field list verifier
// NULL unless a field_list.xml file was found at startup.
70 field_list *field_verifier = NULL;
// Size of the reusable char scratch buffer (tmpstr in main).
72 #define TMPSTRLEN 1000
// Path separator used to strip directories from input file names.
75 #define PATH_DELIM '/'
78 char tmp_schema_str[10000];
80 // maximum delay between two heartbeats produced
81 // by UDOP. Used when it's not explicitly
82 // provided in udop definition
83 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
85 // Default lfta hash table size, must be power of 2.
86 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
88 // Interface to FTA definition lexer and parser ...
90 extern int FtaParserparse(void);
91 extern FILE *FtaParserin;
92 extern int FtaParserdebug;
// Results of the most recent FtaParserparse() call.
94 fta_parse_t *fta_parse_result;
95 var_defs_t *fta_parse_defines;
99 // Interface to external function lexer and parser ...
101 extern int Ext_fcnsParserparse(void);
102 extern FILE *Ext_fcnsParserin;
103 extern int Ext_fcnsParserdebug;
// External function list; set from external_fcns.def (or empty on failure).
105 ext_fcn_list *Ext_fcns;
108 // Interface to partition definition parser
109 extern int PartnParserparse();
110 partn_def_list_t *partn_parse_result = NULL;
118 // forward declaration of local utility function
119 void generate_makefile(vector<string> &input_file_names, int nfiles,
120 vector<string> &hfta_names, opview_set &opviews,
121 vector<string> &machine_names,
122 string schema_file_name,
123 vector<string> &interface_names,
124 ifq_t *ifdb, string &config_dir_path,
127 map<string, vector<int> > &rts_hload
130 //static int split_string(char *instr,char sep, char **words,int max_words);
// Destination for the -f option (schema_summary.txt); NULL when unused.
133 FILE *schema_summary_output = NULL; // schema summary output file
135 // Dump schema summary
// dump_summary: write a human/greppable summary of the stream query
// to the (already opened) global schema_summary_output file.
// For each node of the query plan, then for the query's own output
// schema, it emits: a "-----" separator, the node/query name, the
// field names ('|'-separated), the field types ('|'-separated, same
// order), an optional "comment" definition, and the input tables
// (machine / interface / schema name components, '|'-separated).
// NOTE(review): this listing is an excerpt; some guard conditions
// and closing braces of the original are not visible here.
136 void dump_summary(stream_query *str){
// --- Per-plan-node summaries ---
137 for(int q=0;q<str->query_plan.size();++q){
138 qp_node *qp = str->query_plan[q];
140 continue; // there can be blanks
142 fprintf(schema_summary_output,"-----\n");
143 fprintf(schema_summary_output,"%s\n",qp->node_name.c_str());
// Field names of this node's output schema, '|'-separated.
145 table_def *sch = qp->get_fields();
147 vector<field_entry *> flds = sch->get_fields();
149 for(f=0;f<flds.size();++f){
150 if(f>0) fprintf(schema_summary_output,"|");
151 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
153 fprintf(schema_summary_output,"\n");
// Field types, '|'-separated, positionally matching the names above.
154 for(f=0;f<flds.size();++f){
155 if(f>0) fprintf(schema_summary_output,"|");
156 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
158 fprintf(schema_summary_output,"\n");
// Optional "comment" definition attached to the node.
160 map<std::string, std::string> defines = qp->get_definitions();
162 if(defines.count("comment")>0){
163 comment = defines["comment"];
165 fprintf(schema_summary_output,"%s\n",comment.c_str());
// Input tables: machine / interface / schema-name components.
167 vector<tablevar_t *> input_tables = qp->get_input_tbls();
168 for(int t=0; t<input_tables.size(); ++t){
169 if(t>0) fprintf(schema_summary_output,"|");
170 string machine = input_tables[t]->get_machine();
171 string iface = input_tables[t]->get_interface();
172 string schema = input_tables[t]->get_schema_name();
174 fprintf(schema_summary_output,"%s.",machine.c_str());
177 fprintf(schema_summary_output,"%s.",iface.c_str());
179 if(machine!="") fprintf(schema_summary_output,".");
181 fprintf(schema_summary_output,"%s",schema.c_str());
184 fprintf(schema_summary_output,"\n");
// --- Summary of the query's own output schema, same layout ---
190 fprintf(schema_summary_output,"-----\n");
191 fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
193 table_def *sch = str->get_output_tabledef();
195 vector<field_entry *> flds = sch->get_fields();
// Output field names.
197 for(f=0;f<flds.size();++f){
198 if(f>0) fprintf(schema_summary_output,"|");
199 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
201 fprintf(schema_summary_output,"\n");
// Output field types.
202 for(f=0;f<flds.size();++f){
203 if(f>0) fprintf(schema_summary_output,"|");
204 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
206 fprintf(schema_summary_output,"\n");
// Optional "comment" definition of the query itself.
209 if(str->defines.count("comment")>0){
210 comment = str->defines["comment"];
212 fprintf(schema_summary_output,"%s\n",comment.c_str());
// Input tables of the whole query.
214 vector<tablevar_t *> input_tables = str->get_input_tables();
215 for(int t=0; t<input_tables.size(); ++t){
216 if(t>0) fprintf(schema_summary_output,"|");
217 string machine = input_tables[t]->get_machine();
218 string iface = input_tables[t]->get_interface();
219 string schema = input_tables[t]->get_schema_name();
221 fprintf(schema_summary_output,"%s.",machine.c_str());
224 fprintf(schema_summary_output,"%s.",iface.c_str());
226 if(machine!="") fprintf(schema_summary_output,".");
228 fprintf(schema_summary_output,"%s",schema.c_str());
230 fprintf(schema_summary_output,"\n");
236 string hostname; // name of current host. Can be overridden with -h.
// Set by the -S option: enable LFTA statistics (alters the Makefile).
238 bool generate_stats = false;
// Path to the root of GS-lite; overridden by the -R option.
239 string root_path = "../..";
242 int main(int argc, char **argv){
243 char tmpstr[TMPSTRLEN];
247 set<int>::iterator si;
249 vector<string> registration_query_names; // for lfta.c registration
250 map<string, vector<int> > mach_query_names; // list queries of machine
251 vector<int> snap_lengths; // for lfta.c registration
252 vector<int> snap_position; // for lfta.c registration
253 vector<string> interface_names; // for lfta.c registration
254 vector<string> machine_names; // machine of interface
255 vector<bool> lfta_reuse_options; // for lfta.c registration
256 vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
257 vector<string> hfta_names; // hfta cource code names, for
258 // creating make file.
259 vector<string> qnames; // ensure unique names
260 map<string, int> lfta_names; // keep track of unique lftas.
263 // set these to 1 to debug the parser
265 Ext_fcnsParserdebug = 0;
267 FILE *lfta_out; // lfta.c output.
268 FILE *fta_in; // input file
269 FILE *table_schemas_in; // source tables definition file
270 FILE *query_name_output; // query names
271 FILE *qtree_output; // interconnections of query nodes
273 // -------------------------------
274 // Handling of Input Arguments
275 // -------------------------------
276 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
277 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
278 "\t[-B] : debug only (don't create output files)\n"
279 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
280 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
281 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
282 "\t[-C] : use <config directory> for definition files\n"
283 "\t[-l] : use <library directory> for library queries\n"
284 "\t[-N] : output query names in query_names.txt\n"
285 "\t[-H] : create HFTA only (no schema_file)\n"
286 "\t[-Q] : use query name for hfta suffix\n"
287 "\t[-M] : generate make file and runit, stopit scripts\n"
288 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
289 "\t[-f] : Output schema summary to schema_summary.txt\n"
290 "\t[-P] : link with PADS\n"
291 "\t[-h] : override host name.\n"
292 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
293 "\t[-R] : path to root of GS-lite\n"
296 // parameters gathered from command line processing
297 string external_fcns_path;
298 // string internal_fcn_path;
299 string config_dir_path;
300 string library_path = "./";
301 vector<string> input_file_names;
302 string schema_file_name;
303 bool debug_only = false;
304 bool hfta_only = false;
305 bool output_query_names = false;
306 bool output_schema_summary=false;
307 bool numeric_hfta_flname = true;
308 bool create_makefile = false;
309 bool distributed_mode = false;
310 bool partitioned_mode = false;
311 bool use_live_hosts_file = false;
312 bool use_pads = false;
313 bool clean_make = false;
314 int n_virtual_interfaces = 1;
317 while((chopt = getopt(argc,argv,optstr)) != -1){
323 distributed_mode = true;
326 partitioned_mode = true;
329 use_live_hosts_file = true;
333 config_dir_path = string(optarg) + string("/");
337 library_path = string(optarg) + string("/");
340 output_query_names = true;
343 numeric_hfta_flname = false;
346 if(schema_file_name == ""){
351 output_schema_summary=true;
354 create_makefile=true;
375 n_virtual_interfaces = atoi(optarg);
376 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
377 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
378 n_virtual_interfaces = 1;
383 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
384 fprintf(stderr,"%s\n", usage_str);
387 fprintf(stderr, "Argument was %c\n", optopt);
388 fprintf(stderr,"Invalid arguments\n");
389 fprintf(stderr,"%s\n", usage_str);
395 for (int i = 0; i < argc; ++i) {
396 if((schema_file_name == "") && !hfta_only){
397 schema_file_name = argv[i];
399 input_file_names.push_back(argv[i]);
403 if(input_file_names.size() == 0){
404 fprintf(stderr,"%s\n", usage_str);
409 string clean_cmd = "rm Makefile hfta_*.cc";
410 int clean_ret = system(clean_cmd.c_str());
412 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
417 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
419 // Open globally used file names.
421 // prepend config directory to schema file
422 schema_file_name = config_dir_path + schema_file_name;
423 external_fcns_path = config_dir_path + string("external_fcns.def");
424 string ifx_fname = config_dir_path + string("ifres.xml");
426 // Find interface query file(s).
428 gethostname(tmpstr,TMPSTRLEN);
431 hostname_len = strlen(tmpstr);
432 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
433 vector<string> ifq_fls;
435 ifq_fls.push_back(ifq_fname);
438 // Get the field list, if it exists
439 string flist_fl = config_dir_path + "field_list.xml";
441 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
442 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
443 xml_leaves = new xml_t();
444 xmlParser_setfileinput(flf_in);
445 if(xmlParserparse()){
446 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
448 field_verifier = new field_list(xml_leaves);
453 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
454 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
460 if(!(debug_only || hfta_only)){
461 if((lfta_out = fopen("lfta.c","w")) == NULL){
462 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
468 // Get the output specification file.
470 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
471 string ospec_fl = "output_spec.cfg";
473 vector<ospec_str *> output_specs;
474 multimap<string, int> qname_to_ospec;
475 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
478 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
480 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
481 if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
483 // make operator type lowercase
485 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
486 *tmpc = tolower(*tmpc);
488 ospec_str *tmp_ospec = new ospec_str();
489 tmp_ospec->query = flds[0];
490 tmp_ospec->operator_type = flds[1];
491 tmp_ospec->operator_param = flds[2];
492 tmp_ospec->output_directory = flds[3];
493 tmp_ospec->bucketwidth = atoi(flds[4]);
494 tmp_ospec->partitioning_flds = flds[5];
495 tmp_ospec->n_partitions = atoi(flds[6]);
496 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
497 output_specs.push_back(tmp_ospec);
499 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
505 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
510 string pspec_fl = "hfta_parallelism.cfg";
512 map<string, int> hfta_parallelism;
513 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
516 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
517 bool good_entry = true;
519 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
521 string hname = flds[0];
522 int par = atoi(flds[1]);
523 if(par <= 0 || par > n_virtual_interfaces){
524 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
527 if(good_entry && n_virtual_interfaces % par != 0){
528 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
532 hfta_parallelism[hname] = par;
536 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
540 // LFTA hash table sizes
541 string htspec_fl = "lfta_htsize.cfg";
542 FILE *htsp_in = NULL;
543 map<string, int> lfta_htsize;
544 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
547 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
548 bool good_entry = true;
550 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
552 string lfta_name = flds[0];
553 int htsz = atoi(flds[1]);
555 lfta_htsize[lfta_name] = htsz;
557 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
562 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
565 // LFTA vitual interface hash split
566 string rtlspec_fl = "rts_load.cfg";
568 map<string, vector<int> > rts_hload;
569 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
574 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
575 bool good_entry = true;
579 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
581 iface_name = flds[0];
584 for(j=1;j<nflds;++j){
585 int h = atoi(flds[j]);
589 hload.push_back(cumm_h);
595 rts_hload[iface_name] = hload;
597 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
604 if(output_query_names){
605 if((query_name_output = fopen("query_names.txt","w")) == NULL){
606 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
611 if(output_schema_summary){
612 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
613 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
618 if((qtree_output = fopen("qtree.xml","w")) == NULL){
619 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
622 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
623 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
624 fprintf(qtree_output,"<QueryNodes>\n");
627 // Get an initial Schema
630 // Parse the table schema definitions.
631 fta_parse_result = new fta_parse_t();
632 FtaParser_setfileinput(table_schemas_in);
633 if(FtaParserparse()){
634 fprintf(stderr,"Table schema parse failed.\n");
637 if(fta_parse_result->parse_type != TABLE_PARSE){
638 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
641 Schema = fta_parse_result->tables;
643 // Ensure that all schema_ids, if set, are distinct.
644 // Obsolete? There is code elsewhere to ensure that schema IDs are
645 // distinct on a per-interface basis.
649 for(int t=0;t<Schema->size();++t){
650 int sch_id = Schema->get_table(t)->get_schema_id();
652 if(found_ids.find(sch_id) != found_ids.end()){
653 dup_ids.insert(sch_id);
655 found_ids.insert(sch_id);
658 if(dup_ids.size()>0){
659 fprintf(stderr, "Error, the schema has duplicate schema_ids:");
660 for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
661 fprintf(stderr," %d",(*dit));
662 fprintf(stderr,"\n");
669 // Process schema field inheritance
671 retval = Schema->unroll_tables(err_str);
673 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
677 // hfta only => we will try to fetch schemas from the registry.
678 // therefore, start off with an empty schema.
679 Schema = new table_list();
683 // Open and parse the external functions file.
684 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
685 if(Ext_fcnsParserin == NULL){
686 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
687 Ext_fcns = new ext_fcn_list();
689 if(Ext_fcnsParserparse()){
690 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
691 Ext_fcns = new ext_fcn_list();
694 if(Ext_fcns->validate_fcns(err_str)){
695 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
699 // Open and parse the interface resources file.
700 // ifq_t *ifaces_db = new ifq_t();
702 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
703 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
704 // ifx_fname.c_str(), ierr.c_str());
707 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
708 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
709 // ifq_fname.c_str(), ierr.c_str());
714 // The LFTA code string.
715 // Put the standard preamble here.
716 // NOTE: the hash macros, fcns should go into the run time
717 map<string, string> lfta_val;
718 map<string, string> lfta_prefilter_val;
721 "#include <limits.h>\n"
722 "#include \"rts.h\"\n"
723 "#include \"fta.h\"\n"
724 "#include \"lapp.h\"\n"
725 "#include \"rts_udaf.h\"\n"
726 "#include<stdio.h>\n"
727 "#include <float.h>\n"
728 "#include \"rdtsc.h\"\n"
729 "#include \"watchlist.h\"\n\n"
732 // Get any locally defined parsing headers
734 memset(&glob_result, 0, sizeof(glob_result));
736 // do the glob operation TODO should be from GSROOT
737 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
738 if(return_value == 0){
740 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
742 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
743 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
747 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
751 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
752 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
753 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
754 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
759 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
761 "#define SLOT_FILLED 0x04\n"
762 "#define SLOT_GEN_BITS 0x03\n"
763 "#define SLOT_HASH_BITS 0xfffffff8\n"
764 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
765 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
766 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
769 "#define lfta_BOOL_to_hash(x) (x)\n"
770 "#define lfta_USHORT_to_hash(x) (x)\n"
771 "#define lfta_UINT_to_hash(x) (x)\n"
772 "#define lfta_IP_to_hash(x) (x)\n"
773 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
774 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
775 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
776 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
777 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
778 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
779 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
780 " for(i=0;i<x.length;++i){\n"
781 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
787 " if((i%4)!=0) ret ^=tmp_sum;\n"
793 //////////////////////////////////////////////////////////////////
794 ///// Get all of the query parse trees
798 int hfta_count = 0; // for numeric suffixes to hfta .cc files
800 //---------------------------
801 // Global info needed for post processing.
803 // Set of operator views ref'd in the query set.
805 // lftas on a per-machine basis.
806 map<string, vector<stream_query *> > lfta_mach_lists;
807 int nfiles = input_file_names.size();
808 vector<stream_query *> hfta_list; // list of hftas.
809 map<string, stream_query *> sq_map; // map from query name to stream query.
812 //////////////////////////////////////////
814 // Open and parse the interface resources file.
815 ifq_t *ifaces_db = new ifq_t();
817 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
818 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
819 ifx_fname.c_str(), ierr.c_str());
822 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
823 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
824 ifq_fls[0].c_str(), ierr.c_str());
828 map<string, string> qname_to_flname; // for detecting duplicate query names
832 // Parse the files to create a vector of parse trees.
833 // Load qnodes with information to perform a topo sort
834 // based on query dependencies.
835 vector<query_node *> qnodes; // for topo sort.
836 map<string,int> name_node_map; // map query name to qnodes entry
837 for(i=0;i<input_file_names.size();i++){
839 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
840 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
843 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
845 // Parse the FTA query
846 fta_parse_result = new fta_parse_t();
847 FtaParser_setfileinput(fta_in);
848 if(FtaParserparse()){
849 fprintf(stderr,"FTA parse failed.\n");
852 if(fta_parse_result->parse_type != QUERY_PARSE){
853 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
857 // returns a list of parse trees
858 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
859 for(p=0;p<qlist.size();++p){
860 table_exp_t *fta_parse_tree = qlist[p];
861 // query_parse_trees.push_back(fta_parse_tree);
863 // compute the default name -- extract from query name
864 strcpy(tmpstr,input_file_names[i].c_str());
865 char *qname = strrchr(tmpstr,PATH_DELIM);
870 char *qname_end = strchr(qname,'.');
871 if(qname_end != NULL) *qname_end = '\0';
872 string qname_str = qname;
873 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
875 // Deternmine visibility. Should I be attaching all of the output methods?
876 if(qname_to_ospec.count(imputed_qname)>0)
877 fta_parse_tree->set_visible(true);
879 fta_parse_tree->set_visible(false);
882 // Create a manipulable repesentation of the parse tree.
883 // the qnode inherits the visibility assigned to the parse tree.
884 int pos = qnodes.size();
885 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
886 name_node_map[ qnodes[pos]->name ] = pos;
887 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
888 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
889 // qfiles.push_back(i);
891 // Check for duplicate query names
892 // NOTE : in hfta-only generation, I should
893 // also check with the names of the registered queries.
894 if(qname_to_flname.count(qnodes[pos]->name) > 0){
895 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
896 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
899 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
900 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
901 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
904 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
910 // Add the library queries
913 for(pos=0;pos<qnodes.size();++pos){
915 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
916 string src_tbl = qnodes[pos]->refd_tbls[fi];
917 if(qname_to_flname.count(src_tbl) == 0){
918 int last_sep = src_tbl.find_last_of('/');
919 if(last_sep != string::npos){
920 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
921 string target_qname = src_tbl.substr(last_sep+1);
922 string qpathname = library_path + src_tbl + ".gsql";
923 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
924 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
926 fprintf(stderr,"After exit\n");
928 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
929 // Parse the FTA query
930 fta_parse_result = new fta_parse_t();
931 FtaParser_setfileinput(fta_in);
932 if(FtaParserparse()){
933 fprintf(stderr,"FTA parse failed.\n");
936 if(fta_parse_result->parse_type != QUERY_PARSE){
937 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
941 map<string, int> local_query_map;
942 vector<string> local_query_names;
943 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
944 for(p=0;p<qlist.size();++p){
945 table_exp_t *fta_parse_tree = qlist[p];
946 fta_parse_tree->set_visible(false); // assumed to not produce output
947 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
948 if(imputed_qname == target_qname)
949 imputed_qname = src_tbl;
950 if(local_query_map.count(imputed_qname)>0){
951 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
954 local_query_map[ imputed_qname ] = p;
955 local_query_names.push_back(imputed_qname);
958 if(local_query_map.count(src_tbl)==0){
959 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
963 vector<int> worklist;
964 set<int> added_queries;
965 vector<query_node *> new_qnodes;
966 worklist.push_back(local_query_map[target_qname]);
967 added_queries.insert(local_query_map[target_qname]);
969 int qpos = qnodes.size();
970 for(qq=0;qq<worklist.size();++qq){
971 int q_id = worklist[qq];
972 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
973 new_qnodes.push_back( new_qnode);
974 vector<string> refd_tbls = new_qnode->refd_tbls;
976 for(ff = 0;ff<refd_tbls.size();++ff){
977 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
979 if(name_node_map.count(refd_tbls[ff])>0){
980 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
983 worklist.push_back(local_query_map[refd_tbls[ff]]);
989 for(qq=0;qq<new_qnodes.size();++qq){
990 int qpos = qnodes.size();
991 qnodes.push_back(new_qnodes[qq]);
992 name_node_map[qnodes[qpos]->name ] = qpos;
993 qname_to_flname[qnodes[qpos]->name ] = qpathname;
1007 //---------------------------------------
1012 string udop_missing_sources;
1013 for(i=0;i<qnodes.size();++i){
1015 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
1016 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
1018 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
1019 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
1020 int pos = qnodes.size();
1021 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
1022 name_node_map[ qnodes[pos]->name ] = pos;
1023 qnodes[pos]->is_externally_visible = false; // its visible
1024 // Need to mark the source queries as visible.
1026 string missing_sources = "";
1027 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
1028 string src_tbl = qnodes[pos]->refd_tbls[si];
1029 if(name_node_map.count(src_tbl)==0){
1030 missing_sources += src_tbl + " ";
1033 if(missing_sources != ""){
1034 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
1041 if(udop_missing_sources != ""){
1042 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
1048 ////////////////////////////////////////////////////////////////////
1049 /// Check parse trees to verify that some
1050 /// global properties are met :
1051 /// if q1 reads from q2, then
1052 /// q2 is processed before q1
1053 /// q1 can supply q2's parameters
1054 /// Verify there is no cycle in the reads-from graph.
1056 // Compute an order in which to process the
1059 // Start by building the reads-from lists.
1062 for(i=0;i<qnodes.size();++i){
1064 vector<string> refd_tbls = qnodes[i]->refd_tbls;
1065 for(fi = 0;fi<refd_tbls.size();++fi){
1066 if(name_node_map.count(refd_tbls[fi])>0){
1067 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
1068 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
1074 // If one query reads the result of another,
1075 // check for parameter compatibility. Currently it must
1076 // be an exact match. I will move to requiring
1077 // containment after re-ordering, but will require
1078 // some analysis for code generation which is not
1080 //printf("There are %d query nodes.\n",qnodes.size());
1083 for(i=0;i<qnodes.size();++i){
1084 vector<var_pair_t *> target_params = qnodes[i]->params;
1085 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1086 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
1087 if(target_params.size() != source_params.size()){
1088 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1092 for(p=0;p<target_params.size();++p){
1093 if(! (target_params[p]->name == source_params[p]->name &&
1094 target_params[p]->val == source_params[p]->val ) ){
1095 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1104 // Start by counting inedges.
1105 for(i=0;i<qnodes.size();++i){
1106 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1107 qnodes[(*si)]->n_consumers++;
1111 // The roots are the nodes with indegree zero.
1113 for(i=0;i<qnodes.size();++i){
1114 if(qnodes[i]->n_consumers == 0){
1115 if(qnodes[i]->is_externally_visible == false){
1116 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1122 // Remove the parts of the subtree that produce no output.
1123 set<int> valid_roots;
1124 set<int> discarded_nodes;
1125 set<int> candidates;
1126 while(roots.size() >0){
1127 for(si=roots.begin();si!=roots.end();++si){
1128 if(qnodes[(*si)]->is_externally_visible){
1129 valid_roots.insert((*si));
1131 discarded_nodes.insert((*si));
1132 set<int>::iterator sir;
1133 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1134 qnodes[(*sir)]->n_consumers--;
1135 if(qnodes[(*sir)]->n_consumers == 0)
1136 candidates.insert( (*sir));
1143 roots = valid_roots;
1144 if(discarded_nodes.size()>0){
1145 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1147 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1148 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1150 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1152 fprintf(stderr,"\n");
1155 // Compute the sources_to set, ignoring discarded nodes.
1156 for(i=0;i<qnodes.size();++i){
1157 if(discarded_nodes.count(i)==0)
1158 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1159 qnodes[(*si)]->sources_to.insert(i);
1164 // Find the nodes that are shared by multiple visible subtrees.
1165 // The roots become inferred visible nodes.
1167 // Find the visible nodes.
1168 vector<int> visible_nodes;
1169 for(i=0;i<qnodes.size();i++){
1170 if(qnodes[i]->is_externally_visible){
1171 visible_nodes.push_back(i);
1175 // Find UDOPs referenced by visible nodes.
1177 for(i=0;i<visible_nodes.size();++i){
1178 workq.push_back(visible_nodes[i]);
1180 while(!workq.empty()){
1181 int node = workq.front();
1183 set<int>::iterator children;
1184 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1185 qnodes[node]->is_externally_visible = true;
1186 visible_nodes.push_back(node);
1187 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1188 if(qnodes[(*children)]->is_externally_visible == false){
1189 qnodes[(*children)]->is_externally_visible = true;
1190 visible_nodes.push_back((*children));
1194 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1195 workq.push_back((*children));
1202 for(i=0;i<qnodes.size();i++){
1203 qnodes[i]->subtree_roots.clear();
1206 // Walk the tree defined by a visible node, not descending into
1207 // subtrees rooted by a visible node. Mark the node visited with
1208 // the visible node ID.
1209 for(i=0;i<visible_nodes.size();++i){
1211 vroots.insert(visible_nodes[i]);
1212 while(vroots.size()>0){
1213 for(si=vroots.begin();si!=vroots.end();++si){
1214 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1216 set<int>::iterator sir;
1217 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1218 if(! qnodes[(*sir)]->is_externally_visible){
1219 candidates.insert( (*sir));
1223 vroots = candidates;
1227 // Find the nodes in multiple visible node subtrees, but with no parent
1228 // that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1229 done = true; // until proven otherwise
1230 for(i=0;i<qnodes.size();i++){
1231 if(qnodes[i]->subtree_roots.size()>1){
1232 bool is_new_root = true;
1233 set<int>::iterator sir;
1234 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1235 if(qnodes[(*sir)]->subtree_roots.size()>1)
1236 is_new_root = false;
1239 qnodes[i]->is_externally_visible = true;
1240 qnodes[i]->inferred_visible_node = true;
1241 visible_nodes.push_back(i);
1252 // get visible nodes in topo ordering.
1253 // for(i=0;i<qnodes.size();i++){
1254 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1256 vector<int> process_order;
1257 while(roots.size() >0){
1258 for(si=roots.begin();si!=roots.end();++si){
1259 if(discarded_nodes.count((*si))==0){
1260 process_order.push_back( (*si) );
1262 set<int>::iterator sir;
1263 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1264 qnodes[(*sir)]->n_consumers--;
1265 if(qnodes[(*sir)]->n_consumers == 0)
1266 candidates.insert( (*sir));
1274 //printf("process_order.size() =%d\n",process_order.size());
1276 // Search for cyclic dependencies
1278 for(i=0;i<qnodes.size();++i){
1279 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1280 if(found_dep.size() != 0) found_dep += ", ";
1281 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1284 if(found_dep.size()>0){
1285 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1289 // Get a list of query sets, in the order to be processed.
1290 // Start at visible root and do bfs.
1291 // The query set includes queries referenced indirectly,
1292 // as sources for user-defined operators. These are needed
1293 // to ensure that they are added to the schema, but are not part
1294 // of the query tree.
1296 // stream_node_sets contains queries reachable only through the
1297 // FROM clause, so I can tell which queries to add to the stream
1298 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1300 // NOTE: this code works because in order for data to be
1301 // read by multiple hftas, the node must be externally visible.
1302 // But visible nodes define roots of process sets.
1303 // internally visible nodes can feed data only
1304 // to other nodes in the same query file.
1305 // Therefore, any access can be restricted to a file,
1306 // hfta output sharing is done only on roots
1307 // never on interior nodes.
1312 // Compute the base collection of hftas.
1313 vector<hfta_node *> hfta_sets;
1314 map<string, int> hfta_name_map;
1315 // vector< vector<int> > process_sets;
1316 // vector< set<int> > stream_node_sets;
1317 reverse(process_order.begin(), process_order.end()); // get listing in reverse .
1318 // order: process leaves 1st.
1319 for(i=0;i<process_order.size();++i){
1320 if(qnodes[process_order[i]]->is_externally_visible == true){
1321 //printf("Visible.\n");
1322 int root = process_order[i];
1323 hfta_node *hnode = new hfta_node();
1324 hnode->name = qnodes[root]-> name;
1325 hnode->source_name = qnodes[root]-> name;
1326 hnode->is_udop = qnodes[root]->is_udop;
1327 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1329 vector<int> proc_list; proc_list.push_back(root);
1330 // Ensure that nodes are added only once.
1331 set<int> proc_set; proc_set.insert(root);
1332 roots.clear(); roots.insert(root);
1334 while(roots.size()>0){
1335 for(si=roots.begin();si!=roots.end();++si){
1336 //printf("Processing root %d\n",(*si));
1337 set<int>::iterator sir;
1338 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1339 //printf("reads fom %d\n",(*sir));
1340 if(qnodes[(*sir)]->is_externally_visible==false){
1341 candidates.insert( (*sir) );
1342 if(proc_set.count( (*sir) )==0){
1343 proc_set.insert( (*sir) );
1344 proc_list.push_back( (*sir) );
1353 reverse(proc_list.begin(), proc_list.end());
1354 hnode->query_node_indices = proc_list;
1355 hfta_name_map[hnode->name] = hfta_sets.size();
1356 hfta_sets.push_back(hnode);
1360 // Compute the reads_from / sources_to graphs for the hftas.
1362 for(i=0;i<hfta_sets.size();++i){
1363 hfta_node *hnode = hfta_sets[i];
1364 for(q=0;q<hnode->query_node_indices.size();q++){
1365 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1366 for(s=0;s<qnode->refd_tbls.size();++s){
1367 if(hfta_name_map.count(qnode->refd_tbls[s])){
1368 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1369 hnode->reads_from.insert(other_hfta);
1370 hfta_sets[other_hfta]->sources_to.insert(i);
1376 // Compute a topological sort of the hfta_sets.
1378 vector<int> hfta_topsort;
1380 int hnode_srcs[hfta_sets.size()];
1381 for(i=0;i<hfta_sets.size();++i){
1383 if(hfta_sets[i]->sources_to.size() == 0)
1387 while(! workq.empty()){
1388 int node = workq.front();
1390 hfta_topsort.push_back(node);
1391 set<int>::iterator stsi;
1392 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1393 int parent = (*stsi);
1394 hnode_srcs[parent]++;
1395 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1396 workq.push_back(parent);
1401 // Decorate hfta nodes with the level of parallelism given as input.
1403 map<string, int>::iterator msii;
1404 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1405 string hfta_name = (*msii).first;
1406 int par = (*msii).second;
1407 if(hfta_name_map.count(hfta_name) > 0){
1408 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1410 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1414 // Propagate levels of parallelism: children should have a level of parallelism
1415 // as large as any of its parents. Adjust children upwards to compensate.
1416 // Start at parents and adjust children, auto-propagation will occur.
1418 for(i=hfta_sets.size()-1;i>=0;i--){
1419 set<int>::iterator stsi;
1420 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1421 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1422 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1427 // Before all the name mangling, check if there are any output_spec.cfg
1428 // or hfta_parallelism.cfg entries that do not have a matching query.
1430 string dangling_ospecs = "";
1431 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1432 string oq = (*msii).first;
1433 if(hfta_name_map.count(oq) == 0){
1434 dangling_ospecs += " "+(*msii).first;
1437 if(dangling_ospecs!=""){
1438 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1442 string dangling_par = "";
1443 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1444 string oq = (*msii).first;
1445 if(hfta_name_map.count(oq) == 0){
1446 dangling_par += " "+(*msii).first;
1449 if(dangling_par!=""){
1450 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1455 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1456 // FROM clauses: retarget any name which is an internal node, and
1457 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1458 // when the source hfta has more parallelism than the target node.
1459 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1462 int n_original_hfta_sets = hfta_sets.size();
1463 for(i=0;i<n_original_hfta_sets;++i){
1464 if(hfta_sets[i]->n_parallel > 1){
1465 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1466 set<string> local_nodes; // names of query nodes in the hfta.
1467 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1468 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1471 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1472 string mangler = "__copy"+int_to_string(p);
1473 hfta_node *par_hfta = new hfta_node();
1474 par_hfta->name = hfta_sets[i]->name + mangler;
1475 par_hfta->source_name = hfta_sets[i]->name;
1476 par_hfta->is_udop = hfta_sets[i]->is_udop;
1477 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1478 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1479 par_hfta->parallel_idx = p;
1481 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1484 if(hfta_sets[i]->is_udop){
1485 int root = hfta_sets[i]->query_node_indices[0];
1487 string unequal_par_sources;
1488 set<int>::iterator rfsii;
1489 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1490 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1491 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1494 if(unequal_par_sources != ""){
1495 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1500 vector<string> new_sources;
1501 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1502 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1505 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1506 new_qn->name += mangler;
1507 new_qn->mangler = mangler;
1508 new_qn->refd_tbls = new_sources;
1509 par_hfta->query_node_indices.push_back(qnodes.size());
1510 par_qnode_map[new_qn->name] = qnodes.size();
1511 name_node_map[ new_qn->name ] = qnodes.size();
1512 qnodes.push_back(new_qn);
1514 // regular query node
1515 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1516 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1517 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1518 // rehome the from clause on mangled names.
1519 // create merge nodes as needed for external sources.
1520 for(f=0;f<dup_pt->fm->tlist.size();++f){
1521 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1522 dup_pt->fm->tlist[f]->schema_name += mangler;
1523 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1524 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1525 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1526 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1527 dup_pt->fm->tlist[f]->schema_name += mangler;
1529 vector<string> src_tbls;
1530 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1532 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1535 for(s=0;s<stride;++s){
1536 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1537 src_tbls.push_back(ext_src_name);
1539 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1540 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1541 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1542 // Make a qnode to represent the new merge node
1543 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1544 qn_pt->refd_tbls = src_tbls;
1545 qn_pt->is_udop = false;
1546 qn_pt->is_externally_visible = false;
1547 qn_pt->inferred_visible_node = false;
1548 par_hfta->query_node_indices.push_back(qnodes.size());
1549 par_qnode_map[merge_node_name] = qnodes.size();
1550 name_node_map[ merge_node_name ] = qnodes.size();
1551 qnodes.push_back(qn_pt);
1555 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1556 for(f=0;f<dup_pt->fm->tlist.size();++f){
1557 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1559 new_qn->params = qnodes[hqn_idx]->params;
1560 new_qn->is_udop = false;
1561 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1562 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1563 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1564 par_qnode_map[new_qn->name] = qnodes.size();
1565 name_node_map[ new_qn->name ] = qnodes.size();
1566 qnodes.push_back(new_qn);
1569 hfta_name_map[par_hfta->name] = hfta_sets.size();
1570 hfta_sets.push_back(par_hfta);
1573 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1575 if(!hfta_sets[i]->is_udop){
1576 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1577 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1578 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1579 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1580 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1581 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1582 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1583 vector<string> src_tbls;
1584 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1585 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1586 src_tbls.push_back(ext_src_name);
1588 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1589 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1590 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1591 // Make a qnode to represent the new merge node
1592 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1593 qn_pt->refd_tbls = src_tbls;
1594 qn_pt->is_udop = false;
1595 qn_pt->is_externally_visible = false;
1596 qn_pt->inferred_visible_node = false;
1597 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1598 name_node_map[ merge_node_name ] = qnodes.size();
1599 qnodes.push_back(qn_pt);
1608 // Rebuild the reads_from / sources_to lists in the qnodes
1609 for(q=0;q<qnodes.size();++q){
1610 qnodes[q]->reads_from.clear();
1611 qnodes[q]->sources_to.clear();
1613 for(q=0;q<qnodes.size();++q){
1614 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1615 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1616 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1617 qnodes[q]->reads_from.insert(rf);
1618 qnodes[rf]->sources_to.insert(q);
1623 // Rebuild the reads_from / sources_to lists in hfta_sets
1624 for(q=0;q<hfta_sets.size();++q){
1625 hfta_sets[q]->reads_from.clear();
1626 hfta_sets[q]->sources_to.clear();
1628 for(q=0;q<hfta_sets.size();++q){
1629 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1630 int node = hfta_sets[q]->query_node_indices[s];
1631 set<int>::iterator rfsii;
1632 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1633 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1634 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1635 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1642 for(q=0;q<qnodes.size();++q){
1643 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1644 set<int>::iterator rsii;
1645 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1646 printf(" %d",(*rsii));
1647 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1648 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1649 printf(" %d",(*rsii));
1653 for(q=0;q<hfta_sets.size();++q){
1654 if(hfta_sets[q]->do_generation==false)
1656 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1657 set<int>::iterator rsii;
1658 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1659 printf(" %d",(*rsii));
1660 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1661 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1662 printf(" %d",(*rsii));
1669 // Re-topo sort the hftas
1670 hfta_topsort.clear();
1672 int hnode_srcs_2[hfta_sets.size()];
1673 for(i=0;i<hfta_sets.size();++i){
1674 hnode_srcs_2[i] = 0;
1675 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1680 while(workq.empty() == false){
1681 int node = workq.front();
1683 hfta_topsort.push_back(node);
1684 set<int>::iterator stsii;
1685 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1686 int child = (*stsii);
1687 hnode_srcs_2[child]++;
1688 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1689 workq.push_back(child);
1694 // Ensure that all of the query_node_indices in hfta_sets are topologically
1695 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1696 for(i=0;i<hfta_sets.size();++i){
1697 if(hfta_sets[i]->do_generation){
1698 map<int,int> n_accounted;
1699 vector<int> new_order;
1701 vector<int>::iterator vii;
1702 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1703 n_accounted[(*vii)]= 0;
1705 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1706 set<int>::iterator rfsii;
1707 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1708 if(n_accounted.count((*rfsii)) == 0){
1709 n_accounted[(*vii)]++;
1712 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1713 workq.push_back((*vii));
1717 while(workq.empty() == false){
1718 int node = workq.front();
1720 new_order.push_back(node);
1721 set<int>::iterator stsii;
1722 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1723 if(n_accounted.count((*stsii))){
1724 n_accounted[(*stsii)]++;
1725 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1726 workq.push_back((*stsii));
1731 hfta_sets[i]->query_node_indices = new_order;
1739 /// Global checking is done, start the analysis and translation
1740 /// of the query parse tree in the order specified by process_order
1743 // Get a list of the LFTAs for global lfta optimization
1744 // TODO: separate building operators from spliting lftas,
1745 // that will make optimizations such as predicate pushing easier.
1746 vector<stream_query *> lfta_list;
1747 stream_query *rootq;
1750 map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1752 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1754 int hfta_id = hfta_topsort[qi];
1755 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1759 // Two possibilities, either its a UDOP, or its a collection of queries.
1760 // if(qnodes[curr_list.back()]->is_udop)
1761 if(hfta_sets[hfta_id]->is_udop){
1762 int node_id = curr_list.back();
1763 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1764 opview_entry *opv = new opview_entry();
1766 // Many of the UDOP properties aren't currently used.
1767 opv->parent_qname = "no_parent";
1768 opv->root_name = qnodes[node_id]->name;
1769 opv->view_name = qnodes[node_id]->file;
1771 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1772 opv->udop_alias = tmpstr;
1773 opv->mangler = qnodes[node_id]->mangler;
1775 if(opv->mangler != ""){
1776 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1777 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1780 // This piece of code makes each hfta which refers to the same udop
1781 // reference a distinct running udop. Do this at query optimization time?
1782 // fmtbl->set_udop_alias(opv->udop_alias);
1784 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1785 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1787 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1789 for(s=0;s<subq.size();++s){
1790 // Validate that the fields match.
1791 subquery_spec *sqs = subq[s];
1792 string subq_name = sqs->name + opv->mangler;
1793 vector<field_entry *> flds = Schema->get_fields(subq_name);
1794 if(flds.size() == 0){
1795 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1798 if(flds.size() < sqs->types.size()){
1799 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1802 bool failed = false;
1803 for(f=0;f<sqs->types.size();++f){
1804 data_type dte(sqs->types[f],sqs->modifiers[f]);
1805 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1806 if(! dte.subsumes_type(&dtf) ){
1807 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1811 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1812 string pstr = dte.get_temporal_string();
1813 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1820 /// Validation done, find the subquery, make a copy of the
1821 /// parse tree, and add it to the return list.
1822 for(q=0;q<qnodes.size();++q)
1823 if(qnodes[q]->name == subq_name)
1825 if(q==qnodes.size()){
1826 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1832 // Cross-link to from entry(s) in all sourced-to tables.
1833 set<int>::iterator sii;
1834 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1835 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1836 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1838 for(ii=0;ii<tblvars.size();++ii){
1839 if(tblvars[ii]->schema_name == opv->root_name){
1840 tblvars[ii]->set_opview_idx(opviews.size());
1846 opviews.append(opv);
1849 // Analyze the parse trees in this query,
1850 // put them in rootq
1851 // vector<int> curr_list = process_sets[qi];
1854 ////////////////////////////////////////
1857 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1858 for(qj=0;qj<curr_list.size();++qj){
1860 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1862 // Select the current query parse tree
1863 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1865 // if hfta only, try to fetch any missing schemas
1866 // from the registry (using the print_schema program).
1867 // Here I use a hack to avoid analyzing the query -- all referenced
1868 // tables must be in the from clause
1869 // If there is a problem loading any table, just issue a warning,
1871 tablevar_list_t *fm = fta_parse_tree->get_from();
1872 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1873 // iterate over all referenced tables
1875 for(t=0;t<refd_tbls.size();++t){
1876 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1878 if(tbl_ref < 0){ // if this table is not in the Schema
1881 string cmd="print_schema "+refd_tbls[t];
1882 FILE *schema_in = popen(cmd.c_str(), "r");
1883 if(schema_in == NULL){
1884 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1886 string schema_instr;
1887 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1888 schema_instr += tmpstr;
1890 fta_parse_result = new fta_parse_t();
1891 strcpy(tmp_schema_str,schema_instr.c_str());
1892 FtaParser_setstringinput(tmp_schema_str);
1893 if(FtaParserparse()){
1894 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1896 if( fta_parse_result->tables != NULL){
1898 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1899 Schema->add_table(fta_parse_result->tables->get_table(tl));
1902 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1907 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1915 // Analyze the query.
1916 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1918 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1922 stream_query new_sq(qs, Schema);
1923 if(new_sq.error_code){
1924 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1928 // Add it to the Schema
1929 table_def *output_td = new_sq.get_output_tabledef();
1930 Schema->add_table(output_td);
1932 // Create a query plan from the analyzed parse tree.
1933 // If it's a query referenced via FROM, add it to the stream query.
1935 rootq->add_query(new_sq);
1937 rootq = new stream_query(new_sq);
1938 // have the stream query object inherit properties form the analyzed
1939 // hfta_node object.
1940 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1941 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1947 // This stream query has all its parts
1948 // Build and optimize it.
1949 //printf("translate_fta: generating plan.\n");
1950 if(rootq->generate_plan(Schema)){
1951 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1955 // If we've found the query plan head, so now add the output operators
1956 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1957 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1958 multimap<string, int>::iterator mmsi;
1959 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1960 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1961 rootq->add_output_operator(output_specs[(*mmsi).second]);
1967 // Perform query splitting if necessary.
1969 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1972 //for(l=0;l<split_queries.size();++l){
1973 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1979 if(split_queries.size() > 0){ // should be at least one component.
1981 // Compute the number of LFTAs.
1982 int n_lfta = split_queries.size();
1983 if(hfta_returned) n_lfta--;
1984 // Check if a schemaId constraint needs to be inserted.
1986 // Process the LFTA components.
1987 for(l=0;l<n_lfta;++l){
1988 if(lfta_names.count(split_queries[l]->query_name) == 0){
1989 // Grab the lfta for global optimization.
1990 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1991 string liface = "_local_";
1992 // string lmach = "";
1993 string lmach = hostname;
1995 liface = tvec[0]->get_interface(); // iface queries have been resolved
1996 if(tvec[0]->get_machine() != ""){
1997 lmach = tvec[0]->get_machine();
1999 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
2002 interface_names.push_back(liface);
2003 machine_names.push_back(lmach);
2006 vector<predicate_t *> schemaid_preds;
2007 for(int irv=0;irv<tvec.size();++irv){
2009 string schema_name = tvec[irv]->get_schema_name();
2010 string rvar_name = tvec[irv]->get_var_name();
2011 int schema_ref = tvec[irv]->get_schema_ref();
2014 // interface_names.push_back(liface);
2015 // machine_names.push_back(lmach);
2017 //printf("Machine is %s\n",lmach.c_str());
2019 // Check if a schemaId constraint needs to be inserted.
2020 if(schema_ref<0){ // can result from some kinds of splits
2021 schema_ref = Schema->get_table_ref(schema_name);
2023 int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
2026 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
2028 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
2033 if(tvec[irv]->get_interface() != "_local_"){
2034 if(iface->has_multiple_schemas()){
2035 if(schema_id<0){ // invalid schema_id
2036 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
2039 vector<string> iface_schemas = iface->get_property("Schemas");
2040 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
2041 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
2044 // Ensure that in liface, schema_id is used for only one schema
2045 if(schema_of_schemaid.count(liface)==0){
2046 map<int, string> empty_map;
2047 schema_of_schemaid[liface] = empty_map;
2049 if(schema_of_schemaid[liface].count(schema_id)==0){
2050 schema_of_schemaid[liface][schema_id] = schema_name;
2052 if(schema_of_schemaid[liface][schema_id] != schema_name){
2053 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
2057 }else{ // single-schema interface
2058 schema_id = -1; // don't generate schema_id predicate
2059 vector<string> iface_schemas = iface->get_property("Schemas");
2060 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
2061 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
2064 if(iface_schemas.size()>1){
2065 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
2073 // If we need to check the schema_id, insert a predicate into the lfta.
2074 // TODO not just schema_id, the full all_schema_ids set.
2076 colref_t *schid_cr = new colref_t("schemaId");
2077 schid_cr->schema_ref = schema_ref;
2078 schid_cr->table_name = rvar_name;
2079 schid_cr->tablevar_ref = 0;
2080 schid_cr->default_table = false;
2081 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
2082 data_type *schid_dt = new data_type("uint");
2083 schid_se->dt = schid_dt;
2085 string schid_str = int_to_string(schema_id);
2086 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2087 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2088 lit_se->dt = schid_dt;
2090 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2091 vector<cnf_elem *> clist;
2092 make_cnf_from_pr(schid_pr, clist);
2093 analyze_cnf(clist[0]);
2094 clist[0]->cost = 1; // cheap one comparison
2095 // cnf built, now insert it.
2096 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2098 // Specialized processing
2099 // filter join, get two schemaid preds
2100 string node_type = split_queries[l]->query_plan[0]->node_type();
2101 if(node_type == "filter_join"){
2102 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2104 fj->pred_t0.push_back(clist[0]);
2106 fj->pred_t1.push_back(clist[0]);
2108 schemaid_preds.push_back(schid_pr);
2110 // watchlist join, get the first schemaid pred
2111 if(node_type == "watch_join"){
2112 watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2114 fj->pred_t0.push_back(clist[0]);
2115 schemaid_preds.push_back(schid_pr);
2120 // Specialized processing, currently filter join.
2121 if(schemaid_preds.size()>1){
2122 string node_type = split_queries[l]->query_plan[0]->node_type();
2123 if(node_type == "filter_join"){
2124 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2125 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2126 vector<cnf_elem *> clist;
2127 make_cnf_from_pr(filter_pr, clist);
2128 analyze_cnf(clist[0]);
2129 clist[0]->cost = 1; // cheap one comparison
2130 fj->shared_pred.push_back(clist[0]);
2140 // Set the ht size from the recommendation, if there is one in the rec file
2141 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2142 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2146 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2147 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
2148 lfta_list.push_back(split_queries[l]);
2149 lfta_mach_lists[lmach].push_back(split_queries[l]);
2151 // The following is a hack,
2152 // as I should be generating LFTA code through
2153 // the stream_query object.
2155 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2157 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2160 // Create query description to embed in lfta.c
2161 string lfta_schema_str = split_queries[l]->make_schema();
2162 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2164 // get NIC capabilities.
2166 nic_property *nicprop = NULL;
2167 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2168 if(iface_codegen_type.size()){
2169 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2171 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2176 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2179 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
2180 snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
2182 // STOPPED HERE need to figure out how to generate the code that Vlad needs
2183 // from snap_position
2185 // TODO NOTE : I'd like it to be the case that registration_query_names
2186 // are the queries to be registered for subscription.
2187 // but there is some complex bookkeeping here.
2188 registration_query_names.push_back(split_queries[l]->query_name);
2189 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2190 // NOTE: I will assume a 1-1 correspondence between
2191 // mach_query_names[lmach] and lfta_mach_lists[lmach]
2192 // where mach_query_names[lmach][i] contains the index into
2193 // query_names, which names the lfta, and
2194 // mach_query_names[lmach][i] is the stream_query * of the
2195 // corresponding lfta.
2196 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2200 // check if lfta is reusable
2201 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2203 bool lfta_reusable = false;
2204 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2205 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2206 lfta_reusable = true;
2208 lfta_reuse_options.push_back(lfta_reusable);
2210 // LFTA will inherit the liveness timeout specification from the containing query
2211 // it is too conservative as lfta are expected to spend less time per tuple
2214 // extract liveness timeout from query definition
2215 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2216 if (!liveness_timeout) {
2217 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2218 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2219 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2221 lfta_liveness_timeouts.push_back(liveness_timeout);
2223 // Add it to the schema
2224 table_def *td = split_queries[l]->get_output_tabledef();
2225 Schema->append_table(td);
2226 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2231 // If the output is lfta-only, dump out the query name.
2232 if(split_queries.size() == 1 && !hfta_returned){
2233 if(output_query_names ){
2234 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2238 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2243 // output schema summary
2244 if(output_schema_summary){
2245 for(int o=0;o<split_queries.size(); ++o){
2246 dump_summary(split_queries[o]);
2251 if(hfta_returned){ // query also has an HFTA component
2252 int hfta_nbr = split_queries.size()-1;
2254 hfta_list.push_back(split_queries[hfta_nbr]);
2256 // report on generated query names
2257 if(output_query_names){
2258 string hfta_name =split_queries[hfta_nbr]->query_name;
2259 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2260 for(l=0;l<hfta_nbr;++l){
2261 string lfta_name =split_queries[l]->query_name;
2262 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2266 // fprintf(stderr,"query names are ");
2267 // for(l=0;l<hfta_nbr;++l){
2268 // if(l>0) fprintf(stderr,",");
2269 // string fta_name =split_queries[l]->query_name;
2270 // fprintf(stderr," %s",fta_name.c_str());
2272 // fprintf(stderr,"\n");
2277 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2278 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2285 //-----------------------------------------------------------------
2286 // Compute and propagate the SE in PROTOCOL fields compute a field.
2287 //-----------------------------------------------------------------
2289 for(i=0;i<lfta_list.size();i++){
2290 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2291 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2293 for(i=0;i<hfta_list.size();i++){
2294 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2295 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2300 //------------------------------------------------------------------------
2301 // Perform individual FTA optimizations
2302 //-----------------------------------------------------------------------
2304 if (partitioned_mode) {
2306 // open partition definition file
2307 string part_fname = config_dir_path + "partition.txt";
2309 FILE* partfd = fopen(part_fname.c_str(), "r");
2311 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2314 PartnParser_setfileinput(partfd);
2315 if (PartnParserparse()) {
2316 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2323 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2325 int num_hfta = hfta_list.size();
2326 for(i=0; i < hfta_list.size(); ++i){
2327 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2330 // Add all new hftas to schema
2331 for(i=num_hfta; i < hfta_list.size(); ++i){
2332 table_def *td = hfta_list[i]->get_output_tabledef();
2333 Schema->append_table(td);
2336 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2340 //------------------------------------------------------------------------
2341 // Do global (cross-fta) optimization
2342 //-----------------------------------------------------------------------
2349 set<string> extra_external_libs;
2351 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2354 // build hfta file name, create output
2355 if(numeric_hfta_flname){
2356 sprintf(tmpstr,"hfta_%d",hfta_count);
2357 hfta_names.push_back(tmpstr);
2358 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2360 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2361 hfta_names.push_back(tmpstr);
2362 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2364 FILE *hfta_fl = fopen(tmpstr,"w");
2365 if(hfta_fl == NULL){
2366 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2369 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2371 // If there is a field verifier, warn about
2372 // lack of compatibility
2373 // NOTE : this code assumes that visible non-lfta queries
2374 // are those at the root of a stream query.
2375 string hfta_comment;
2377 string hfta_namespace;
2378 if(hfta_list[i]->defines.count("comment")>0)
2379 hfta_comment = hfta_list[i]->defines["comment"];
2380 if(hfta_list[i]->defines.count("Comment")>0)
2381 hfta_comment = hfta_list[i]->defines["Comment"];
2382 if(hfta_list[i]->defines.count("COMMENT")>0)
2383 hfta_comment = hfta_list[i]->defines["COMMENT"];
2384 if(hfta_list[i]->defines.count("title")>0)
2385 hfta_title = hfta_list[i]->defines["title"];
2386 if(hfta_list[i]->defines.count("Title")>0)
2387 hfta_title = hfta_list[i]->defines["Title"];
2388 if(hfta_list[i]->defines.count("TITLE")>0)
2389 hfta_title = hfta_list[i]->defines["TITLE"];
2390 if(hfta_list[i]->defines.count("namespace")>0)
2391 hfta_namespace = hfta_list[i]->defines["namespace"];
2392 if(hfta_list[i]->defines.count("Namespace")>0)
2393 hfta_namespace = hfta_list[i]->defines["Namespace"];
2394 if(hfta_list[i]->defines.count("NAMESPACE")>0)
2395 hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2397 if(field_verifier != NULL){
2399 if(hfta_comment == "")
2400 warning_str += "\tcomment not found.\n";
2402 // Obsolete stuff that Carsten wanted
2403 // if(hfta_title == "")
2404 // warning_str += "\ttitle not found.\n";
2405 // if(hfta_namespace == "")
2406 // warning_str += "\tnamespace not found.\n";
2408 // There is a get_tbl_keys method implemented for qp_nodes,
2409 // integrate it into stream_query, then call it to find keys,
2410 // and annotate fields with their key-ness.
2411 // If there is a "keys" property in the defines block, override anything returned
2412 // from the automated analysis
2414 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2416 for(fi=0;fi<flds.size();fi++){
2417 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2419 if(warning_str != "")
2420 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2421 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2424 // Get the fields in this query
2425 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2427 // do key processing
2428 string hfta_keys_s = "";
2429 if(hfta_list[i]->defines.count("keys")>0)
2430 hfta_keys_s = hfta_list[i]->defines["keys"];
2431 if(hfta_list[i]->defines.count("Keys")>0)
2432 hfta_keys_s = hfta_list[i]->defines["Keys"];
2433 if(hfta_list[i]->defines.count("KEYS")>0)
2434 hfta_keys_s = hfta_list[i]->defines["KEYS"];
2435 string xtra_keys_s = "";
2436 if(hfta_list[i]->defines.count("extra_keys")>0)
2437 xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2438 if(hfta_list[i]->defines.count("Extra_Keys")>0)
2439 xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2440 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2441 xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2443 vector<string> hfta_keys;
2444 vector<string> partial_keys;
2445 vector<string> xtra_keys;
2446 if(hfta_keys_s==""){
2447 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2448 if(xtra_keys_s.size()>0){
2449 xtra_keys = split_string(xtra_keys_s, ',');
2451 for(int xi=0;xi<xtra_keys.size();++xi){
2452 if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2453 hfta_keys.push_back(xtra_keys[xi]);
2457 hfta_keys = split_string(hfta_keys_s, ',');
2459 // validate that all of the keys exist in the output.
2460 // (exit on error, as it's a bad specification)
2461 vector<string> missing_keys;
2462 for(int ki=0;ki<hfta_keys.size(); ++ki){
2464 for(fi=0;fi<flds.size();++fi){
2465 if(hfta_keys[ki] == flds[fi]->get_name())
2469 missing_keys.push_back(hfta_keys[ki]);
2471 if(missing_keys.size()>0){
2472 fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2473 for(int hi=0; hi<missing_keys.size(); ++hi){
2474 fprintf(stderr," %s", missing_keys[hi].c_str());
2476 fprintf(stderr,"\n");
2480 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2481 if(hfta_comment != "")
2482 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2483 if(hfta_title != "")
2484 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2485 if(hfta_namespace != "")
2486 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2487 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2488 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2490 // write info about fields to qtree.xml
2492 for(fi=0;fi<flds.size();fi++){
2493 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2494 if(flds[fi]->get_modifier_list()->size()){
2495 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2497 fprintf(qtree_output," />\n");
2500 for(int hi=0;hi<hfta_keys.size();++hi){
2501 fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2503 for(int hi=0;hi<partial_keys.size();++hi){
2504 fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2506 for(int hi=0;hi<xtra_keys.size();++hi){
2507 fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2511 // extract liveness timeout from query definition
2512 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2513 if (!liveness_timeout) {
2514 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2515 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2516 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2518 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2520 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2522 for(itv=0;itv<tmp_tv.size();++itv){
2523 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2525 string ifrs = hfta_list[i]->collect_refd_ifaces();
2527 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2529 fprintf(qtree_output,"\t</HFTA>\n");
2533 // debug only -- do code generation to catch generation-time errors.
2534 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2537 hfta_count++; // for hfta file names with numeric suffixes
2539 hfta_list[i]->get_external_libs(extra_external_libs);
2543 string ext_lib_string;
2544 set<string>::iterator ssi_el;
2545 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2546 ext_lib_string += (*ssi_el)+" ";
2550 // Report on the set of operator views
2551 for(i=0;i<opviews.size();++i){
2552 opview_entry *opve = opviews.get_entry(i);
2553 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2554 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2555 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2556 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2557 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2559 if (!opve->liveness_timeout) {
2560 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2561 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2562 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2564 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2566 for(j=0;j<opve->subq_names.size();j++)
2567 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2568 fprintf(qtree_output,"\t</UDOP>\n");
2572 //-----------------------------------------------------------------
2574 // Create interface-specific meta code files.
2575 // first, open and parse the interface resources file.
2576 ifaces_db = new ifq_t();
2578 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2579 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2580 ifx_fname.c_str(), ierr.c_str());
2584 map<string, vector<stream_query *> >::iterator svsi;
2585 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2586 string lmach = (*svsi).first;
2588 // For this machine, create a set of lftas per interface.
2589 vector<stream_query *> mach_lftas = (*svsi).second;
2590 map<string, vector<stream_query *> > lfta_iface_lists;
2592 for(li=0;li<mach_lftas.size();++li){
2593 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2594 string lfta_iface = "_local_";
2596 string lfta_iface = tvec[0]->get_interface();
2598 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2601 map<string, vector<stream_query *> >::iterator lsvsi;
2602 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2604 string liface = (*lsvsi).first;
2605 vector<stream_query *> iface_lftas = (*lsvsi).second;
2606 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2607 if(iface_codegen_type.size()){
2608 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2610 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2613 string mcs = generate_nic_code(iface_lftas, nicprop);
2616 mcf_flnm = lmach + "_"+liface+".mcf";
2618 mcf_flnm = hostname + "_"+liface+".mcf";
2620 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2621 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2624 fprintf(mcf_fl,"%s",mcs.c_str());
2626 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2627 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2636 //-----------------------------------------------------------------
2639 // Find common filter predicates in the LFTAs.
2640 // in addition generate structs to store the
2641 // temporal attributes unpacked by prefilter
2642 // compute & provide interface for per-interface
2643 // record extraction properties
2645 map<string, vector<stream_query *> >::iterator ssqi;
2646 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2648 string lmach = (*ssqi).first;
2649 bool packed_return = false;
2653 // The LFTAs of this machine.
2654 vector<stream_query *> mach_lftas = (*ssqi).second;
2655 // break up on a per-interface basis.
2656 map<string, vector<stream_query *> > lfta_iface_lists;
2657 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2659 for(li=0;li<mach_lftas.size();++li){
2660 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2661 string lfta_iface = "_local_";
2663 lfta_iface = tvec[0]->get_interface();
2665 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2666 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2670 // Are the return values "packed"?
2671 // This should be done on a per-interface basis.
2672 // But this is defunct code for gs-lite
2673 for(li=0;li<mach_lftas.size();++li){
2674 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2675 string liface = "_local_";
2677 liface = tvec[0]->get_interface();
2679 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2680 if(iface_codegen_type.size()){
2681 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2682 packed_return = true;
2688 // Separate lftas by interface, collect results on a per-interface basis.
2690 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2691 map<string, vector<cnf_set *> > prefilter_preds;
2692 set<unsigned int> pred_ids; // this can be global for all interfaces
2693 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2694 string liface = (*mvsi).first;
2695 vector<cnf_set *> empty_list;
2696 prefilter_preds[liface] = empty_list;
2697 if(! packed_return){
2698 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2701 // get NIC capabilities. (Is this needed?)
2702 nic_property *nicprop = NULL;
2703 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2704 if(iface_codegen_type.size()){
2705 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2707 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2714 // Now that we know the prefilter preds, generate the lfta code.
2715 // Do this for all lftas in this machine.
2716 for(li=0;li<mach_lftas.size();++li){
2717 set<unsigned int> subsumed_preds;
2718 set<unsigned int>::iterator sii;
2720 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2722 if((pid>>16) == li){
2723 subsumed_preds.insert(pid & 0xffff);
2727 string lfta_schema_str = mach_lftas[li]->make_schema();
2728 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2729 nic_property *nicprop = NULL; // no NIC properties?
2730 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2734 // generate structs to store the temporal attributes
2735 // unpacked by prefilter
2736 col_id_set temp_cids;
2737 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2738 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2740 // Compute the lfta bit signatures and the lfta colrefs
2741 // do this on a per-interface basis
2743 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2745 map<string, vector<long long int> > lfta_sigs; // used again later
2746 map<string, int> lfta_snap_pos; // optimize csv parsing
2747 // compute now, use in get_iface_properties
2748 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2749 string liface = (*mvsi).first;
2750 vector<long long int> empty_list;
2751 lfta_sigs[liface] = empty_list;
2752 lfta_snap_pos[liface] = -1;
2754 vector<col_id_set> lfta_cols;
2755 vector<int> lfta_snap_length;
2756 for(li=0;li<lfta_iface_lists[liface].size();++li){
2757 unsigned long long int mask=0, bpos=1;
2759 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2760 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2764 lfta_sigs[liface].push_back(mask);
2765 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2766 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
2767 int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
2768 if(this_snap_pos > lfta_snap_pos[liface])
2769 lfta_snap_pos[liface] = this_snap_pos;
2772 //for(li=0;li<mach_lftas.size();++li){
2773 //printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
2774 //col_id_set::iterator tcisi;
2775 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2776 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2781 // generate the prefilter
2782 // Do this on a per-interface basis, except for the #define
2784 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2785 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2787 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2792 // Generate interface parameter lookup function
2793 lfta_val[lmach] += "// lookup interface properties by name\n";
2794 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2795 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2796 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2798 // collect a list of interface names used by queries running on this host
2799 set<std::string> iface_names;
2800 for(i=0;i<mach_query_names[lmach].size();i++){
2801 int mi = mach_query_names[lmach][i];
2802 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2804 if(interface_names[mi]=="")
2805 iface_names.insert("DEFAULTDEV");
2807 iface_names.insert(interface_names[mi]);
2810 // generate interface property lookup code for every interface
2811 set<std::string>::iterator sir;
2812 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2813 if (sir == iface_names.begin())
2814 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2816 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2818 // iterate through interface properties
2819 vector<string> iface_properties;
2820 if(*sir!="_local_"){ // dummy watchlist interface, don't process.
2821 iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2824 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2827 if (iface_properties.empty())
2828 lfta_val[lmach] += "\t\treturn NULL;\n";
2830 for (int i = 0; i < iface_properties.size(); ++i) {
2832 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2834 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2836 // combine all values for the interface property using comma separator
2837 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2838 lfta_val[lmach] += "\t\t\treturn \"";
2839 for (int j = 0; j < vals.size(); ++j) {
2840 lfta_val[lmach] += vals[j];
2841 if (j != vals.size()-1)
2842 lfta_val[lmach] += ",";
2844 lfta_val[lmach] += "\";\n";
2846 lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
2847 lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
2848 lfta_val[lmach] += "\t\t} else\n";
2849 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2852 lfta_val[lmach] += "\t} else\n";
2853 lfta_val[lmach] += "\t\treturn NULL;\n";
2854 lfta_val[lmach] += "}\n\n";
2857 // Generate a full list of FTAs for clearinghouse reference
2858 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2859 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2862 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2863 string liface = (*mvsi).first;
2864 if(liface != "_local_"){ // these don't register themselves
2865 vector<stream_query *> lfta_list = (*mvsi).second;
2866 for(i=0;i<lfta_list.size();i++){
2867 int mi = lfta_iface_qname_ix[liface][i];
2868 if(first) first = false;
2869 else lfta_val[lmach] += ", ";
2870 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2874 // for (i = 0; i < registration_query_names.size(); ++i) {
2876 // lfta_val[lmach] += ", ";
2877 // lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2880 for (i = 0; i < hfta_list.size(); ++i) {
2881 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2883 lfta_val[lmach] += ", NULL};\n\n";
2886 // Add the initialization function to lfta.c
2887 // Change to accept the interface name, and
2888 // set the prefilter function accordingly.
2889 // see the example in demo/err2
2890 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2891 lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
2893 // for(i=0;i<mach_query_names[lmach].size();i++)
2894 // int mi = mach_query_names[lmach][i];
2895 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2897 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2898 string liface = (*mvsi).first;
2899 vector<stream_query *> lfta_list = (*mvsi).second;
2900 for(i=0;i<lfta_list.size();i++){
2901 stream_query *lfta_sq = lfta_list[i];
2902 int mi = lfta_iface_qname_ix[liface][i];
2904 if(liface == "_local_"){
2905 // Don't register an init function, do the init code inline
2906 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2907 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2911 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2913 string this_iface = "DEFAULTDEV";
2914 if(interface_names[mi]!="")
2915 this_iface = '"'+interface_names[mi]+'"';
2916 lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2917 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2918 // if(interface_names[mi]=="")
2919 // lfta_val[lmach]+="DEFAULTDEV";
2921 // lfta_val[lmach]+='"'+interface_names[mi]+'"';
2922 lfta_val[lmach] += this_iface;
2925 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2926 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2928 sprintf(tmpstr,",%d",snap_lengths[mi]);
2929 lfta_val[lmach] += tmpstr;
2931 // unsigned long long int mask=0, bpos=1;
2933 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2934 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2936 // bpos = bpos << 1;
2940 // sprintf(tmpstr,",%lluull",mask);
2941 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2942 lfta_val[lmach]+=tmpstr;
2944 lfta_val[lmach] += ",0ull";
2947 lfta_val[lmach] += ");\n";
2951 // End of lfta prefilter stuff
2952 // --------------------------------------------------
2954 // If there is a field verifier, warn about
2955 // lack of compatibility
2956 string lfta_comment;
2958 string lfta_namespace;
2959 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2960 if(ldefs.count("comment")>0)
2961 lfta_comment = lfta_sq->defines["comment"];
2962 if(ldefs.count("Comment")>0)
2963 lfta_comment = lfta_sq->defines["Comment"];
2964 if(ldefs.count("COMMENT")>0)
2965 lfta_comment = lfta_sq->defines["COMMENT"];
2966 if(ldefs.count("title")>0)
2967 lfta_title = lfta_sq->defines["title"];
2968 if(ldefs.count("Title")>0)
2969 lfta_title = lfta_sq->defines["Title"];
2970 if(ldefs.count("TITLE")>0)
2971 lfta_title = lfta_sq->defines["TITLE"];
2972 if(ldefs.count("NAMESPACE")>0)
2973 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2974 if(ldefs.count("Namespace")>0)
2975 lfta_namespace = lfta_sq->defines["Namespace"];
2976 if(ldefs.count("namespace")>0)
2977 lfta_namespace = lfta_sq->defines["namespace"];
2979 string lfta_ht_size;
2980 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2981 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2982 if(ldefs.count("aggregate_slots")>0){
2983 lfta_ht_size = ldefs["aggregate_slots"];
2986 // NOTE : I'm assuming that visible lftas do not start with _fta.
2987 // -- will fail for non-visible simple selection queries.
2988 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2990 if(lfta_comment == "")
2991 warning_str += "\tcomment not found.\n";
2992 // Obsolete stuff that carsten wanted
2993 // if(lfta_title == "")
2994 // warning_str += "\ttitle not found.\n";
2995 // if(lfta_namespace == "")
2996 // warning_str += "\tnamespace not found.\n";
2998 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
3000 for(fi=0;fi<flds.size();fi++){
3001 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
3003 if(warning_str != "")
3004 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
3005 registration_query_names[mi].c_str(),warning_str.c_str());
3009 // Create qtree output
3010 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
3011 if(lfta_comment != "")
3012 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
3013 if(lfta_title != "")
3014 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
3015 if(lfta_namespace != "")
3016 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
3017 if(lfta_ht_size != "")
3018 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
3020 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
3022 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
3023 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
3024 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
3025 for(int t=0;t<itbls.size();++t){
3026 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
3028 // fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
3029 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
3030 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
3031 // write info about fields to qtree.xml
3032 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
3034 for(fi=0;fi<flds.size();fi++){
3035 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
3036 if(flds[fi]->get_modifier_list()->size()){
3037 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
3039 fprintf(qtree_output," />\n");
3041 fprintf(qtree_output,"\t</LFTA>\n");
3047 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
3048 string liface = (*mvsi).first;
3050 " if (!strcmp(device, \""+liface+"\")) \n"
3051 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
3055 " if(lfta_prefilter == NULL){\n"
3056 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
3063 lfta_val[lmach] += "}\n\n";
3065 if(!(debug_only || hfta_only) ){
3068 lfta_flnm = lmach + "_lfta.c";
3070 lfta_flnm = hostname + "_lfta.c";
3071 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
3072 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
3075 fprintf(lfta_out,"%s",lfta_header.c_str());
3076 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
3077 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
3082 // Say what are the operators which must execute
3083 if(opviews.size()>0)
3084 fprintf(stderr,"The queries use the following external operators:\n");
3085 for(i=0;i<opviews.size();++i){
3086 opview_entry *opv = opviews.get_entry(i);
3087 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
3091 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
3092 machine_names, schema_file_name,
3094 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
3097 fprintf(qtree_output,"</QueryNodes>\n");
3102 ////////////////////////////////////////////////////////////
3104 void generate_makefile(vector<string> &input_file_names, int nfiles,
3105 vector<string> &hfta_names, opview_set &opviews,
3106 vector<string> &machine_names,
3107 string schema_file_name,
3108 vector<string> &interface_names,
3109 ifq_t *ifdb, string &config_dir_path,
3112 map<string, vector<int> > &rts_hload
3116 if(config_dir_path != ""){
3117 config_dir_path = "-C "+config_dir_path;
3121 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3122 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3124 // if(libz_exists && !libast_exists){
3125 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");
3129 // Get set of operator executable files to run
3131 set<string>::iterator ssi;
3132 for(i=0;i<opviews.size();++i){
3133 opview_entry *opv = opviews.get_entry(i);
3134 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3137 FILE *outfl = fopen("Makefile", "w");
3139 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
3144 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
3145 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3149 fprintf(outfl," -DLFTA_STATS");
3151 // Gather the set of interfaces
3152 // Also, gather "base interface names" for use in computing
3153 // the hash splitting to virtual interfaces.
3154 // TODO : must update to handle machines
3156 set<string> base_vifaces; // base interfaces of virtual interfaces
3157 map<string, string> ifmachines;
3158 map<string, string> ifattrs;
3159 for(i=0;i<interface_names.size();++i){
3160 ifaces.insert(interface_names[i]);
3161 ifmachines[interface_names[i]] = machine_names[i];
3163 size_t Xpos = interface_names[i].find_last_of("X");
3164 if(Xpos!=string::npos){
3165 string iface = interface_names[i].substr(0,Xpos);
3166 base_vifaces.insert(iface);
3168 // get interface attributes and add them to the list
3171 // Do we need to include protobuf libraries?
3172 // TODO Move to the interface library: get the libraries to include
3173 // for an interface type
3175 bool use_proto = false;
3176 bool use_bsa = false;
3177 bool use_kafka = false;
3178 bool use_ssl = false;
3181 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3182 string ifnm = (*ssi);
3183 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3184 for(int ift_i=0;ift_i<ift.size();ift_i++){
3185 if(ift[ift_i]=="PROTO"){
3186 #ifdef PROTO_ENABLED
3189 fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
3194 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3195 for(int ift_i=0;ift_i<ift.size();ift_i++){
3196 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3200 fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
3205 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3206 for(int ift_i=0;ift_i<ift.size();ift_i++){
3207 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3208 #ifdef KAFKA_ENABLED
3211 fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
3216 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
3217 for(int ift_i=0;ift_i<ift.size();ift_i++){
3218 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3222 fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
3233 for(i=0;i<hfta_names.size();++i)
3234 fprintf(outfl," %s",hfta_names[i].c_str());
3238 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
3239 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3241 fprintf(outfl,"-L. ");
3243 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3245 fprintf(outfl,"-lgscppads -lpads ");
3247 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
3249 fprintf(outfl, " -lpz -lz -lbz ");
3250 if(libz_exists && libast_exists)
3251 fprintf(outfl," -last ");
3253 fprintf(outfl, " -ldll -ldl ");
3255 #ifdef PROTO_ENABLED
3256 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3259 fprintf(outfl, " -lbsa_stream ");
3261 #ifdef KAFKA_ENABLED
3262 fprintf(outfl, " -lrdkafka ");
3265 fprintf(outfl, " -lssl -lcrypto ");
3267 fprintf(outfl," -lgscpaux");
3269 fprintf(outfl," -fprofile-arcs");
3274 "lfta.o: %s_lfta.c\n"
3275 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3277 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3278 for(i=0;i<nfiles;++i)
3279 fprintf(outfl," %s",input_file_names[i].c_str());
3281 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3283 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3285 for(i=0;i<nfiles;++i)
3286 fprintf(outfl," %s",input_file_names[i].c_str());
3287 fprintf(outfl,"\n");
3289 for(i=0;i<hfta_names.size();++i)
3292 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3295 "\t$(CPP) -o %s.o -c %s.cc\n"
3298 hfta_names[i].c_str(), hfta_names[i].c_str(),
3299 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3300 hfta_names[i].c_str(), hfta_names[i].c_str(),
3301 hfta_names[i].c_str(), hfta_names[i].c_str()
3306 "packet_schema.txt:\n"
3307 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3309 "external_fcns.def:\n"
3310 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3313 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3314 for(i=0;i<hfta_names.size();++i)
3315 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3316 fprintf(outfl,"\n");
3322 // Gather the set of interfaces
3323 // TODO : must update to handle machines
3324 // TODO : lookup interface attributes and add them as a parameter to rts process
3325 outfl = fopen("runit", "w");
3327 fprintf(stderr,"Can't open runit for write, exiting.\n");
3335 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3337 "if [ ! -f gshub.log ]\n"
3339 "\techo \"Failed to start bin/gshub.py\"\n"
3342 "ADDR=`cat gshub.log`\n"
3343 "ps opgid= $! >> gs.pids\n"
3344 "./rts $ADDR default ").c_str(), outfl);
3347 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3348 string ifnm = (*ssi);
3349 // suppress internal _local_ interface
3350 if (ifnm == "_local_")
3352 fprintf(outfl, "%s ",ifnm.c_str());
3353 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3354 for(j=0;j<ifv.size();++j)
3355 fprintf(outfl, "%s ",ifv[j].c_str());
3357 fprintf(outfl, " &\n");
3358 fprintf(outfl, "echo $! >> gs.pids\n");
3359 for(i=0;i<hfta_names.size();++i)
3360 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3362 for(j=0;j<opviews.opview_list.size();++j){
3363 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3367 system("chmod +x runit");
3369 outfl = fopen("stopit", "w");
3371 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3375 fprintf(outfl,"#!/bin/sh\n"
3377 "if [ ! -f gs.pids ]\n"
3381 "for pgid in `cat gs.pids`\n"
3383 "kill -TERM -$pgid\n"
3386 "for pgid in `cat gs.pids`\n"
3393 system("chmod +x stopit");
3395 //-----------------------------------------------
3397 /* For now disable support for virtual interfaces
3398 outfl = fopen("set_vinterface_hash.bat", "w");
3400 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3404 // The format should be determined by an entry in the ifres.xml file,
3405 // but for now hardcode the only example I have.
3406 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3407 if(rts_hload.count((*ssi))){
3408 string iface_name = (*ssi);
3409 string iface_number = "";
3410 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3411 if(isdigit(iface_name[j])){
3412 iface_number = iface_name[j];
3413 if(j>0 && isdigit(iface_name[j-1]))
3414 iface_number = iface_name[j-1] + iface_number;
3418 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3419 vector<int> halloc = rts_hload[iface_name];
3421 for(j=0;j<halloc.size();++j){
3424 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3425 prev_limit = halloc[j];
3427 fprintf(outfl,"\n");
3431 system("chmod +x set_vinterface_hash.bat");
3435 // Code for implementing a local schema
3437 table_list qpSchema;
3439 // Load the schemas of any LFTAs.
3441 for(l=0;l<hfta_nbr;++l){
3442 stream_query *sq0 = split_queries[l];
3443 table_def *td = sq0->get_output_tabledef();
3444 qpSchema.append_table(td);
3446 // load the schemas of any other ref'd tables.
3448 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3450 for(ti=0;ti<input_tbl_names.size();++ti){
3451 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3453 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3455 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3458 qpSchema.append_table(Schema->get_table(tbl_ref));
3463 // Functions related to parsing.
3466 static int split_string(char *instr,char sep, char **words,int max_words){
3472 words[nwords++] = str;
3473 while( (loc = strchr(str,sep)) != NULL){
3476 if(nwords >= max_words){
3477 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3478 nwords = max_words-1;
3480 words[nwords++] = str;