1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
58 extern int xmlParserparse(void);
59 extern FILE *xmlParserin;
60 extern int xmlParserdebug;
// Scratch state filled in by the xml parser while reading field_list.xml;
// xml_leaves receives the parsed tree (allocated just before parsing).
62 std::vector<std::string> xml_attr_vec;
63 std::vector<std::string> xml_val_vec;
64 std::string xml_a, xml_v;
65 xml_t *xml_leaves = NULL;
67 // Interface to the field list verifier
// Built from xml_leaves when field_list.xml is present; NULL otherwise.
68 field_list *field_verifier = NULL;
// Size of the reusable line/scratch buffer (tmpstr in main).
70 #define TMPSTRLEN 1000
// Path separator used to strip directories from input file names.
73 #define PATH_DELIM '/'
76 char tmp_schema_str[10000];
78 // maximum delay between two heartbeats produced
79 // by UDOP. Used when it's not explicitly
80 // provided in udop definition
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
83 // Default lfta hash table size, must be power of 2.
// Can be overridden per-lfta via lfta_htsize.cfg (read in main).
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
86 // Interface to FTA definition lexer and parser ...
88 extern int FtaParserparse(void);
89 extern FILE *FtaParserin;
90 extern int FtaParserdebug;
// Result slots written by FtaParserparse(): the parse tree plus any
// DEFINE-block variable definitions.
92 fta_parse_t *fta_parse_result;
93 var_defs_t *fta_parse_defines;
97 // Interface to external function lexer and parser ...
99 extern int Ext_fcnsParserparse(void);
100 extern FILE *Ext_fcnsParserin;
101 extern int Ext_fcnsParserdebug;
// External (UDF) function catalog; falls back to an empty list when
// external_fcns.def is missing or fails to parse.
103 ext_fcn_list *Ext_fcns;
106 // Interface to partition definition parser
107 extern int PartnParserparse();
108 partn_def_list_t *partn_parse_result = NULL;
116 // forward declaration of local utility function
117 void generate_makefile(vector<string> &input_file_names, int nfiles,
118 vector<string> &hfta_names, opview_set &opviews,
119 vector<string> &machine_names,
120 string schema_file_name,
121 vector<string> &interface_names,
122 ifq_t *ifdb, string &config_dir_path,
125 map<string, vector<int> > &rts_hload
128 //static int split_string(char *instr,char sep, char **words,int max_words);
131 FILE *schema_summary_output = NULL; // schema_summary.txt (-f option); written by dump_summary
133 // Dump schema summary
// Append one query's output schema to schema_summary_output:
// line 1: the query name; line 2: '|'-separated field names;
// line 3: '|'-separated field types.
// Assumes schema_summary_output was opened in main (-f option).
// NOTE(review): loop index f is declared on a line not shown here — confirm.
134 void dump_summary(stream_query *str){
135 fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
137 table_def *sch = str->get_output_tabledef();
139 vector<field_entry *> flds = sch->get_fields();
// Field-name row, pipe-delimited (separator only between entries).
141 for(f=0;f<flds.size();++f){
142 if(f>0) fprintf(schema_summary_output,"|");
143 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
145 fprintf(schema_summary_output,"\n");
// Field-type row, same layout as the name row above.
146 for(f=0;f<flds.size();++f){
147 if(f>0) fprintf(schema_summary_output,"|");
148 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
150 fprintf(schema_summary_output,"\n");
154 string hostname; // name of current host.
// presumably toggled by the -S option ("enable LFTA statistics") — confirm in getopt loop
156 bool generate_stats = false;
// Root of the GS-lite tree; overridable via the -R option.
157 string root_path = "../..";
160 int main(int argc, char **argv){
161 char tmpstr[TMPSTRLEN];
165 set<int>::iterator si;
167 vector<string> query_names; // for lfta.c registration
168 map<string, vector<int> > mach_query_names; // list queries of machine
169 vector<int> snap_lengths; // for lfta.c registration
170 vector<string> interface_names; // for lfta.c registration
171 vector<string> machine_names; // machine of interface
172 vector<bool> lfta_reuse_options; // for lfta.c registration
173 vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
174 vector<string> hfta_names; // hfta cource code names, for
175 // creating make file.
176 vector<string> qnames; // ensure unique names
177 map<string, int> lfta_names; // keep track of unique lftas.
180 // set these to 1 to debug the parser
182 Ext_fcnsParserdebug = 0;
184 FILE *lfta_out; // lfta.c output.
185 FILE *fta_in; // input file
186 FILE *table_schemas_in; // source tables definition file
187 FILE *query_name_output; // query names
188 FILE *qtree_output; // interconnections of query nodes
190 // -------------------------------
191 // Handling of Input Arguments
192 // -------------------------------
193 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195 "\t[-B] : debug only (don't create output files)\n"
196 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199 "\t[-C] : use <config directory> for definition files\n"
200 "\t[-l] : use <library directory> for library queries\n"
201 "\t[-N] : output query names in query_names.txt\n"
202 "\t[-H] : create HFTA only (no schema_file)\n"
203 "\t[-Q] : use query name for hfta suffix\n"
204 "\t[-M] : generate make file and runit, stopit scripts\n"
205 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206 "\t[-f] : Output schema summary to schema_summary.txt\n"
207 "\t[-P] : link with PADS\n"
208 "\t[-h] : override host name.\n"
209 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210 "\t[-R] : path to root of GS-lite\n"
213 // parameters gathered from command line processing
214 string external_fcns_path;
215 // string internal_fcn_path;
216 string config_dir_path;
217 string library_path = "./";
218 vector<string> input_file_names;
219 string schema_file_name;
220 bool debug_only = false;
221 bool hfta_only = false;
222 bool output_query_names = false;
223 bool output_schema_summary=false;
224 bool numeric_hfta_flname = true;
225 bool create_makefile = false;
226 bool distributed_mode = false;
227 bool partitioned_mode = false;
228 bool use_live_hosts_file = false;
229 bool use_pads = false;
230 bool clean_make = false;
231 int n_virtual_interfaces = 1;
234 while((chopt = getopt(argc,argv,optstr)) != -1){
240 distributed_mode = true;
243 partitioned_mode = true;
246 use_live_hosts_file = true;
250 config_dir_path = string(optarg) + string("/");
254 library_path = string(optarg) + string("/");
257 output_query_names = true;
260 numeric_hfta_flname = false;
263 if(schema_file_name == ""){
268 output_schema_summary=true;
271 create_makefile=true;
292 n_virtual_interfaces = atoi(optarg);
293 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
295 n_virtual_interfaces = 1;
300 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301 fprintf(stderr,"%s\n", usage_str);
304 fprintf(stderr, "Argument was %c\n", optopt);
305 fprintf(stderr,"Invalid arguments\n");
306 fprintf(stderr,"%s\n", usage_str);
312 for (int i = 0; i < argc; ++i) {
313 if((schema_file_name == "") && !hfta_only){
314 schema_file_name = argv[i];
316 input_file_names.push_back(argv[i]);
320 if(input_file_names.size() == 0){
321 fprintf(stderr,"%s\n", usage_str);
326 string clean_cmd = "rm Makefile hfta_*.cc";
327 int clean_ret = system(clean_cmd.c_str());
329 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
334 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
336 // Open globally used file names.
338 // prepend config directory to schema file
339 schema_file_name = config_dir_path + schema_file_name;
340 external_fcns_path = config_dir_path + string("external_fcns.def");
341 string ifx_fname = config_dir_path + string("ifres.xml");
343 // Find interface query file(s).
345 gethostname(tmpstr,TMPSTRLEN);
348 hostname_len = strlen(tmpstr);
349 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
350 vector<string> ifq_fls;
352 ifq_fls.push_back(ifq_fname);
355 // Get the field list, if it exists
356 string flist_fl = config_dir_path + "field_list.xml";
358 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360 xml_leaves = new xml_t();
361 xmlParser_setfileinput(flf_in);
362 if(xmlParserparse()){
363 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
365 field_verifier = new field_list(xml_leaves);
370 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
377 if(!(debug_only || hfta_only)){
378 if((lfta_out = fopen("lfta.c","w")) == NULL){
379 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
385 // Get the output specification file.
387 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
388 string ospec_fl = "output_spec.cfg";
390 vector<ospec_str *> output_specs;
391 multimap<string, int> qname_to_ospec;
392 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
395 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
397 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
399 // make operator type lowercase
401 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402 *tmpc = tolower(*tmpc);
404 ospec_str *tmp_ospec = new ospec_str();
405 tmp_ospec->query = flds[0];
406 tmp_ospec->operator_type = flds[1];
407 tmp_ospec->operator_param = flds[2];
408 tmp_ospec->output_directory = flds[3];
409 tmp_ospec->bucketwidth = atoi(flds[4]);
410 tmp_ospec->partitioning_flds = flds[5];
411 tmp_ospec->n_partitions = atoi(flds[6]);
412 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413 output_specs.push_back(tmp_ospec);
415 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
420 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
425 string pspec_fl = "hfta_parallelism.cfg";
427 map<string, int> hfta_parallelism;
428 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
431 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432 bool good_entry = true;
434 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
436 string hname = flds[0];
437 int par = atoi(flds[1]);
438 if(par <= 0 || par > n_virtual_interfaces){
439 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
442 if(good_entry && n_virtual_interfaces % par != 0){
443 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
447 hfta_parallelism[hname] = par;
451 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
455 // LFTA hash table sizes
456 string htspec_fl = "lfta_htsize.cfg";
457 FILE *htsp_in = NULL;
458 map<string, int> lfta_htsize;
459 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
462 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463 bool good_entry = true;
465 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
467 string lfta_name = flds[0];
468 int htsz = atoi(flds[1]);
470 lfta_htsize[lfta_name] = htsz;
472 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
477 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
480 // LFTA virtual interface hash split
481 string rtlspec_fl = "rts_load.cfg";
483 map<string, vector<int> > rts_hload;
484 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
489 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490 bool good_entry = true;
494 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
496 iface_name = flds[0];
499 for(j=1;j<nflds;++j){
500 int h = atoi(flds[j]);
504 hload.push_back(cumm_h);
510 rts_hload[iface_name] = hload;
512 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
519 if(output_query_names){
520 if((query_name_output = fopen("query_names.txt","w")) == NULL){
521 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
526 if(output_schema_summary){
527 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
533 if((qtree_output = fopen("qtree.xml","w")) == NULL){
534 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
537 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539 fprintf(qtree_output,"<QueryNodes>\n");
542 // Get an initial Schema
545 // Parse the table schema definitions.
546 fta_parse_result = new fta_parse_t();
547 FtaParser_setfileinput(table_schemas_in);
548 if(FtaParserparse()){
549 fprintf(stderr,"Table schema parse failed.\n");
552 if(fta_parse_result->parse_type != TABLE_PARSE){
553 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
556 Schema = fta_parse_result->tables;
558 // Ensure that all schema_ids, if set, are distinct.
559 // Obsolete? There is code elsewhere to ensure that schema IDs are
560 // distinct on a per-interface basis.
564 for(int t=0;t<Schema->size();++t){
565 int sch_id = Schema->get_table(t)->get_schema_id();
567 if(found_ids.find(sch_id) != found_ids.end()){
568 dup_ids.insert(sch_id);
570 found_ids.insert(sch_id);
573 if(dup_ids.size()>0){
574 fprintf(stderr, "Error, the schema has duplicate schema_ids:");
575 for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
576 fprintf(stderr," %d",(*dit));
577 fprintf(stderr,"\n");
584 // Process schema field inheritance
586 retval = Schema->unroll_tables(err_str);
588 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
592 // hfta only => we will try to fetch schemas from the registry.
593 // therefore, start off with an empty schema.
594 Schema = new table_list();
598 // Open and parse the external functions file.
599 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
600 if(Ext_fcnsParserin == NULL){
601 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
602 Ext_fcns = new ext_fcn_list();
604 if(Ext_fcnsParserparse()){
605 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
606 Ext_fcns = new ext_fcn_list();
609 if(Ext_fcns->validate_fcns(err_str)){
610 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
614 // Open and parse the interface resources file.
615 // ifq_t *ifaces_db = new ifq_t();
617 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
618 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
619 // ifx_fname.c_str(), ierr.c_str());
622 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
623 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
624 // ifq_fname.c_str(), ierr.c_str());
629 // The LFTA code string.
630 // Put the standard preamble here.
631 // NOTE: the hash macros, fcns should go into the run time
632 map<string, string> lfta_val;
633 map<string, string> lfta_prefilter_val;
636 "#include <limits.h>\n\n"
637 "#include \"rts.h\"\n"
638 "#include \"fta.h\"\n"
639 "#include \"lapp.h\"\n"
640 "#include \"rts_udaf.h\"\n\n"
642 // Get any locally defined parsing headers
644 memset(&glob_result, 0, sizeof(glob_result));
646 // do the glob operation
647 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
648 if(return_value == 0){
649 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
651 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
652 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
655 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
659 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
660 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
661 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
662 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
667 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
669 "#define SLOT_FILLED 0x04\n"
670 "#define SLOT_GEN_BITS 0x03\n"
671 "#define SLOT_HASH_BITS 0xfffffff8\n"
672 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
673 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
674 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
677 "#define lfta_BOOL_to_hash(x) (x)\n"
678 "#define lfta_USHORT_to_hash(x) (x)\n"
679 "#define lfta_UINT_to_hash(x) (x)\n"
680 "#define lfta_IP_to_hash(x) (x)\n"
681 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
682 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
683 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
684 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
685 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
686 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
687 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
688 " for(i=0;i<x.length;++i){\n"
689 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
695 " if((i%4)!=0) ret ^=tmp_sum;\n"
701 //////////////////////////////////////////////////////////////////
702 ///// Get all of the query parse trees
706 int hfta_count = 0; // for numeric suffixes to hfta .cc files
708 //---------------------------
709 // Global info needed for post processing.
711 // Set of operator views ref'd in the query set.
713 // lftas on a per-machine basis.
714 map<string, vector<stream_query *> > lfta_mach_lists;
715 int nfiles = input_file_names.size();
716 vector<stream_query *> hfta_list; // list of hftas.
717 map<string, stream_query *> sq_map; // map from query name to stream query.
720 //////////////////////////////////////////
722 // Open and parse the interface resources file.
723 ifq_t *ifaces_db = new ifq_t();
725 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
726 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
727 ifx_fname.c_str(), ierr.c_str());
730 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
731 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
732 ifq_fls[0].c_str(), ierr.c_str());
736 map<string, string> qname_to_flname; // for detecting duplicate query names
740 // Parse the files to create a vector of parse trees.
741 // Load qnodes with information to perform a topo sort
742 // based on query dependencies.
743 vector<query_node *> qnodes; // for topo sort.
744 map<string,int> name_node_map; // map query name to qnodes entry
745 for(i=0;i<input_file_names.size();i++){
747 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
748 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
751 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
753 // Parse the FTA query
754 fta_parse_result = new fta_parse_t();
755 FtaParser_setfileinput(fta_in);
756 if(FtaParserparse()){
757 fprintf(stderr,"FTA parse failed.\n");
760 if(fta_parse_result->parse_type != QUERY_PARSE){
761 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
765 // returns a list of parse trees
766 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
767 for(p=0;p<qlist.size();++p){
768 table_exp_t *fta_parse_tree = qlist[p];
769 // query_parse_trees.push_back(fta_parse_tree);
771 // compute the default name -- extract from query name
772 strcpy(tmpstr,input_file_names[i].c_str());
773 char *qname = strrchr(tmpstr,PATH_DELIM);
778 char *qname_end = strchr(qname,'.');
779 if(qname_end != NULL) *qname_end = '\0';
780 string qname_str = qname;
781 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
783 // Determine visibility. Should I be attaching all of the output methods?
784 if(qname_to_ospec.count(imputed_qname)>0)
785 fta_parse_tree->set_visible(true);
787 fta_parse_tree->set_visible(false);
790 // Create a manipulable representation of the parse tree.
791 // the qnode inherits the visibility assigned to the parse tree.
792 int pos = qnodes.size();
793 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
794 name_node_map[ qnodes[pos]->name ] = pos;
795 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
796 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
797 // qfiles.push_back(i);
799 // Check for duplicate query names
800 // NOTE : in hfta-only generation, I should
801 // also check with the names of the registered queries.
802 if(qname_to_flname.count(qnodes[pos]->name) > 0){
803 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
804 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
807 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
808 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
809 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
812 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
818 // Add the library queries
821 for(pos=0;pos<qnodes.size();++pos){
823 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
824 string src_tbl = qnodes[pos]->refd_tbls[fi];
825 if(qname_to_flname.count(src_tbl) == 0){
826 int last_sep = src_tbl.find_last_of('/');
827 if(last_sep != string::npos){
828 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
829 string target_qname = src_tbl.substr(last_sep+1);
830 string qpathname = library_path + src_tbl + ".gsql";
831 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
832 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
834 fprintf(stderr,"After exit\n");
836 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
837 // Parse the FTA query
838 fta_parse_result = new fta_parse_t();
839 FtaParser_setfileinput(fta_in);
840 if(FtaParserparse()){
841 fprintf(stderr,"FTA parse failed.\n");
844 if(fta_parse_result->parse_type != QUERY_PARSE){
845 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
849 map<string, int> local_query_map;
850 vector<string> local_query_names;
851 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
852 for(p=0;p<qlist.size();++p){
853 table_exp_t *fta_parse_tree = qlist[p];
854 fta_parse_tree->set_visible(false); // assumed to not produce output
855 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
856 if(imputed_qname == target_qname)
857 imputed_qname = src_tbl;
858 if(local_query_map.count(imputed_qname)>0){
859 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
862 local_query_map[ imputed_qname ] = p;
863 local_query_names.push_back(imputed_qname);
866 if(local_query_map.count(src_tbl)==0){
867 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
871 vector<int> worklist;
872 set<int> added_queries;
873 vector<query_node *> new_qnodes;
874 worklist.push_back(local_query_map[target_qname]);
875 added_queries.insert(local_query_map[target_qname]);
877 int qpos = qnodes.size();
878 for(qq=0;qq<worklist.size();++qq){
879 int q_id = worklist[qq];
880 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
881 new_qnodes.push_back( new_qnode);
882 vector<string> refd_tbls = new_qnode->refd_tbls;
884 for(ff = 0;ff<refd_tbls.size();++ff){
885 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
887 if(name_node_map.count(refd_tbls[ff])>0){
888 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
891 worklist.push_back(local_query_map[refd_tbls[ff]]);
897 for(qq=0;qq<new_qnodes.size();++qq){
898 int qpos = qnodes.size();
899 qnodes.push_back(new_qnodes[qq]);
900 name_node_map[qnodes[qpos]->name ] = qpos;
901 qname_to_flname[qnodes[qpos]->name ] = qpathname;
915 //---------------------------------------
920 string udop_missing_sources;
921 for(i=0;i<qnodes.size();++i){
923 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
924 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
926 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
927 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
928 int pos = qnodes.size();
929 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
930 name_node_map[ qnodes[pos]->name ] = pos;
931 qnodes[pos]->is_externally_visible = false; // its visible
932 // Need to mark the source queries as visible.
934 string missing_sources = "";
935 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
936 string src_tbl = qnodes[pos]->refd_tbls[si];
937 if(name_node_map.count(src_tbl)==0){
938 missing_sources += src_tbl + " ";
941 if(missing_sources != ""){
942 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
949 if(udop_missing_sources != ""){
950 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
956 ////////////////////////////////////////////////////////////////////
957 /// Check parse trees to verify that some
958 /// global properties are met :
959 /// if q1 reads from q2, then
960 /// q2 is processed before q1
961 /// q1 can supply q2's parameters
962 /// Verify there is no cycle in the reads-from graph.
964 // Compute an order in which to process the
967 // Start by building the reads-from lists.
970 for(i=0;i<qnodes.size();++i){
972 vector<string> refd_tbls = qnodes[i]->refd_tbls;
973 for(fi = 0;fi<refd_tbls.size();++fi){
974 if(name_node_map.count(refd_tbls[fi])>0){
975 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
976 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
982 // If one query reads the result of another,
983 // check for parameter compatibility. Currently it must
984 // be an exact match. I will move to requiring
985 // containment after re-ordering, but will require
986 // some analysis for code generation which is not
988 //printf("There are %d query nodes.\n",qnodes.size());
991 for(i=0;i<qnodes.size();++i){
992 vector<var_pair_t *> target_params = qnodes[i]->params;
993 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
994 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
995 if(target_params.size() != source_params.size()){
996 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1000 for(p=0;p<target_params.size();++p){
1001 if(! (target_params[p]->name == source_params[p]->name &&
1002 target_params[p]->val == source_params[p]->val ) ){
1003 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1012 // Start by counting inedges.
1013 for(i=0;i<qnodes.size();++i){
1014 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1015 qnodes[(*si)]->n_consumers++;
1019 // The roots are the nodes with indegree zero.
1021 for(i=0;i<qnodes.size();++i){
1022 if(qnodes[i]->n_consumers == 0){
1023 if(qnodes[i]->is_externally_visible == false){
1024 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1030 // Remove the parts of the subtree that produce no output.
1031 set<int> valid_roots;
1032 set<int> discarded_nodes;
1033 set<int> candidates;
1034 while(roots.size() >0){
1035 for(si=roots.begin();si!=roots.end();++si){
1036 if(qnodes[(*si)]->is_externally_visible){
1037 valid_roots.insert((*si));
1039 discarded_nodes.insert((*si));
1040 set<int>::iterator sir;
1041 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1042 qnodes[(*sir)]->n_consumers--;
1043 if(qnodes[(*sir)]->n_consumers == 0)
1044 candidates.insert( (*sir));
1051 roots = valid_roots;
1052 if(discarded_nodes.size()>0){
1053 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1055 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1056 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1058 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1060 fprintf(stderr,"\n");
1063 // Compute the sources_to set, ignoring discarded nodes.
1064 for(i=0;i<qnodes.size();++i){
1065 if(discarded_nodes.count(i)==0)
1066 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1067 qnodes[(*si)]->sources_to.insert(i);
1072 // Find the nodes that are shared by multiple visible subtrees.
1073 // The roots become inferred visible nodes.
1075 // Find the visible nodes.
1076 vector<int> visible_nodes;
1077 for(i=0;i<qnodes.size();i++){
1078 if(qnodes[i]->is_externally_visible){
1079 visible_nodes.push_back(i);
1083 // Find UDOPs referenced by visible nodes.
1085 for(i=0;i<visible_nodes.size();++i){
1086 workq.push_back(visible_nodes[i]);
1088 while(!workq.empty()){
1089 int node = workq.front();
1091 set<int>::iterator children;
1092 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1093 qnodes[node]->is_externally_visible = true;
1094 visible_nodes.push_back(node);
1095 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1096 if(qnodes[(*children)]->is_externally_visible == false){
1097 qnodes[(*children)]->is_externally_visible = true;
1098 visible_nodes.push_back((*children));
1102 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1103 workq.push_back((*children));
1110 for(i=0;i<qnodes.size();i++){
1111 qnodes[i]->subtree_roots.clear();
1114 // Walk the tree defined by a visible node, not descending into
1115 // subtrees rooted by a visible node. Mark the node visited with
1116 // the visible node ID.
1117 for(i=0;i<visible_nodes.size();++i){
1119 vroots.insert(visible_nodes[i]);
1120 while(vroots.size()>0){
1121 for(si=vroots.begin();si!=vroots.end();++si){
1122 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1124 set<int>::iterator sir;
1125 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1126 if(! qnodes[(*sir)]->is_externally_visible){
1127 candidates.insert( (*sir));
1131 vroots = candidates;
1135 // Find the nodes in multiple visible node subtrees, but with no parent
1136 // that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1137 done = true; // until proven otherwise
1138 for(i=0;i<qnodes.size();i++){
1139 if(qnodes[i]->subtree_roots.size()>1){
1140 bool is_new_root = true;
1141 set<int>::iterator sir;
1142 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1143 if(qnodes[(*sir)]->subtree_roots.size()>1)
1144 is_new_root = false;
1147 qnodes[i]->is_externally_visible = true;
1148 qnodes[i]->inferred_visible_node = true;
1149 visible_nodes.push_back(i);
1160 // get visible nodes in topo ordering.
1161 // for(i=0;i<qnodes.size();i++){
1162 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1164 vector<int> process_order;
1165 while(roots.size() >0){
1166 for(si=roots.begin();si!=roots.end();++si){
1167 if(discarded_nodes.count((*si))==0){
1168 process_order.push_back( (*si) );
1170 set<int>::iterator sir;
1171 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1172 qnodes[(*sir)]->n_consumers--;
1173 if(qnodes[(*sir)]->n_consumers == 0)
1174 candidates.insert( (*sir));
1182 //printf("process_order.size() =%d\n",process_order.size());
1184 // Search for cyclic dependencies
1186 for(i=0;i<qnodes.size();++i){
1187 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1188 if(found_dep.size() != 0) found_dep += ", ";
1189 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1192 if(found_dep.size()>0){
1193 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1197 // Get a list of query sets, in the order to be processed.
1198 // Start at visible root and do bfs.
1199 // The query set includes queries referenced indirectly,
1200 // as sources for user-defined operators. These are needed
1201 // to ensure that they are added to the schema, but are not part
1202 // of the query tree.
1204 // stream_node_sets contains queries reachable only through the
1205 // FROM clause, so I can tell which queries to add to the stream
1206 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1208 // NOTE: this code works because in order for data to be
1209 // read by multiple hftas, the node must be externally visible.
1210 // But visible nodes define roots of process sets.
1211 // internally visible nodes can feed data only
1212 // to other nodes in the same query file.
1213 // Therefore, any access can be restricted to a file,
1214 // hfta output sharing is done only on roots
1215 // never on interior nodes.
1220 // Compute the base collection of hftas.
1221 vector<hfta_node *> hfta_sets;
1222 map<string, int> hfta_name_map;
1223 // vector< vector<int> > process_sets;
1224 // vector< set<int> > stream_node_sets;
1225 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1226 // i.e. process leaves 1st.
1227 for(i=0;i<process_order.size();++i){
1228 if(qnodes[process_order[i]]->is_externally_visible == true){
1229 //printf("Visible.\n");
1230 int root = process_order[i];
1231 hfta_node *hnode = new hfta_node();
1232 hnode->name = qnodes[root]-> name;
1233 hnode->source_name = qnodes[root]-> name;
1234 hnode->is_udop = qnodes[root]->is_udop;
1235 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1237 vector<int> proc_list; proc_list.push_back(root);
1238 // Ensure that nodes are added only once.
1239 set<int> proc_set; proc_set.insert(root);
1240 roots.clear(); roots.insert(root);
1242 while(roots.size()>0){
1243 for(si=roots.begin();si!=roots.end();++si){
1244 //printf("Processing root %d\n",(*si));
1245 set<int>::iterator sir;
1246 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1247 //printf("reads fom %d\n",(*sir));
1248 if(qnodes[(*sir)]->is_externally_visible==false){
1249 candidates.insert( (*sir) );
1250 if(proc_set.count( (*sir) )==0){
1251 proc_set.insert( (*sir) );
1252 proc_list.push_back( (*sir) );
1261 reverse(proc_list.begin(), proc_list.end());
1262 hnode->query_node_indices = proc_list;
1263 hfta_name_map[hnode->name] = hfta_sets.size();
1264 hfta_sets.push_back(hnode);
1268 // Compute the reads_from / sources_to graphs for the hftas.
1270 for(i=0;i<hfta_sets.size();++i){
1271 hfta_node *hnode = hfta_sets[i];
1272 for(q=0;q<hnode->query_node_indices.size();q++){
1273 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1274 for(s=0;s<qnode->refd_tbls.size();++s){
1275 if(hfta_name_map.count(qnode->refd_tbls[s])){
1276 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1277 hnode->reads_from.insert(other_hfta);
1278 hfta_sets[other_hfta]->sources_to.insert(i);
1284 // Compute a topological sort of the hfta_sets.
1286 vector<int> hfta_topsort;
1288 int hnode_srcs[hfta_sets.size()];
1289 for(i=0;i<hfta_sets.size();++i){
1291 if(hfta_sets[i]->sources_to.size() == 0)
1295 while(! workq.empty()){
1296 int node = workq.front();
1298 hfta_topsort.push_back(node);
1299 set<int>::iterator stsi;
1300 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1301 int parent = (*stsi);
1302 hnode_srcs[parent]++;
1303 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1304 workq.push_back(parent);
1309 // Decorate hfta nodes with the level of parallelism given as input.
1311 map<string, int>::iterator msii;
1312 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1313 string hfta_name = (*msii).first;
1314 int par = (*msii).second;
1315 if(hfta_name_map.count(hfta_name) > 0){
1316 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1318 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1322 // Propagate levels of parallelism: children should have a level of parallelism
1323 // as large as any of its parents. Adjust children upwards to compensate.
1324 // Start at parents and adjust children, auto-propagation will occur.
1326 for(i=hfta_sets.size()-1;i>=0;i--){
1327 set<int>::iterator stsi;
1328 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1329 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1330 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1335 // Before all the name mangling, check if there are any output_spec.cfg
1336 // or hfta_parallelism.cfg entries that do not have a matching query.
1338 string dangling_ospecs = "";
1339 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1340 string oq = (*msii).first;
1341 if(hfta_name_map.count(oq) == 0){
1342 dangling_ospecs += " "+(*msii).first;
1345 if(dangling_ospecs!=""){
1346 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1349 string dangling_par = "";
1350 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1351 string oq = (*msii).first;
1352 if(hfta_name_map.count(oq) == 0){
1353 dangling_par += " "+(*msii).first;
1356 if(dangling_par!=""){
1357 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1362 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1363 // FROM clauses: retarget any name which is an internal node, and
1364 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1365 // when the source hfta has more parallelism than the target node.
1366 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1369 int n_original_hfta_sets = hfta_sets.size();
1370 for(i=0;i<n_original_hfta_sets;++i){
1371 if(hfta_sets[i]->n_parallel > 1){
1372 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1373 set<string> local_nodes; // names of query nodes in the hfta.
1374 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1375 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1378 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1379 string mangler = "__copy"+int_to_string(p);
1380 hfta_node *par_hfta = new hfta_node();
1381 par_hfta->name = hfta_sets[i]->name + mangler;
1382 par_hfta->source_name = hfta_sets[i]->name;
1383 par_hfta->is_udop = hfta_sets[i]->is_udop;
1384 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1385 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1386 par_hfta->parallel_idx = p;
1388 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1391 if(hfta_sets[i]->is_udop){
1392 int root = hfta_sets[i]->query_node_indices[0];
1394 string unequal_par_sources;
1395 set<int>::iterator rfsii;
1396 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1397 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1398 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1401 if(unequal_par_sources != ""){
1402 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1407 vector<string> new_sources;
1408 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1409 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1412 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1413 new_qn->name += mangler;
1414 new_qn->mangler = mangler;
1415 new_qn->refd_tbls = new_sources;
1416 par_hfta->query_node_indices.push_back(qnodes.size());
1417 par_qnode_map[new_qn->name] = qnodes.size();
1418 name_node_map[ new_qn->name ] = qnodes.size();
1419 qnodes.push_back(new_qn);
1421 // regular query node
1422 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1423 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1424 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1425 // rehome the from clause on mangled names.
1426 // create merge nodes as needed for external sources.
1427 for(f=0;f<dup_pt->fm->tlist.size();++f){
1428 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1429 dup_pt->fm->tlist[f]->schema_name += mangler;
1430 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1431 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1432 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1433 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1434 dup_pt->fm->tlist[f]->schema_name += mangler;
1436 vector<string> src_tbls;
1437 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1439 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1442 for(s=0;s<stride;++s){
1443 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1444 src_tbls.push_back(ext_src_name);
1446 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1447 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1448 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1449 // Make a qnode to represent the new merge node
1450 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1451 qn_pt->refd_tbls = src_tbls;
1452 qn_pt->is_udop = false;
1453 qn_pt->is_externally_visible = false;
1454 qn_pt->inferred_visible_node = false;
1455 par_hfta->query_node_indices.push_back(qnodes.size());
1456 par_qnode_map[merge_node_name] = qnodes.size();
1457 name_node_map[ merge_node_name ] = qnodes.size();
1458 qnodes.push_back(qn_pt);
1462 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1463 for(f=0;f<dup_pt->fm->tlist.size();++f){
1464 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1466 new_qn->params = qnodes[hqn_idx]->params;
1467 new_qn->is_udop = false;
1468 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1469 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1470 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1471 par_qnode_map[new_qn->name] = qnodes.size();
1472 name_node_map[ new_qn->name ] = qnodes.size();
1473 qnodes.push_back(new_qn);
1476 hfta_name_map[par_hfta->name] = hfta_sets.size();
1477 hfta_sets.push_back(par_hfta);
1480 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1482 if(!hfta_sets[i]->is_udop){
1483 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1484 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1485 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1486 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1487 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1488 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1489 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1490 vector<string> src_tbls;
1491 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1492 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1493 src_tbls.push_back(ext_src_name);
1495 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1496 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1497 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1498 // Make a qnode to represent the new merge node
1499 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1500 qn_pt->refd_tbls = src_tbls;
1501 qn_pt->is_udop = false;
1502 qn_pt->is_externally_visible = false;
1503 qn_pt->inferred_visible_node = false;
1504 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1505 name_node_map[ merge_node_name ] = qnodes.size();
1506 qnodes.push_back(qn_pt);
1515 // Rebuild the reads_from / sources_to lists in the qnodes
1516 for(q=0;q<qnodes.size();++q){
1517 qnodes[q]->reads_from.clear();
1518 qnodes[q]->sources_to.clear();
1520 for(q=0;q<qnodes.size();++q){
1521 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1522 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1523 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1524 qnodes[q]->reads_from.insert(rf);
1525 qnodes[rf]->sources_to.insert(q);
1530 // Rebuild the reads_from / sources_to lists in hfta_sets
1531 for(q=0;q<hfta_sets.size();++q){
1532 hfta_sets[q]->reads_from.clear();
1533 hfta_sets[q]->sources_to.clear();
1535 for(q=0;q<hfta_sets.size();++q){
1536 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1537 int node = hfta_sets[q]->query_node_indices[s];
1538 set<int>::iterator rfsii;
1539 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1540 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1541 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1542 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1549 for(q=0;q<qnodes.size();++q){
1550 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1551 set<int>::iterator rsii;
1552 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1553 printf(" %d",(*rsii));
1554 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1555 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1556 printf(" %d",(*rsii));
1560 for(q=0;q<hfta_sets.size();++q){
1561 if(hfta_sets[q]->do_generation==false)
1563 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1564 set<int>::iterator rsii;
1565 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1566 printf(" %d",(*rsii));
1567 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1568 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1569 printf(" %d",(*rsii));
1576 // Re-topo sort the hftas
1577 hfta_topsort.clear();
1579 int hnode_srcs_2[hfta_sets.size()];
1580 for(i=0;i<hfta_sets.size();++i){
1581 hnode_srcs_2[i] = 0;
1582 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1587 while(workq.empty() == false){
1588 int node = workq.front();
1590 hfta_topsort.push_back(node);
1591 set<int>::iterator stsii;
1592 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1593 int child = (*stsii);
1594 hnode_srcs_2[child]++;
1595 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1596 workq.push_back(child);
1601 // Ensure that all of the query_node_indices in hfta_sets are topologically
1602 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1603 for(i=0;i<hfta_sets.size();++i){
1604 if(hfta_sets[i]->do_generation){
1605 map<int,int> n_accounted;
1606 vector<int> new_order;
1608 vector<int>::iterator vii;
1609 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1610 n_accounted[(*vii)]= 0;
1612 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1613 set<int>::iterator rfsii;
1614 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1615 if(n_accounted.count((*rfsii)) == 0){
1616 n_accounted[(*vii)]++;
1619 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1620 workq.push_back((*vii));
1624 while(workq.empty() == false){
1625 int node = workq.front();
1627 new_order.push_back(node);
1628 set<int>::iterator stsii;
1629 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1630 if(n_accounted.count((*stsii))){
1631 n_accounted[(*stsii)]++;
1632 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1633 workq.push_back((*stsii));
1638 hfta_sets[i]->query_node_indices = new_order;
1646 /// Global checking is done, start the analysis and translation
1647 /// of the query parse tree in the order specified by process_order
1650 // Get a list of the LFTAs for global lfta optimization
1651 // TODO: separate building operators from spliting lftas,
1652 // that will make optimizations such as predicate pushing easier.
1653 vector<stream_query *> lfta_list;
1654 stream_query *rootq;
1657 map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1659 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1661 int hfta_id = hfta_topsort[qi];
1662 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1666 // Two possibilities, either its a UDOP, or its a collection of queries.
1667 // if(qnodes[curr_list.back()]->is_udop)
1668 if(hfta_sets[hfta_id]->is_udop){
1669 int node_id = curr_list.back();
1670 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1671 opview_entry *opv = new opview_entry();
1673 // Many of the UDOP properties aren't currently used.
1674 opv->parent_qname = "no_parent";
1675 opv->root_name = qnodes[node_id]->name;
1676 opv->view_name = qnodes[node_id]->file;
1678 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1679 opv->udop_alias = tmpstr;
1680 opv->mangler = qnodes[node_id]->mangler;
1682 if(opv->mangler != ""){
1683 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1684 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1687 // This piece of code makes each hfta which refers to the same udop
1688 // reference a distinct running udop. Do this at query optimization time?
1689 // fmtbl->set_udop_alias(opv->udop_alias);
1691 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1692 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1694 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1696 for(s=0;s<subq.size();++s){
1697 // Validate that the fields match.
1698 subquery_spec *sqs = subq[s];
1699 string subq_name = sqs->name + opv->mangler;
1700 vector<field_entry *> flds = Schema->get_fields(subq_name);
1701 if(flds.size() == 0){
1702 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1705 if(flds.size() < sqs->types.size()){
1706 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1709 bool failed = false;
1710 for(f=0;f<sqs->types.size();++f){
1711 data_type dte(sqs->types[f],sqs->modifiers[f]);
1712 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1713 if(! dte.subsumes_type(&dtf) ){
1714 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1718 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1719 string pstr = dte.get_temporal_string();
1720 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1727 /// Validation done, find the subquery, make a copy of the
1728 /// parse tree, and add it to the return list.
1729 for(q=0;q<qnodes.size();++q)
1730 if(qnodes[q]->name == subq_name)
1732 if(q==qnodes.size()){
1733 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1739 // Cross-link to from entry(s) in all sourced-to tables.
1740 set<int>::iterator sii;
1741 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1742 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1743 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1745 for(ii=0;ii<tblvars.size();++ii){
1746 if(tblvars[ii]->schema_name == opv->root_name){
1747 tblvars[ii]->set_opview_idx(opviews.size());
1753 opviews.append(opv);
1756 // Analyze the parse trees in this query,
1757 // put them in rootq
1758 // vector<int> curr_list = process_sets[qi];
1761 ////////////////////////////////////////
1764 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1765 for(qj=0;qj<curr_list.size();++qj){
1767 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1769 // Select the current query parse tree
1770 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1772 // if hfta only, try to fetch any missing schemas
1773 // from the registry (using the print_schema program).
1774 // Here I use a hack to avoid analyzing the query -- all referenced
1775 // tables must be in the from clause
1776 // If there is a problem loading any table, just issue a warning,
1778 tablevar_list_t *fm = fta_parse_tree->get_from();
1779 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1780 // iterate over all referenced tables
1782 for(t=0;t<refd_tbls.size();++t){
1783 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1785 if(tbl_ref < 0){ // if this table is not in the Schema
1788 string cmd="print_schema "+refd_tbls[t];
1789 FILE *schema_in = popen(cmd.c_str(), "r");
1790 if(schema_in == NULL){
1791 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1793 string schema_instr;
1794 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1795 schema_instr += tmpstr;
1797 fta_parse_result = new fta_parse_t();
1798 strcpy(tmp_schema_str,schema_instr.c_str());
1799 FtaParser_setstringinput(tmp_schema_str);
1800 if(FtaParserparse()){
1801 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1803 if( fta_parse_result->tables != NULL){
1805 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1806 Schema->add_table(fta_parse_result->tables->get_table(tl));
1809 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1814 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1822 // Analyze the query.
1823 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1825 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1829 stream_query new_sq(qs, Schema);
1830 if(new_sq.error_code){
1831 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1835 // Add it to the Schema
1836 table_def *output_td = new_sq.get_output_tabledef();
1837 Schema->add_table(output_td);
1839 // Create a query plan from the analyzed parse tree.
1840 // If it's a query referenced via FROM, add it to the stream query.
1842 rootq->add_query(new_sq);
1844 rootq = new stream_query(new_sq);
1845 // have the stream query object inherit properties form the analyzed
1846 // hfta_node object.
1847 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1848 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1854 // This stream query has all its parts
1855 // Build and optimize it.
1856 //printf("translate_fta: generating plan.\n");
1857 if(rootq->generate_plan(Schema)){
1858 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1862 // We've found the query plan head, so now add the output operators.
1863 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1864 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1865 multimap<string, int>::iterator mmsi;
1866 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1867 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1868 rootq->add_output_operator(output_specs[(*mmsi).second]);
1874 // Perform query splitting if necessary.
1876 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1879 //for(l=0;l<split_queries.size();++l){
1880 //printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
1886 if(split_queries.size() > 0){ // should be at least one component.
1888 // Compute the number of LFTAs.
1889 int n_lfta = split_queries.size();
1890 if(hfta_returned) n_lfta--;
1891 // Check if a schemaId constraint needs to be inserted.
1893 // Process the LFTA components.
1894 for(l=0;l<n_lfta;++l){
1895 if(lfta_names.count(split_queries[l]->query_name) == 0){
1896 // Grab the lfta for global optimization.
1897 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1898 string liface = tvec[0]->get_interface(); // iface queries have been resolved
1899 string lmach = tvec[0]->get_machine();
1900 string schema_name = tvec[0]->get_schema_name();
1901 int schema_ref = tvec[0]->get_schema_ref();
1904 interface_names.push_back(liface);
1905 machine_names.push_back(lmach);
1906 //printf("Machine is %s\n",lmach.c_str());
1908 // Check if a schemaId constraint needs to be inserted.
1909 if(schema_ref<0){ // can result from some kinds of splits
1910 schema_ref = Schema->get_table_ref(schema_name);
1912 int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
1915 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1917 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1920 if(iface->has_multiple_schemas()){
1921 if(schema_id<0){ // invalid schema_id
1922 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1925 vector<string> iface_schemas = iface->get_property("Schemas");
1926 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1927 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1930 // Ensure that in liface, schema_id is used for only one schema
1931 if(schema_of_schemaid.count(liface)==0){
1932 map<int, string> empty_map;
1933 schema_of_schemaid[liface] = empty_map;
1935 if(schema_of_schemaid[liface].count(schema_id)==0){
1936 schema_of_schemaid[liface][schema_id] = schema_name;
1938 if(schema_of_schemaid[liface][schema_id] != schema_name){
1939 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1943 }else{ // single-schema interface
1944 schema_id = -1; // don't generate schema_id predicate
1945 vector<string> iface_schemas = iface->get_property("Schemas");
1946 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1947 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1950 if(iface_schemas.size()>1){
1951 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1956 // If we need to check the schema_id, insert a predicate into the lfta.
1957 // TODO not just schema_id, the full all_schema_ids set.
1959 colref_t *schid_cr = new colref_t("schemaId");
1960 schid_cr->schema_ref = schema_ref;
1961 schid_cr->tablevar_ref = 0;
1962 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1963 data_type *schid_dt = new data_type("uint");
1964 schid_se->dt = schid_dt;
1966 string schid_str = int_to_string(schema_id);
1967 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
1968 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
1969 lit_se->dt = schid_dt;
1971 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
1972 vector<cnf_elem *> clist;
1973 make_cnf_from_pr(schid_pr, clist);
1974 analyze_cnf(clist[0]);
1975 clist[0]->cost = 1; // cheap one comparison
1976 // cnf built, now insert it.
1977 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
1986 // Set the ht size from the recommendation, if there is one in the rec file
1987 if(lfta_htsize.count(split_queries[l]->query_name)>0){
1988 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
1992 lfta_names[split_queries[l]->query_name] = lfta_list.size();
1993 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
1994 lfta_list.push_back(split_queries[l]);
1995 lfta_mach_lists[lmach].push_back(split_queries[l]);
1997 // The following is a hack,
1998 // as I should be generating LFTA code through
1999 // the stream_query object.
2000 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2001 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2004 // Create query description to embed in lfta.c
2005 string lfta_schema_str = split_queries[l]->make_schema();
2006 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2008 // get NIC capabilities.
2010 nic_property *nicprop = NULL;
2011 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2012 if(iface_codegen_type.size()){
2013 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2015 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2020 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2023 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2024 query_names.push_back(split_queries[l]->query_name);
2025 mach_query_names[lmach].push_back(query_names.size()-1);
2026 // NOTE: I will assume a 1-1 correspondence between
2027 // mach_query_names[lmach] and lfta_mach_lists[lmach]
2028 // where mach_query_names[lmach][i] contains the index into
2029 // query_names, which names the lfta, and
2030 // lfta_mach_lists[lmach][i] is the stream_query * of the
2031 // corresponding lfta.
2032 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2036 // check if lfta is reusable
2037 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2039 bool lfta_reusable = false;
2040 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2041 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2042 lfta_reusable = true;
2044 lfta_reuse_options.push_back(lfta_reusable);
2046 // LFTA will inherit the liveness timeout specification from the containing query
2047 // it is too conservative as lfta are expected to spend less time per tuple
2050 // extract liveness timeout from query definition
2051 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2052 if (!liveness_timeout) {
2053 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2054 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2055 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2057 lfta_liveness_timeouts.push_back(liveness_timeout);
2059 // Add it to the schema
2060 table_def *td = split_queries[l]->get_output_tabledef();
2061 Schema->append_table(td);
2062 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2067 // If the output is lfta-only, dump out the query name.
2068 if(split_queries.size() == 1 && !hfta_returned){
2069 if(output_query_names ){
2070 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2074 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2079 // output schema summary
2080 if(output_schema_summary){
2081 dump_summary(split_queries[0]);
2087 if(hfta_returned){ // query also has an HFTA component
2088 int hfta_nbr = split_queries.size()-1;
2090 hfta_list.push_back(split_queries[hfta_nbr]);
2092 // report on generated query names
2093 if(output_query_names){
2094 string hfta_name =split_queries[hfta_nbr]->query_name;
2095 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2096 for(l=0;l<hfta_nbr;++l){
2097 string lfta_name =split_queries[l]->query_name;
2098 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2102 // fprintf(stderr,"query names are ");
2103 // for(l=0;l<hfta_nbr;++l){
2104 // if(l>0) fprintf(stderr,",");
2105 // string fta_name =split_queries[l]->query_name;
2106 // fprintf(stderr," %s",fta_name.c_str());
2108 // fprintf(stderr,"\n");
2113 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2114 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2121 //-----------------------------------------------------------------
2122 // Compute and propagate the SE in PROTOCOL fields compute a field.
2123 //-----------------------------------------------------------------
2125 for(i=0;i<lfta_list.size();i++){
2126 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2127 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2129 for(i=0;i<hfta_list.size();i++){
2130 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2131 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2136 //------------------------------------------------------------------------
2137 // Perform individual FTA optimizations
2138 //-----------------------------------------------------------------------
2140 if (partitioned_mode) {
2142 // open partition definition file
2143 string part_fname = config_dir_path + "partition.txt";
2145 FILE* partfd = fopen(part_fname.c_str(), "r");
2147 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2150 PartnParser_setfileinput(partfd);
2151 if (PartnParserparse()) {
2152 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2159 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2161 int num_hfta = hfta_list.size();
2162 for(i=0; i < hfta_list.size(); ++i){
2163 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2166 // Add all new hftas to schema
2167 for(i=num_hfta; i < hfta_list.size(); ++i){
2168 table_def *td = hfta_list[i]->get_output_tabledef();
2169 Schema->append_table(td);
2172 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
2176 //------------------------------------------------------------------------
2177 // Do global (cross-fta) optimization
2178 //-----------------------------------------------------------------------
2185 set<string> extra_external_libs;
2187 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2190 // build hfta file name, create output
2191 if(numeric_hfta_flname){
2192 sprintf(tmpstr,"hfta_%d",hfta_count);
2193 hfta_names.push_back(tmpstr);
2194 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2196 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2197 hfta_names.push_back(tmpstr);
2198 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2200 FILE *hfta_fl = fopen(tmpstr,"w");
2201 if(hfta_fl == NULL){
2202 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2205 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2207 // If there is a field verifier, warn about
2208 // lack of compatibility
2209 // NOTE : this code assumes that visible non-lfta queries
2210 // are those at the root of a stream query.
2211 string hfta_comment;
2213 string hfta_namespace;
2214 if(hfta_list[i]->defines.count("comment")>0)
2215 hfta_comment = hfta_list[i]->defines["comment"];
2216 if(hfta_list[i]->defines.count("Comment")>0)
2217 hfta_comment = hfta_list[i]->defines["Comment"];
2218 if(hfta_list[i]->defines.count("COMMENT")>0)
2219 hfta_comment = hfta_list[i]->defines["COMMENT"];
2220 if(hfta_list[i]->defines.count("title")>0)
2221 hfta_title = hfta_list[i]->defines["title"];
2222 if(hfta_list[i]->defines.count("Title")>0)
2223 hfta_title = hfta_list[i]->defines["Title"];
2224 if(hfta_list[i]->defines.count("TITLE")>0)
2225 hfta_title = hfta_list[i]->defines["TITLE"];
2226 if(hfta_list[i]->defines.count("namespace")>0)
2227 hfta_namespace = hfta_list[i]->defines["namespace"];
2228 if(hfta_list[i]->defines.count("Namespace")>0)
2229 hfta_namespace = hfta_list[i]->defines["Namespace"];
2230 if(hfta_list[i]->defines.count("NAMESPACE")>0)
2231 hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2233 if(field_verifier != NULL){
2235 if(hfta_comment == "")
2236 warning_str += "\tcomment not found.\n";
2238 // Obsolete stuff that Carsten wanted
2239 // if(hfta_title == "")
2240 // warning_str += "\ttitle not found.\n";
2241 // if(hfta_namespace == "")
2242 // warning_str += "\tnamespace not found.\n";
2245 // There is a get_tbl_keys method implemented for qp_nodes,
2246 // integrate it into stream_query, then call it to find keys,
2247 // and annotate fields with their key-ness.
2248 // If there is a "keys" property in the defines block, override anything returned
2249 // from the automated analysis
2251 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2253 for(fi=0;fi<flds.size();fi++){
2254 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2256 if(warning_str != "")
2257 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2258 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2261 // Get the fields in this query
2262 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2264 // do key processing
2265 string hfta_keys_s = "";
2266 if(hfta_list[i]->defines.count("keys")>0)
2267 hfta_keys_s = hfta_list[i]->defines["keys"];
2268 if(hfta_list[i]->defines.count("Keys")>0)
2269 hfta_keys_s = hfta_list[i]->defines["Keys"];
2270 if(hfta_list[i]->defines.count("KEYS")>0)
2271 hfta_keys_s = hfta_list[i]->defines["KEYS"];
2272 string xtra_keys_s = "";
2273 if(hfta_list[i]->defines.count("extra_keys")>0)
2274 xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2275 if(hfta_list[i]->defines.count("Extra_Keys")>0)
2276 xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2277 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2278 xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2280 vector<string> hfta_keys;
2281 vector<string> partial_keys;
2282 vector<string> xtra_keys;
2283 if(hfta_keys_s==""){
2284 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2285 if(xtra_keys_s.size()>0){
2286 xtra_keys = split_string(xtra_keys_s, ',');
2288 for(int xi=0;xi<xtra_keys.size();++xi){
2289 if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2290 hfta_keys.push_back(xtra_keys[xi]);
2294 hfta_keys = split_string(hfta_keys_s, ',');
2296 // validate that all of the keys exist in the output.
2297 // (exit on error, as it's a bad specification)
2298 vector<string> missing_keys;
2299 for(int ki=0;ki<hfta_keys.size(); ++ki){
2301 for(fi=0;fi<flds.size();++fi){
2302 if(hfta_keys[ki] == flds[fi]->get_name())
2306 missing_keys.push_back(hfta_keys[ki]);
2308 if(missing_keys.size()>0){
2309 fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the outputg:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2310 for(int hi=0; hi<missing_keys.size(); ++hi){
2311 fprintf(stderr," %s", missing_keys[hi].c_str());
2313 fprintf(stderr,"\n");
2317 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2318 if(hfta_comment != "")
2319 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2320 if(hfta_title != "")
2321 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2322 if(hfta_namespace != "")
2323 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2324 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2325 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2327 // write info about fields to qtree.xml
2329 for(fi=0;fi<flds.size();fi++){
2330 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2331 if(flds[fi]->get_modifier_list()->size()){
2332 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2334 fprintf(qtree_output," />\n");
2337 for(int hi=0;hi<hfta_keys.size();++hi){
2338 fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2340 for(int hi=0;hi<partial_keys.size();++hi){
2341 fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2343 for(int hi=0;hi<xtra_keys.size();++hi){
2344 fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2348 // extract liveness timeout from query definition
2349 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2350 if (!liveness_timeout) {
2351 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2352 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2353 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2355 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2357 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2359 for(itv=0;itv<tmp_tv.size();++itv){
2360 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2362 string ifrs = hfta_list[i]->collect_refd_ifaces();
2364 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2366 fprintf(qtree_output,"\t</HFTA>\n");
2370 // debug only -- do code generation to catch generation-time errors.
2371 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2374 hfta_count++; // for hfta file names with numeric suffixes
2376 hfta_list[i]->get_external_libs(extra_external_libs);
2380 string ext_lib_string;
2381 set<string>::iterator ssi_el;
2382 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2383 ext_lib_string += (*ssi_el)+" ";
2387 // Report on the set of operator views
2388 for(i=0;i<opviews.size();++i){
2389 opview_entry *opve = opviews.get_entry(i);
2390 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2391 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2392 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2393 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2394 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2396 if (!opve->liveness_timeout) {
2397 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2398 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2399 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2401 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2403 for(j=0;j<opve->subq_names.size();j++)
2404 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2405 fprintf(qtree_output,"\t</UDOP>\n");
2409 //-----------------------------------------------------------------
2411 // Create interface-specific meta code files.
2412 // first, open and parse the interface resources file.
2413 ifaces_db = new ifq_t();
2415 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2416 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2417 ifx_fname.c_str(), ierr.c_str());
2421 map<string, vector<stream_query *> >::iterator svsi;
2422 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2423 string lmach = (*svsi).first;
2425 // For this machine, create a set of lftas per interface.
2426 vector<stream_query *> mach_lftas = (*svsi).second;
2427 map<string, vector<stream_query *> > lfta_iface_lists;
2429 for(li=0;li<mach_lftas.size();++li){
2430 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2431 string lfta_iface = tvec[0]->get_interface();
2432 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2435 map<string, vector<stream_query *> >::iterator lsvsi;
2436 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2438 string liface = (*lsvsi).first;
2439 vector<stream_query *> iface_lftas = (*lsvsi).second;
2440 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2441 if(iface_codegen_type.size()){
2442 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2444 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2447 string mcs = generate_nic_code(iface_lftas, nicprop);
2450 mcf_flnm = lmach + "_"+liface+".mcf";
2452 mcf_flnm = hostname + "_"+liface+".mcf";
2454 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2455 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2458 fprintf(mcf_fl,"%s",mcs.c_str());
2460 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2461 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2470 //-----------------------------------------------------------------
2473 // Find common filter predicates in the LFTAs.
2474 // in addition generate structs to store the temporal attributes unpacked by prefilter
2476 map<string, vector<stream_query *> >::iterator ssqi;
2477 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2479 string lmach = (*ssqi).first;
2480 bool packed_return = false;
2484 // The LFTAs of this machine.
2485 vector<stream_query *> mach_lftas = (*ssqi).second;
2486 // break up on a per-interface basis.
2487 map<string, vector<stream_query *> > lfta_iface_lists;
2488 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2490 for(li=0;li<mach_lftas.size();++li){
2491 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2492 string lfta_iface = tvec[0]->get_interface();
2493 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2494 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2498 // Are the return values "packed"?
2499 // This should be done on a per-interface basis.
2500 // But this is defunct code for gs-lite
2501 for(li=0;li<mach_lftas.size();++li){
2502 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2503 string liface = tvec[0]->get_interface();
2504 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2505 if(iface_codegen_type.size()){
2506 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2507 packed_return = true;
2513 // Separate lftas by interface, collect results on a per-interface basis.
2515 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2516 map<string, vector<cnf_set *> > prefilter_preds;
2517 set<unsigned int> pred_ids; // this can be global for all interfaces
2518 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2519 string liface = (*mvsi).first;
2520 vector<cnf_set *> empty_list;
2521 prefilter_preds[liface] = empty_list;
2522 if(! packed_return){
2523 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2526 // get NIC capabilities. (Is this needed?)
2527 nic_property *nicprop = NULL;
2528 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2529 if(iface_codegen_type.size()){
2530 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2532 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2539 // Now that we know the prefilter preds, generate the lfta code.
2540 // Do this for all lftas in this machine.
2541 for(li=0;li<mach_lftas.size();++li){
2542 set<unsigned int> subsumed_preds;
2543 set<unsigned int>::iterator sii;
2545 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2547 if((pid>>16) == li){
2548 subsumed_preds.insert(pid & 0xffff);
2552 string lfta_schema_str = mach_lftas[li]->make_schema();
2553 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2554 nic_property *nicprop = NULL; // no NIC properties?
2555 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2559 // generate structs to store the temporal attributes
2560 // unpacked by prefilter
2561 col_id_set temp_cids;
2562 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2563 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2565 // Compute the lfta bit signatures and the lfta colrefs
2566 // do this on a per-interface basis
2568 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2570 map<string, vector<long long int> > lfta_sigs; // used again later
2571 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2572 string liface = (*mvsi).first;
2573 vector<long long int> empty_list;
2574 lfta_sigs[liface] = empty_list;
2576 vector<col_id_set> lfta_cols;
2577 vector<int> lfta_snap_length;
2578 for(li=0;li<lfta_iface_lists[liface].size();++li){
2579 unsigned long long int mask=0, bpos=1;
2581 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2582 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2586 lfta_sigs[liface].push_back(mask);
2587 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2588 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2591 //for(li=0;li<mach_lftas.size();++li){
2592 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2593 //col_id_set::iterator tcisi;
2594 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2595 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2600 // generate the prefilter
2601 // Do this on a per-interface basis, except for the #define
2603 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2604 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2606 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2611 // Generate interface parameter lookup function
2612 lfta_val[lmach] += "// lookup interface properties by name\n";
2613 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2614 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2615 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2617 // collect a list of interface names used by queries running on this host
2618 set<std::string> iface_names;
2619 for(i=0;i<mach_query_names[lmach].size();i++){
2620 int mi = mach_query_names[lmach][i];
2621 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2623 if(interface_names[mi]=="")
2624 iface_names.insert("DEFAULTDEV");
2626 iface_names.insert(interface_names[mi]);
2629 // generate interface property lookup code for every interface
2630 set<std::string>::iterator sir;
2631 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2632 if (sir == iface_names.begin())
2633 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2635 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2637 // iterate through interface properties
2638 vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2640 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2643 if (iface_properties.empty())
2644 lfta_val[lmach] += "\t\treturn NULL;\n";
2646 for (int i = 0; i < iface_properties.size(); ++i) {
2648 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2650 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2652 // combine all values for the interface property using comma separator
2653 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2654 lfta_val[lmach] += "\t\t\treturn \"";
2655 for (int j = 0; j < vals.size(); ++j) {
2656 lfta_val[lmach] += vals[j];
2657 if (j != vals.size()-1)
2658 lfta_val[lmach] += ",";
2660 lfta_val[lmach] += "\";\n";
2662 lfta_val[lmach] += "\t\t} else\n";
2663 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2666 lfta_val[lmach] += "\t} else\n";
2667 lfta_val[lmach] += "\t\treturn NULL;\n";
2668 lfta_val[lmach] += "}\n\n";
2671 // Generate a full list of FTAs for clearinghouse reference
2672 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2673 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2675 for (i = 0; i < query_names.size(); ++i) {
2677 lfta_val[lmach] += ", ";
2678 lfta_val[lmach] += "\"" + query_names[i] + "\"";
2680 for (i = 0; i < hfta_list.size(); ++i) {
2681 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2683 lfta_val[lmach] += ", NULL};\n\n";
2686 // Add the initialization function to lfta.c
2687 // Change to accept the interface name, and
2688 // set the prefilter function accordingly.
2689 // see the example in demo/err2
2690 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2692 // for(i=0;i<mach_query_names[lmach].size();i++)
2693 // int mi = mach_query_names[lmach][i];
2694 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2696 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2697 string liface = (*mvsi).first;
2698 vector<stream_query *> lfta_list = (*mvsi).second;
2699 for(i=0;i<lfta_list.size();i++){
2700 stream_query *lfta_sq = lfta_list[i];
2701 int mi = lfta_iface_qname_ix[liface][i];
2703 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2705 string this_iface = "DEFAULTDEV";
2706 if(interface_names[mi]!="")
2707 this_iface = '"'+interface_names[mi]+'"';
2708 lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2709 lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2710 // if(interface_names[mi]=="")
2711 // lfta_val[lmach]+="DEFAULTDEV";
2713 // lfta_val[lmach]+='"'+interface_names[mi]+'"';
2714 lfta_val[lmach] += this_iface;
2717 lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
2718 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
2720 sprintf(tmpstr,",%d",snap_lengths[mi]);
2721 lfta_val[lmach] += tmpstr;
2723 // unsigned long long int mask=0, bpos=1;
2725 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2726 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2728 // bpos = bpos << 1;
2732 // sprintf(tmpstr,",%lluull",mask);
2733 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2734 lfta_val[lmach]+=tmpstr;
2736 lfta_val[lmach] += ",0ull";
2739 lfta_val[lmach] += ");\n";
2743 // End of lfta prefilter stuff
2744 // --------------------------------------------------
2746 // If there is a field verifier, warn about
2747 // lack of compatibility
2748 string lfta_comment;
2750 string lfta_namespace;
2751 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2752 if(ldefs.count("comment")>0)
2753 lfta_comment = lfta_sq->defines["comment"];
2754 if(ldefs.count("Comment")>0)
2755 lfta_comment = lfta_sq->defines["Comment"];
2756 if(ldefs.count("COMMENT")>0)
2757 lfta_comment = lfta_sq->defines["COMMENT"];
2758 if(ldefs.count("title")>0)
2759 lfta_title = lfta_sq->defines["title"];
2760 if(ldefs.count("Title")>0)
2761 lfta_title = lfta_sq->defines["Title"];
2762 if(ldefs.count("TITLE")>0)
2763 lfta_title = lfta_sq->defines["TITLE"];
2764 if(ldefs.count("NAMESPACE")>0)
2765 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2766 if(ldefs.count("Namespace")>0)
2767 lfta_namespace = lfta_sq->defines["Namespace"];
2768 if(ldefs.count("namespace")>0)
2769 lfta_namespace = lfta_sq->defines["namespace"];
2771 string lfta_ht_size;
2772 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2773 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2774 if(ldefs.count("aggregate_slots")>0){
2775 lfta_ht_size = ldefs["aggregate_slots"];
2778 // NOTE : I'm assuming that visible lftas do not start with _fta.
2779 // -- will fail for non-visible simple selection queries.
2780 if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
2782 if(lfta_comment == "")
2783 warning_str += "\tcomment not found.\n";
2784 // Obsolete stuff that carsten wanted
2785 // if(lfta_title == "")
2786 // warning_str += "\ttitle not found.\n";
2787 // if(lfta_namespace == "")
2788 // warning_str += "\tnamespace not found.\n";
2790 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2792 for(fi=0;fi<flds.size();fi++){
2793 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2795 if(warning_str != "")
2796 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2797 query_names[mi].c_str(),warning_str.c_str());
2801 // Create qtree output
2802 fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
2803 if(lfta_comment != "")
2804 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2805 if(lfta_title != "")
2806 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2807 if(lfta_namespace != "")
2808 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2809 if(lfta_ht_size != "")
2810 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2812 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2814 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2815 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
2816 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2817 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2818 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2819 // write info about fields to qtree.xml
2820 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2822 for(fi=0;fi<flds.size();fi++){
2823 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2824 if(flds[fi]->get_modifier_list()->size()){
2825 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2827 fprintf(qtree_output," />\n");
2829 fprintf(qtree_output,"\t</LFTA>\n");
2835 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2836 string liface = (*mvsi).first;
2838 " if (!strcmp(device, \""+liface+"\")) \n"
2839 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2843 " if(lfta_prefilter == NULL){\n"
2844 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2851 lfta_val[lmach] += "}\n\n";
2853 if(!(debug_only || hfta_only) ){
2856 lfta_flnm = lmach + "_lfta.c";
2858 lfta_flnm = hostname + "_lfta.c";
2859 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2860 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2863 fprintf(lfta_out,"%s",lfta_header.c_str());
2864 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2865 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2870 // Say what are the operators which must execute
2871 if(opviews.size()>0)
2872 fprintf(stderr,"The queries use the following external operators:\n");
2873 for(i=0;i<opviews.size();++i){
2874 opview_entry *opv = opviews.get_entry(i);
2875 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2879 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2880 machine_names, schema_file_name,
2882 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2885 fprintf(qtree_output,"</QueryNodes>\n");
2890 ////////////////////////////////////////////////////////////
// generate_makefile
//
// Emits three build/run artifacts into the current directory:
//   1. "Makefile"  — builds the rts (lfta runtime) binary and one binary
//      per HFTA, plus rules to regenerate <hostname>_lfta.c via
//      translate_fta and to symlink packet_schema.txt / external_fcns.def.
//   2. "runit"     — shell script that starts gshub.py, the rts, each
//      HFTA binary, and any external-operator views, recording pids/pgids
//      in gs.pids.
//   3. "stopit"    — shell script that kills the process groups listed
//      in gs.pids.
// A fourth artifact (set_vinterface_hash.bat, for DAG virtual-interface
// hash splitting) is present but commented out below.
//
// Parameters (as visible here):
//   input_file_names / nfiles — query source files; become Makefile
//                               dependencies of <hostname>_lfta.c.
//   hfta_names                — one Makefile target + runit launch per HFTA.
//   opviews                   — external operators; their exec files are
//                               collected and launched from runit.
//   machine_names             — parallel to interface_names (iface -> host).
//   schema_file_name          — packet schema, a dependency of the lfta C file.
//   interface_names           — capture interfaces passed to the rts.
//   ifdb                      — interface-resource DB, queried for
//                               "InterfaceType" and "Command" values.
//   config_dir_path           — in-out: rewritten to a "-C <dir>" flag.
//   rts_hload                 — per-interface hash-load map (only used by
//                               the disabled virtual-interface code).
//
// NOTE(review): this listing elides many original lines (see the gaps in
// the embedded line numbers). Declarations of i, j, sb, op_fls, ifaces,
// erri, err_str, root_path, hostname, extra_libs, and several closing
// braces / fclose calls are in the elided lines and are not shown here.
2892 void generate_makefile(vector<string> &input_file_names, int nfiles,
2893 vector<string> &hfta_names, opview_set &opviews,
2894 vector<string> &machine_names,
2895 string schema_file_name,
2896 vector<string> &interface_names,
2897 ifq_t *ifdb, string &config_dir_path,
2900 map<string, vector<int> > &rts_hload
// Normalize config_dir_path into a command-line flag so it can be pasted
// directly into the translate_fta invocations below. NOTE(review): this
// mutates the caller's string (passed by reference) — repeated calls would
// prepend "-C " twice; presumably called once per run.
2904 if(config_dir_path != ""){
2905 config_dir_path = "-C "+config_dir_path;
// Probe for optional libraries; stat() returns 0 on success, -1 on error,
// so ">= 0" means "file exists". Results feed the linker flags below.
2909 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
2910 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
2912 // if(libz_exists && !libast_exists){
2913 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");
2917 // Get set of operator executable files to run
2919 set<string>::iterator ssi;
// Collect the distinct executable files backing the external operators;
// op_fls is declared in an elided line.
2920 for(i=0;i<opviews.size();++i){
2921 opview_entry *opv = opviews.get_entry(i);
2922 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
// ---- Artifact 1: the Makefile -------------------------------------
2925 FILE *outfl = fopen("Makefile", "w");
2927 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
// Compiler variables: C++ for HFTAs, C for the lfta runtime.
2932 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
2933 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
2937 fprintf(outfl," -DLFTA_STATS");
2939 // Gather the set of interfaces
2940 // Also, gather "base interface names" for use in computing
2941 // the hash splitting to virtual interfaces.
2942 // TODO : must update to handle machines
2944 set<string> base_vifaces; // base interfaces of virtual interfaces
2945 map<string, string> ifmachines;
2946 map<string, string> ifattrs;
2947 for(i=0;i<interface_names.size();++i){
2948 ifaces.insert(interface_names[i]);
2949 ifmachines[interface_names[i]] = machine_names[i];
// Virtual interfaces are named <base>X<n>; strip at the last 'X' to
// recover the base interface name. NOTE(review): any interface name
// containing an 'X' is treated as virtual — confirm naming convention.
2951 size_t Xpos = interface_names[i].find_last_of("X");
2952 if(Xpos!=string::npos){
2953 string iface = interface_names[i].substr(0,Xpos);
2954 base_vifaces.insert(iface);
2956 // get interface attributes and add them to the list
2959 // Do we need to include protobuf libraries?
2960 // TODO Move to the interface library: get the libraries to include
2961 // for an interface type
// Scan every interface's "InterfaceType" values; a "PROTO" type switches
// on the protobuf-c link flags emitted further down.
2963 bool use_proto = false;
2966 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
2967 string ifnm = (*ssi);
2968 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
2969 for(int ift_i=0;ift_i<ift.size();ift_i++){
2970 if(ift[ift_i]=="PROTO"){
// Top-level target list: rts plus every HFTA binary.
2980 for(i=0;i<hfta_names.size();++i)
2981 fprintf(outfl," %s",hfta_names[i].c_str());
// rts link rule; the library set depends on the probes/flags gathered above.
2985 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
2986 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
2988 fprintf(outfl,"-L. ");
2990 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
2992 fprintf(outfl,"-lgscppads -lpads ");
2994 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
2996 fprintf(outfl, " -lpz -lz -lbz ");
2997 if(libz_exists && libast_exists)
2998 fprintf(outfl," -last ");
3000 fprintf(outfl, " -ldll -ldl ");
3002 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3003 fprintf(outfl," -lgscpaux");
3005 fprintf(outfl," -fprofile-arcs");
// lfta.o rule, plus the rule to regenerate <hostname>_lfta.c by re-running
// translate_fta over the schema and original query files.
3010 "lfta.o: %s_lfta.c\n"
3011 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3013 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3014 for(i=0;i<nfiles;++i)
3015 fprintf(outfl," %s",input_file_names[i].c_str());
// Two variants of the regeneration command (with/without the -h hostname
// flag); the selecting condition is in an elided line.
3017 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3019 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3021 for(i=0;i<nfiles;++i)
3022 fprintf(outfl," %s",input_file_names[i].c_str());
3023 fprintf(outfl,"\n");
// Per-HFTA link + compile rules; the same hfta name is substituted into
// every %s slot of the (elided) format string.
3025 for(i=0;i<hfta_names.size();++i)
3028 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3031 "\t$(CPP) -o %s.o -c %s.cc\n"
3034 hfta_names[i].c_str(), hfta_names[i].c_str(),
3035 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3036 hfta_names[i].c_str(), hfta_names[i].c_str(),
3037 hfta_names[i].c_str(), hfta_names[i].c_str()
// Symlink rules for the config files, and the clean rule.
3042 "packet_schema.txt:\n"
3043 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3045 "external_fcns.def:\n"
3046 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3049 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3050 for(i=0;i<hfta_names.size();++i)
3051 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3052 fprintf(outfl,"\n");
// ---- Artifact 2: the "runit" launch script ------------------------
3058 // Gather the set of interfaces
3059 // TODO : must update to handle machines
3060 // TODO : lookup interface attributes and add them as a parameter to rts process
3061 outfl = fopen("runit", "w");
3063 fprintf(stderr,"Can't open runit for write, exiting.\n");
// Start gshub.py in the background, wait for its gshub.log (which holds
// the hub address), then launch the rts against that address.
3071 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3073 "if [ ! -f gshub.log ]\n"
3075 "\techo \"Failed to start bin/gshub.py\"\n"
3078 "ADDR=`cat gshub.log`\n"
3079 "ps opgid= $! >> gs.pids\n"
3080 "./rts $ADDR default ").c_str(), outfl);
// Append each interface name and its per-interface "Command" arguments
// to the rts command line.
3083 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3084 string ifnm = (*ssi);
3085 fprintf(outfl, "%s ",ifnm.c_str());
3086 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3087 for(j=0;j<ifv.size();++j)
3088 fprintf(outfl, "%s ",ifv[j].c_str());
3090 fprintf(outfl, " &\n");
3091 fprintf(outfl, "echo $! >> gs.pids\n");
// Launch each HFTA binary, then each external-operator view process.
3092 for(i=0;i<hfta_names.size();++i)
3093 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3095 for(j=0;j<opviews.opview_list.size();++j){
3096 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3100 system("chmod +x runit");
// ---- Artifact 3: the "stopit" teardown script ---------------------
3102 outfl = fopen("stopit", "w");
3104 fprintf(stderr,"Can't open stopit for write, exiting.\n");
// Kill every process group recorded in gs.pids (two passes; the second
// pass's signal is in an elided line — presumably a harder kill).
3108 fprintf(outfl,"#!/bin/sh\n"
3110 "if [ ! -f gs.pids ]\n"
3114 "for pgid in `cat gs.pids`\n"
3116 "kill -TERM -$pgid\n"
3119 "for pgid in `cat gs.pids`\n"
3126 system("chmod +x stopit");
3128 //-----------------------------------------------
// Disabled artifact: a dagconfig script that programs the DAG card's
// hash-range splitting across virtual interfaces, driven by rts_hload.
// Kept (commented out) for reference; the closing */ is in elided lines.
3130 /* For now disable support for virtual interfaces
3131 outfl = fopen("set_vinterface_hash.bat", "w");
3133 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3137 // The format should be determined by an entry in the ifres.xml file,
3138 // but for now hardcode the only example I have.
3139 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3140 if(rts_hload.count((*ssi))){
3141 string iface_name = (*ssi);
3142 string iface_number = "";
3143 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3144 if(isdigit(iface_name[j])){
3145 iface_number = iface_name[j];
3146 if(j>0 && isdigit(iface_name[j-1]))
3147 iface_number = iface_name[j-1] + iface_number;
3151 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3152 vector<int> halloc = rts_hload[iface_name];
3154 for(j=0;j<halloc.size();++j){
3157 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3158 prev_limit = halloc[j];
3160 fprintf(outfl,"\n");
3164 system("chmod +x set_vinterface_hash.bat");
3168 // Code for implementing a local schema
3170 table_list qpSchema;
3172 // Load the schemas of any LFTAs.
3174 for(l=0;l<hfta_nbr;++l){
3175 stream_query *sq0 = split_queries[l];
3176 table_def *td = sq0->get_output_tabledef();
3177 qpSchema.append_table(td);
3179 // load the schemas of any other ref'd tables.
3181 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3183 for(ti=0;ti<input_tbl_names.size();++ti){
3184 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3186 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3188 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3191 qpSchema.append_table(Schema->get_table(tbl_ref));
3196 // Functions related to parsing.
3199 static int split_string(char *instr,char sep, char **words,int max_words){
3205 words[nwords++] = str;
3206 while( (loc = strchr(str,sep)) != NULL){
3209 if(nwords >= max_words){
3210 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3211 nwords = max_words-1;
3213 words[nwords++] = str;