1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
58 extern int xmlParserparse(void);
59 extern FILE *xmlParserin;
60 extern int xmlParserdebug;
62 std::vector<std::string> xml_attr_vec;
63 std::vector<std::string> xml_val_vec;
64 std::string xml_a, xml_v;
65 xml_t *xml_leaves = NULL;
67 // Interface to the field list verifier
68 field_list *field_verifier = NULL;
70 #define TMPSTRLEN 1000
73 #define PATH_DELIM '/'
76 char tmp_schema_str[10000];
78 // maximum delay between two heartbeats produced
79 // by a UDOP. Used when it is not explicitly
80 // provided in the udop definition
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
83 // Default lfta hash table size, must be power of 2.
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
86 // Interface to FTA definition lexer and parser ...
88 extern int FtaParserparse(void);
89 extern FILE *FtaParserin;
90 extern int FtaParserdebug;
92 fta_parse_t *fta_parse_result;
93 var_defs_t *fta_parse_defines;
97 // Interface to external function lexer and parser ...
99 extern int Ext_fcnsParserparse(void);
100 extern FILE *Ext_fcnsParserin;
101 extern int Ext_fcnsParserdebug;
103 ext_fcn_list *Ext_fcns;
106 // Interface to partition definition parser
107 extern int PartnParserparse();
108 partn_def_list_t *partn_parse_result = NULL;
116 // forward declaration of local utility function
117 void generate_makefile(vector<string> &input_file_names, int nfiles,
118 vector<string> &hfta_names, opview_set &opviews,
119 vector<string> &machine_names,
120 string schema_file_name,
121 vector<string> &interface_names,
122 ifq_t *ifdb, string &config_dir_path,
125 map<string, vector<int> > &rts_hload
128 //static int split_string(char *instr,char sep, char **words,int max_words);
131 FILE *schema_summary_output = NULL; // query names
133 // Dump schema summary
// Write a summary of one query's output schema to the already-opened
// schema_summary.txt stream (global schema_summary_output):
//   line 1: the query's name
//   line 2: '|'-separated output field names
//   line 3: '|'-separated output field types
// NOTE(review): this excerpt is missing source lines (original numbering jumps
// 135->137, 143->145, etc.) -- the loop/function closing braces and the
// declaration of loop index f are presumably on the omitted lines; confirm
// against the full file before editing.
134 void dump_summary(stream_query *str){
135 fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
// Fetch the table definition describing this query's output stream.
137 table_def *sch = str->get_output_tabledef();
139 vector<field_entry *> flds = sch->get_fields();
// First row after the name: field names, '|'-delimited (no trailing delimiter).
141 for(f=0;f<flds.size();++f){
142 if(f>0) fprintf(schema_summary_output,"|");
143 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
145 fprintf(schema_summary_output,"\n");
// Second row: field types, '|'-delimited, in the same field order as the names.
146 for(f=0;f<flds.size();++f){
147 if(f>0) fprintf(schema_summary_output,"|");
148 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
150 fprintf(schema_summary_output,"\n");
154 string hostname; // name of current host.
156 bool generate_stats = false;
157 string root_path = "../..";
160 int main(int argc, char **argv){
161 char tmpstr[TMPSTRLEN];
165 set<int>::iterator si;
167 vector<string> registration_query_names; // for lfta.c registration
168 map<string, vector<int> > mach_query_names; // list queries of machine
169 vector<int> snap_lengths; // for lfta.c registration
170 vector<string> interface_names; // for lfta.c registration
171 vector<string> machine_names; // machine of interface
172 vector<bool> lfta_reuse_options; // for lfta.c registration
173 vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
174 vector<string> hfta_names; // hfta cource code names, for
175 // creating make file.
176 vector<string> qnames; // ensure unique names
177 map<string, int> lfta_names; // keep track of unique lftas.
180 // set these to 1 to debug the parser
182 Ext_fcnsParserdebug = 0;
184 FILE *lfta_out; // lfta.c output.
185 FILE *fta_in; // input file
186 FILE *table_schemas_in; // source tables definition file
187 FILE *query_name_output; // query names
188 FILE *qtree_output; // interconnections of query nodes
190 // -------------------------------
191 // Handling of Input Arguments
192 // -------------------------------
193 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195 "\t[-B] : debug only (don't create output files)\n"
196 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199 "\t[-C] : use <config directory> for definition files\n"
200 "\t[-l] : use <library directory> for library queries\n"
201 "\t[-N] : output query names in query_names.txt\n"
202 "\t[-H] : create HFTA only (no schema_file)\n"
203 "\t[-Q] : use query name for hfta suffix\n"
204 "\t[-M] : generate make file and runit, stopit scripts\n"
205 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206 "\t[-f] : Output schema summary to schema_summary.txt\n"
207 "\t[-P] : link with PADS\n"
208 "\t[-h] : override host name.\n"
209 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210 "\t[-R] : path to root of GS-lite\n"
213 // parameters gathered from command line processing
214 string external_fcns_path;
215 // string internal_fcn_path;
216 string config_dir_path;
217 string library_path = "./";
218 vector<string> input_file_names;
219 string schema_file_name;
220 bool debug_only = false;
221 bool hfta_only = false;
222 bool output_query_names = false;
223 bool output_schema_summary=false;
224 bool numeric_hfta_flname = true;
225 bool create_makefile = false;
226 bool distributed_mode = false;
227 bool partitioned_mode = false;
228 bool use_live_hosts_file = false;
229 bool use_pads = false;
230 bool clean_make = false;
231 int n_virtual_interfaces = 1;
234 while((chopt = getopt(argc,argv,optstr)) != -1){
240 distributed_mode = true;
243 partitioned_mode = true;
246 use_live_hosts_file = true;
250 config_dir_path = string(optarg) + string("/");
254 library_path = string(optarg) + string("/");
257 output_query_names = true;
260 numeric_hfta_flname = false;
263 if(schema_file_name == ""){
268 output_schema_summary=true;
271 create_makefile=true;
292 n_virtual_interfaces = atoi(optarg);
293 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
295 n_virtual_interfaces = 1;
300 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301 fprintf(stderr,"%s\n", usage_str);
304 fprintf(stderr, "Argument was %c\n", optopt);
305 fprintf(stderr,"Invalid arguments\n");
306 fprintf(stderr,"%s\n", usage_str);
312 for (int i = 0; i < argc; ++i) {
313 if((schema_file_name == "") && !hfta_only){
314 schema_file_name = argv[i];
316 input_file_names.push_back(argv[i]);
320 if(input_file_names.size() == 0){
321 fprintf(stderr,"%s\n", usage_str);
326 string clean_cmd = "rm Makefile hfta_*.cc";
327 int clean_ret = system(clean_cmd.c_str());
329 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
334 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
336 // Open globally used file names.
338 // prepend config directory to schema file
339 schema_file_name = config_dir_path + schema_file_name;
340 external_fcns_path = config_dir_path + string("external_fcns.def");
341 string ifx_fname = config_dir_path + string("ifres.xml");
343 // Find interface query file(s).
345 gethostname(tmpstr,TMPSTRLEN);
348 hostname_len = strlen(tmpstr);
349 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
350 vector<string> ifq_fls;
352 ifq_fls.push_back(ifq_fname);
355 // Get the field list, if it exists
356 string flist_fl = config_dir_path + "field_list.xml";
358 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360 xml_leaves = new xml_t();
361 xmlParser_setfileinput(flf_in);
362 if(xmlParserparse()){
363 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
365 field_verifier = new field_list(xml_leaves);
370 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
377 if(!(debug_only || hfta_only)){
378 if((lfta_out = fopen("lfta.c","w")) == NULL){
379 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
385 // Get the output specification file.
387 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
388 string ospec_fl = "output_spec.cfg";
390 vector<ospec_str *> output_specs;
391 multimap<string, int> qname_to_ospec;
392 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
395 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
397 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
399 // make operator type lowercase
401 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402 *tmpc = tolower(*tmpc);
404 ospec_str *tmp_ospec = new ospec_str();
405 tmp_ospec->query = flds[0];
406 tmp_ospec->operator_type = flds[1];
407 tmp_ospec->operator_param = flds[2];
408 tmp_ospec->output_directory = flds[3];
409 tmp_ospec->bucketwidth = atoi(flds[4]);
410 tmp_ospec->partitioning_flds = flds[5];
411 tmp_ospec->n_partitions = atoi(flds[6]);
412 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413 output_specs.push_back(tmp_ospec);
415 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
420 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
425 string pspec_fl = "hfta_parallelism.cfg";
427 map<string, int> hfta_parallelism;
428 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
431 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432 bool good_entry = true;
434 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
436 string hname = flds[0];
437 int par = atoi(flds[1]);
438 if(par <= 0 || par > n_virtual_interfaces){
439 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
442 if(good_entry && n_virtual_interfaces % par != 0){
443 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
447 hfta_parallelism[hname] = par;
451 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
455 // LFTA hash table sizes
456 string htspec_fl = "lfta_htsize.cfg";
457 FILE *htsp_in = NULL;
458 map<string, int> lfta_htsize;
459 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
462 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463 bool good_entry = true;
465 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
467 string lfta_name = flds[0];
468 int htsz = atoi(flds[1]);
470 lfta_htsize[lfta_name] = htsz;
472 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
477 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
480 // LFTA virtual interface hash split
481 string rtlspec_fl = "rts_load.cfg";
483 map<string, vector<int> > rts_hload;
484 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
489 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490 bool good_entry = true;
494 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
496 iface_name = flds[0];
499 for(j=1;j<nflds;++j){
500 int h = atoi(flds[j]);
504 hload.push_back(cumm_h);
510 rts_hload[iface_name] = hload;
512 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
519 if(output_query_names){
520 if((query_name_output = fopen("query_names.txt","w")) == NULL){
521 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
526 if(output_schema_summary){
527 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
533 if((qtree_output = fopen("qtree.xml","w")) == NULL){
534 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
537 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539 fprintf(qtree_output,"<QueryNodes>\n");
542 // Get an initial Schema
545 // Parse the table schema definitions.
546 fta_parse_result = new fta_parse_t();
547 FtaParser_setfileinput(table_schemas_in);
548 if(FtaParserparse()){
549 fprintf(stderr,"Table schema parse failed.\n");
552 if(fta_parse_result->parse_type != TABLE_PARSE){
553 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
556 Schema = fta_parse_result->tables;
558 // Ensure that all schema_ids, if set, are distinct.
559 // Obsolete? There is code elsewhere to ensure that schema IDs are
560 // distinct on a per-interface basis.
564 for(int t=0;t<Schema->size();++t){
565 int sch_id = Schema->get_table(t)->get_schema_id();
567 if(found_ids.find(sch_id) != found_ids.end()){
568 dup_ids.insert(sch_id);
570 found_ids.insert(sch_id);
573 if(dup_ids.size()>0){
574 fprintf(stderr, "Error, the schema has duplicate schema_ids:");
575 for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
576 fprintf(stderr," %d",(*dit));
577 fprintf(stderr,"\n");
584 // Process schema field inheritance
586 retval = Schema->unroll_tables(err_str);
588 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
592 // hfta only => we will try to fetch schemas from the registry.
593 // therefore, start off with an empty schema.
594 Schema = new table_list();
598 // Open and parse the external functions file.
599 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
600 if(Ext_fcnsParserin == NULL){
601 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
602 Ext_fcns = new ext_fcn_list();
604 if(Ext_fcnsParserparse()){
605 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
606 Ext_fcns = new ext_fcn_list();
609 if(Ext_fcns->validate_fcns(err_str)){
610 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
614 // Open and parse the interface resources file.
615 // ifq_t *ifaces_db = new ifq_t();
617 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
618 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
619 // ifx_fname.c_str(), ierr.c_str());
622 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
623 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
624 // ifq_fname.c_str(), ierr.c_str());
629 // The LFTA code string.
630 // Put the standard preamble here.
631 // NOTE: the hash macros, fcns should go into the run time
632 map<string, string> lfta_val;
633 map<string, string> lfta_prefilter_val;
636 "#include <limits.h>\n"
637 "#include \"rts.h\"\n"
638 "#include \"fta.h\"\n"
639 "#include \"lapp.h\"\n"
640 "#include \"rts_udaf.h\"\n"
641 "#include<stdio.h>\n"
642 "#include <float.h>\n"
643 "#include \"rdtsc.h\"\n"
644 "#include \"watchlist.h\"\n\n"
647 // Get any locally defined parsing headers
649 memset(&glob_result, 0, sizeof(glob_result));
651 // do the glob operation TODO should be from GSROOT
652 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
653 if(return_value == 0){
655 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
657 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
658 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
662 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
666 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
667 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
668 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
669 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
674 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
676 "#define SLOT_FILLED 0x04\n"
677 "#define SLOT_GEN_BITS 0x03\n"
678 "#define SLOT_HASH_BITS 0xfffffff8\n"
679 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
680 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
681 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
684 "#define lfta_BOOL_to_hash(x) (x)\n"
685 "#define lfta_USHORT_to_hash(x) (x)\n"
686 "#define lfta_UINT_to_hash(x) (x)\n"
687 "#define lfta_IP_to_hash(x) (x)\n"
688 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
689 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
690 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
691 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
692 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
693 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
694 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
695 " for(i=0;i<x.length;++i){\n"
696 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
702 " if((i%4)!=0) ret ^=tmp_sum;\n"
708 //////////////////////////////////////////////////////////////////
709 ///// Get all of the query parse trees
713 int hfta_count = 0; // for numeric suffixes to hfta .cc files
715 //---------------------------
716 // Global info needed for post processing.
718 // Set of operator views ref'd in the query set.
720 // lftas on a per-machine basis.
721 map<string, vector<stream_query *> > lfta_mach_lists;
722 int nfiles = input_file_names.size();
723 vector<stream_query *> hfta_list; // list of hftas.
724 map<string, stream_query *> sq_map; // map from query name to stream query.
727 //////////////////////////////////////////
729 // Open and parse the interface resources file.
730 ifq_t *ifaces_db = new ifq_t();
732 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
733 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
734 ifx_fname.c_str(), ierr.c_str());
737 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
738 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
739 ifq_fls[0].c_str(), ierr.c_str());
743 map<string, string> qname_to_flname; // for detecting duplicate query names
747 // Parse the files to create a vector of parse trees.
748 // Load qnodes with information to perform a topo sort
749 // based on query dependencies.
750 vector<query_node *> qnodes; // for topo sort.
751 map<string,int> name_node_map; // map query name to qnodes entry
752 for(i=0;i<input_file_names.size();i++){
754 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
755 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
758 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
760 // Parse the FTA query
761 fta_parse_result = new fta_parse_t();
762 FtaParser_setfileinput(fta_in);
763 if(FtaParserparse()){
764 fprintf(stderr,"FTA parse failed.\n");
767 if(fta_parse_result->parse_type != QUERY_PARSE){
768 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
772 // returns a list of parse trees
773 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
774 for(p=0;p<qlist.size();++p){
775 table_exp_t *fta_parse_tree = qlist[p];
776 // query_parse_trees.push_back(fta_parse_tree);
778 // compute the default name -- extract from query name
779 strcpy(tmpstr,input_file_names[i].c_str());
780 char *qname = strrchr(tmpstr,PATH_DELIM);
785 char *qname_end = strchr(qname,'.');
786 if(qname_end != NULL) *qname_end = '\0';
787 string qname_str = qname;
788 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
790 // Determine visibility. Should I be attaching all of the output methods?
791 if(qname_to_ospec.count(imputed_qname)>0)
792 fta_parse_tree->set_visible(true);
794 fta_parse_tree->set_visible(false);
797 // Create a manipulable representation of the parse tree.
798 // the qnode inherits the visibility assigned to the parse tree.
799 int pos = qnodes.size();
800 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
801 name_node_map[ qnodes[pos]->name ] = pos;
802 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
803 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
804 // qfiles.push_back(i);
806 // Check for duplicate query names
807 // NOTE : in hfta-only generation, I should
808 // also check with the names of the registered queries.
809 if(qname_to_flname.count(qnodes[pos]->name) > 0){
810 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
811 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
814 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
815 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
816 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
819 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
825 // Add the library queries
828 for(pos=0;pos<qnodes.size();++pos){
830 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
831 string src_tbl = qnodes[pos]->refd_tbls[fi];
832 if(qname_to_flname.count(src_tbl) == 0){
833 int last_sep = src_tbl.find_last_of('/');
834 if(last_sep != string::npos){
835 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
836 string target_qname = src_tbl.substr(last_sep+1);
837 string qpathname = library_path + src_tbl + ".gsql";
838 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
839 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
841 fprintf(stderr,"After exit\n");
843 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
844 // Parse the FTA query
845 fta_parse_result = new fta_parse_t();
846 FtaParser_setfileinput(fta_in);
847 if(FtaParserparse()){
848 fprintf(stderr,"FTA parse failed.\n");
851 if(fta_parse_result->parse_type != QUERY_PARSE){
852 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
856 map<string, int> local_query_map;
857 vector<string> local_query_names;
858 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
859 for(p=0;p<qlist.size();++p){
860 table_exp_t *fta_parse_tree = qlist[p];
861 fta_parse_tree->set_visible(false); // assumed to not produce output
862 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
863 if(imputed_qname == target_qname)
864 imputed_qname = src_tbl;
865 if(local_query_map.count(imputed_qname)>0){
866 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
869 local_query_map[ imputed_qname ] = p;
870 local_query_names.push_back(imputed_qname);
873 if(local_query_map.count(src_tbl)==0){
874 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
878 vector<int> worklist;
879 set<int> added_queries;
880 vector<query_node *> new_qnodes;
881 worklist.push_back(local_query_map[target_qname]);
882 added_queries.insert(local_query_map[target_qname]);
884 int qpos = qnodes.size();
885 for(qq=0;qq<worklist.size();++qq){
886 int q_id = worklist[qq];
887 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
888 new_qnodes.push_back( new_qnode);
889 vector<string> refd_tbls = new_qnode->refd_tbls;
891 for(ff = 0;ff<refd_tbls.size();++ff){
892 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
894 if(name_node_map.count(refd_tbls[ff])>0){
895 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
898 worklist.push_back(local_query_map[refd_tbls[ff]]);
904 for(qq=0;qq<new_qnodes.size();++qq){
905 int qpos = qnodes.size();
906 qnodes.push_back(new_qnodes[qq]);
907 name_node_map[qnodes[qpos]->name ] = qpos;
908 qname_to_flname[qnodes[qpos]->name ] = qpathname;
922 //---------------------------------------
927 string udop_missing_sources;
928 for(i=0;i<qnodes.size();++i){
930 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
931 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
933 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
934 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
935 int pos = qnodes.size();
936 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
937 name_node_map[ qnodes[pos]->name ] = pos;
938 qnodes[pos]->is_externally_visible = false; // its visible
939 // Need to mark the source queries as visible.
941 string missing_sources = "";
942 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
943 string src_tbl = qnodes[pos]->refd_tbls[si];
944 if(name_node_map.count(src_tbl)==0){
945 missing_sources += src_tbl + " ";
948 if(missing_sources != ""){
949 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
956 if(udop_missing_sources != ""){
957 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
963 ////////////////////////////////////////////////////////////////////
964 /// Check parse trees to verify that some
965 /// global properties are met :
966 /// if q1 reads from q2, then
967 /// q2 is processed before q1
968 /// q1 can supply q2's parameters
969 /// Verify there is no cycle in the reads-from graph.
971 // Compute an order in which to process the
974 // Start by building the reads-from lists.
977 for(i=0;i<qnodes.size();++i){
979 vector<string> refd_tbls = qnodes[i]->refd_tbls;
980 for(fi = 0;fi<refd_tbls.size();++fi){
981 if(name_node_map.count(refd_tbls[fi])>0){
982 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
983 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
989 // If one query reads the result of another,
990 // check for parameter compatibility. Currently it must
991 // be an exact match. I will move to requiring
992 // containment after re-ordering, but will require
993 // some analysis for code generation which is not
995 //printf("There are %d query nodes.\n",qnodes.size());
998 for(i=0;i<qnodes.size();++i){
999 vector<var_pair_t *> target_params = qnodes[i]->params;
1000 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1001 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
1002 if(target_params.size() != source_params.size()){
1003 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1007 for(p=0;p<target_params.size();++p){
1008 if(! (target_params[p]->name == source_params[p]->name &&
1009 target_params[p]->val == source_params[p]->val ) ){
1010 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1019 // Start by counting inedges.
1020 for(i=0;i<qnodes.size();++i){
1021 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1022 qnodes[(*si)]->n_consumers++;
1026 // The roots are the nodes with indegree zero.
1028 for(i=0;i<qnodes.size();++i){
1029 if(qnodes[i]->n_consumers == 0){
1030 if(qnodes[i]->is_externally_visible == false){
1031 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1037 // Remove the parts of the subtree that produce no output.
1038 set<int> valid_roots;
1039 set<int> discarded_nodes;
1040 set<int> candidates;
1041 while(roots.size() >0){
1042 for(si=roots.begin();si!=roots.end();++si){
1043 if(qnodes[(*si)]->is_externally_visible){
1044 valid_roots.insert((*si));
1046 discarded_nodes.insert((*si));
1047 set<int>::iterator sir;
1048 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1049 qnodes[(*sir)]->n_consumers--;
1050 if(qnodes[(*sir)]->n_consumers == 0)
1051 candidates.insert( (*sir));
1058 roots = valid_roots;
1059 if(discarded_nodes.size()>0){
1060 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1062 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1063 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1065 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1067 fprintf(stderr,"\n");
1070 // Compute the sources_to set, ignoring discarded nodes.
1071 for(i=0;i<qnodes.size();++i){
1072 if(discarded_nodes.count(i)==0)
1073 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1074 qnodes[(*si)]->sources_to.insert(i);
1079 // Find the nodes that are shared by multiple visible subtrees.
1080 // The roots become inferred visible nodes.
1082 // Find the visible nodes.
1083 vector<int> visible_nodes;
1084 for(i=0;i<qnodes.size();i++){
1085 if(qnodes[i]->is_externally_visible){
1086 visible_nodes.push_back(i);
1090 // Find UDOPs referenced by visible nodes.
1092 for(i=0;i<visible_nodes.size();++i){
1093 workq.push_back(visible_nodes[i]);
1095 while(!workq.empty()){
1096 int node = workq.front();
1098 set<int>::iterator children;
1099 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1100 qnodes[node]->is_externally_visible = true;
1101 visible_nodes.push_back(node);
1102 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1103 if(qnodes[(*children)]->is_externally_visible == false){
1104 qnodes[(*children)]->is_externally_visible = true;
1105 visible_nodes.push_back((*children));
1109 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1110 workq.push_back((*children));
1117 for(i=0;i<qnodes.size();i++){
1118 qnodes[i]->subtree_roots.clear();
1121 // Walk the tree defined by a visible node, not descending into
1122 // subtrees rooted by a visible node. Mark the node visited with
1123 // the visible node ID.
1124 for(i=0;i<visible_nodes.size();++i){
1126 vroots.insert(visible_nodes[i]);
1127 while(vroots.size()>0){
1128 for(si=vroots.begin();si!=vroots.end();++si){
1129 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1131 set<int>::iterator sir;
1132 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1133 if(! qnodes[(*sir)]->is_externally_visible){
1134 candidates.insert( (*sir));
1138 vroots = candidates;
1142 // Find the nodes in multiple visible node subtrees, but with no parent
1143 // that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1144 done = true; // until proven otherwise
1145 for(i=0;i<qnodes.size();i++){
1146 if(qnodes[i]->subtree_roots.size()>1){
1147 bool is_new_root = true;
1148 set<int>::iterator sir;
1149 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1150 if(qnodes[(*sir)]->subtree_roots.size()>1)
1151 is_new_root = false;
1154 qnodes[i]->is_externally_visible = true;
1155 qnodes[i]->inferred_visible_node = true;
1156 visible_nodes.push_back(i);
1167 // get visible nodes in topo ordering.
1168 // for(i=0;i<qnodes.size();i++){
1169 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1171 vector<int> process_order;
1172 while(roots.size() >0){
1173 for(si=roots.begin();si!=roots.end();++si){
1174 if(discarded_nodes.count((*si))==0){
1175 process_order.push_back( (*si) );
1177 set<int>::iterator sir;
1178 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1179 qnodes[(*sir)]->n_consumers--;
1180 if(qnodes[(*sir)]->n_consumers == 0)
1181 candidates.insert( (*sir));
1189 //printf("process_order.size() =%d\n",process_order.size());
1191 // Search for cyclic dependencies
1193 for(i=0;i<qnodes.size();++i){
1194 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1195 if(found_dep.size() != 0) found_dep += ", ";
1196 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1199 if(found_dep.size()>0){
1200 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1204 // Get a list of query sets, in the order to be processed.
1205 // Start at visible root and do bfs.
1206 // The query set includes queries referenced indirectly,
1207 // as sources for user-defined operators. These are needed
1208 // to ensure that they are added to the schema, but are not part
1209 // of the query tree.
1211 // stream_node_sets contains queries reachable only through the
1212 // FROM clause, so I can tell which queries to add to the stream
1213 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1215 // NOTE: this code works because in order for data to be
1216 // read by multiple hftas, the node must be externally visible.
1217 // But visible nodes define roots of process sets.
1218 // internally visible nodes can feed data only
1219 // to other nodes in the same query file.
1220 // Therefore, any access can be restricted to a file,
1221 // hfta output sharing is done only on roots
1222 // never on interior nodes.
1227 // Compute the base collection of hftas.
1228 vector<hfta_node *> hfta_sets;
1229 map<string, int> hfta_name_map;
1230 // vector< vector<int> > process_sets;
1231 // vector< set<int> > stream_node_sets;
1232 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1233 // i.e. process leaves 1st.
1234 for(i=0;i<process_order.size();++i){
1235 if(qnodes[process_order[i]]->is_externally_visible == true){
1236 //printf("Visible.\n");
1237 int root = process_order[i];
1238 hfta_node *hnode = new hfta_node();
1239 hnode->name = qnodes[root]-> name;
1240 hnode->source_name = qnodes[root]-> name;
1241 hnode->is_udop = qnodes[root]->is_udop;
1242 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1244 vector<int> proc_list; proc_list.push_back(root);
1245 // Ensure that nodes are added only once.
1246 set<int> proc_set; proc_set.insert(root);
1247 roots.clear(); roots.insert(root);
1249 while(roots.size()>0){
1250 for(si=roots.begin();si!=roots.end();++si){
1251 //printf("Processing root %d\n",(*si));
1252 set<int>::iterator sir;
1253 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1254 //printf("reads fom %d\n",(*sir));
1255 if(qnodes[(*sir)]->is_externally_visible==false){
1256 candidates.insert( (*sir) );
1257 if(proc_set.count( (*sir) )==0){
1258 proc_set.insert( (*sir) );
1259 proc_list.push_back( (*sir) );
1268 reverse(proc_list.begin(), proc_list.end());
1269 hnode->query_node_indices = proc_list;
1270 hfta_name_map[hnode->name] = hfta_sets.size();
1271 hfta_sets.push_back(hnode);
1275 // Compute the reads_from / sources_to graphs for the hftas.
1277 for(i=0;i<hfta_sets.size();++i){
1278 hfta_node *hnode = hfta_sets[i];
1279 for(q=0;q<hnode->query_node_indices.size();q++){
1280 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1281 for(s=0;s<qnode->refd_tbls.size();++s){
1282 if(hfta_name_map.count(qnode->refd_tbls[s])){
1283 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1284 hnode->reads_from.insert(other_hfta);
1285 hfta_sets[other_hfta]->sources_to.insert(i);
1291 // Compute a topological sort of the hfta_sets.
1293 vector<int> hfta_topsort;
1295 int hnode_srcs[hfta_sets.size()];
1296 for(i=0;i<hfta_sets.size();++i){
1298 if(hfta_sets[i]->sources_to.size() == 0)
1302 while(! workq.empty()){
1303 int node = workq.front();
1305 hfta_topsort.push_back(node);
1306 set<int>::iterator stsi;
1307 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1308 int parent = (*stsi);
1309 hnode_srcs[parent]++;
1310 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1311 workq.push_back(parent);
1316 // Decorate hfta nodes with the level of parallelism given as input.
1318 map<string, int>::iterator msii;
1319 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1320 string hfta_name = (*msii).first;
1321 int par = (*msii).second;
1322 if(hfta_name_map.count(hfta_name) > 0){
1323 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1325 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1329 // Propagate levels of parallelism: children should have a level of parallelism
1330 // as large as any of its parents. Adjust children upwards to compensate.
1331 // Start at parents and adjust children, auto-propagation will occur.
1333 for(i=hfta_sets.size()-1;i>=0;i--){
1334 set<int>::iterator stsi;
1335 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1336 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1337 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1342 // Before all the name mangling, check if there are any output_spec.cfg
1343 // or hfta_parallelism.cfg entries that do not have a matching query.
1345 string dangling_ospecs = "";
1346 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1347 string oq = (*msii).first;
1348 if(hfta_name_map.count(oq) == 0){
1349 dangling_ospecs += " "+(*msii).first;
1352 if(dangling_ospecs!=""){
1353 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1356 string dangling_par = "";
1357 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1358 string oq = (*msii).first;
1359 if(hfta_name_map.count(oq) == 0){
1360 dangling_par += " "+(*msii).first;
1363 if(dangling_par!=""){
1364 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1369 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1370 // FROM clauses: retarget any name which is an internal node, and
1371 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1372 // when the source hfta has more parallelism than the target node.
1373 // Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
1376 int n_original_hfta_sets = hfta_sets.size();
1377 for(i=0;i<n_original_hfta_sets;++i){
1378 if(hfta_sets[i]->n_parallel > 1){
1379 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1380 set<string> local_nodes; // names of query nodes in the hfta.
1381 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1382 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1385 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1386 string mangler = "__copy"+int_to_string(p);
1387 hfta_node *par_hfta = new hfta_node();
1388 par_hfta->name = hfta_sets[i]->name + mangler;
1389 par_hfta->source_name = hfta_sets[i]->name;
1390 par_hfta->is_udop = hfta_sets[i]->is_udop;
1391 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1392 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1393 par_hfta->parallel_idx = p;
1395 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1398 if(hfta_sets[i]->is_udop){
1399 int root = hfta_sets[i]->query_node_indices[0];
1401 string unequal_par_sources;
1402 set<int>::iterator rfsii;
1403 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1404 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1405 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1408 if(unequal_par_sources != ""){
1409 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1414 vector<string> new_sources;
1415 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1416 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1419 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1420 new_qn->name += mangler;
1421 new_qn->mangler = mangler;
1422 new_qn->refd_tbls = new_sources;
1423 par_hfta->query_node_indices.push_back(qnodes.size());
1424 par_qnode_map[new_qn->name] = qnodes.size();
1425 name_node_map[ new_qn->name ] = qnodes.size();
1426 qnodes.push_back(new_qn);
1428 // regular query node
1429 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1430 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1431 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1432 // rehome the from clause on mangled names.
1433 // create merge nodes as needed for external sources.
1434 for(f=0;f<dup_pt->fm->tlist.size();++f){
1435 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1436 dup_pt->fm->tlist[f]->schema_name += mangler;
1437 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1438 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1439 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1440 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1441 dup_pt->fm->tlist[f]->schema_name += mangler;
1443 vector<string> src_tbls;
1444 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1446 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1449 for(s=0;s<stride;++s){
1450 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1451 src_tbls.push_back(ext_src_name);
1453 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1454 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1455 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1456 // Make a qnode to represent the new merge node
1457 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1458 qn_pt->refd_tbls = src_tbls;
1459 qn_pt->is_udop = false;
1460 qn_pt->is_externally_visible = false;
1461 qn_pt->inferred_visible_node = false;
1462 par_hfta->query_node_indices.push_back(qnodes.size());
1463 par_qnode_map[merge_node_name] = qnodes.size();
1464 name_node_map[ merge_node_name ] = qnodes.size();
1465 qnodes.push_back(qn_pt);
1469 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1470 for(f=0;f<dup_pt->fm->tlist.size();++f){
1471 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1473 new_qn->params = qnodes[hqn_idx]->params;
1474 new_qn->is_udop = false;
1475 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1476 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1477 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1478 par_qnode_map[new_qn->name] = qnodes.size();
1479 name_node_map[ new_qn->name ] = qnodes.size();
1480 qnodes.push_back(new_qn);
1483 hfta_name_map[par_hfta->name] = hfta_sets.size();
1484 hfta_sets.push_back(par_hfta);
1487 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1489 if(!hfta_sets[i]->is_udop){
1490 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1491 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1492 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1493 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1494 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1495 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1496 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1497 vector<string> src_tbls;
1498 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1499 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1500 src_tbls.push_back(ext_src_name);
1502 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1503 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1504 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1505 // Make a qnode to represent the new merge node
1506 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1507 qn_pt->refd_tbls = src_tbls;
1508 qn_pt->is_udop = false;
1509 qn_pt->is_externally_visible = false;
1510 qn_pt->inferred_visible_node = false;
1511 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1512 name_node_map[ merge_node_name ] = qnodes.size();
1513 qnodes.push_back(qn_pt);
1522 // Rebuild the reads_from / sources_to lists in the qnodes
1523 for(q=0;q<qnodes.size();++q){
1524 qnodes[q]->reads_from.clear();
1525 qnodes[q]->sources_to.clear();
1527 for(q=0;q<qnodes.size();++q){
1528 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1529 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1530 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1531 qnodes[q]->reads_from.insert(rf);
1532 qnodes[rf]->sources_to.insert(q);
1537 // Rebuild the reads_from / sources_to lists in hfta_sets
1538 for(q=0;q<hfta_sets.size();++q){
1539 hfta_sets[q]->reads_from.clear();
1540 hfta_sets[q]->sources_to.clear();
1542 for(q=0;q<hfta_sets.size();++q){
1543 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1544 int node = hfta_sets[q]->query_node_indices[s];
1545 set<int>::iterator rfsii;
1546 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1547 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1548 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1549 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1556 for(q=0;q<qnodes.size();++q){
1557 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1558 set<int>::iterator rsii;
1559 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1560 printf(" %d",(*rsii));
1561 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1562 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1563 printf(" %d",(*rsii));
1567 for(q=0;q<hfta_sets.size();++q){
1568 if(hfta_sets[q]->do_generation==false)
1570 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1571 set<int>::iterator rsii;
1572 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1573 printf(" %d",(*rsii));
1574 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1575 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1576 printf(" %d",(*rsii));
1583 // Re-topo sort the hftas
1584 hfta_topsort.clear();
1586 int hnode_srcs_2[hfta_sets.size()];
1587 for(i=0;i<hfta_sets.size();++i){
1588 hnode_srcs_2[i] = 0;
1589 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1594 while(workq.empty() == false){
1595 int node = workq.front();
1597 hfta_topsort.push_back(node);
1598 set<int>::iterator stsii;
1599 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1600 int child = (*stsii);
1601 hnode_srcs_2[child]++;
1602 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1603 workq.push_back(child);
1608 // Ensure that all of the query_node_indices in hfta_sets are topologically
1609 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1610 for(i=0;i<hfta_sets.size();++i){
1611 if(hfta_sets[i]->do_generation){
1612 map<int,int> n_accounted;
1613 vector<int> new_order;
1615 vector<int>::iterator vii;
1616 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1617 n_accounted[(*vii)]= 0;
1619 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1620 set<int>::iterator rfsii;
1621 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1622 if(n_accounted.count((*rfsii)) == 0){
1623 n_accounted[(*vii)]++;
1626 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1627 workq.push_back((*vii));
1631 while(workq.empty() == false){
1632 int node = workq.front();
1634 new_order.push_back(node);
1635 set<int>::iterator stsii;
1636 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1637 if(n_accounted.count((*stsii))){
1638 n_accounted[(*stsii)]++;
1639 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1640 workq.push_back((*stsii));
1645 hfta_sets[i]->query_node_indices = new_order;
1653 /// Global checking is done, start the analysis and translation
1654 /// of the query parse tree in the order specified by process_order
1657 // Get a list of the LFTAs for global lfta optimization
1658 // TODO: separate building operators from splitting lftas,
1659 // that will make optimizations such as predicate pushing easier.
1660 vector<stream_query *> lfta_list;
1661 stream_query *rootq;
1664 map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1666 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1668 int hfta_id = hfta_topsort[qi];
1669 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1673 // Two possibilities, either its a UDOP, or its a collection of queries.
1674 // if(qnodes[curr_list.back()]->is_udop)
1675 if(hfta_sets[hfta_id]->is_udop){
1676 int node_id = curr_list.back();
1677 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1678 opview_entry *opv = new opview_entry();
1680 // Many of the UDOP properties aren't currently used.
1681 opv->parent_qname = "no_parent";
1682 opv->root_name = qnodes[node_id]->name;
1683 opv->view_name = qnodes[node_id]->file;
1685 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1686 opv->udop_alias = tmpstr;
1687 opv->mangler = qnodes[node_id]->mangler;
1689 if(opv->mangler != ""){
1690 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1691 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1694 // This piece of code makes each hfta which refers to the same udop
1695 // reference a distinct running udop. Do this at query optimization time?
1696 // fmtbl->set_udop_alias(opv->udop_alias);
1698 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1699 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1701 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1703 for(s=0;s<subq.size();++s){
1704 // Validate that the fields match.
1705 subquery_spec *sqs = subq[s];
1706 string subq_name = sqs->name + opv->mangler;
1707 vector<field_entry *> flds = Schema->get_fields(subq_name);
1708 if(flds.size() == 0){
1709 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1712 if(flds.size() < sqs->types.size()){
1713 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1716 bool failed = false;
1717 for(f=0;f<sqs->types.size();++f){
1718 data_type dte(sqs->types[f],sqs->modifiers[f]);
1719 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1720 if(! dte.subsumes_type(&dtf) ){
1721 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1725 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1726 string pstr = dte.get_temporal_string();
1727 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1734 /// Validation done, find the subquery, make a copy of the
1735 /// parse tree, and add it to the return list.
1736 for(q=0;q<qnodes.size();++q)
1737 if(qnodes[q]->name == subq_name)
1739 if(q==qnodes.size()){
1740 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1746 // Cross-link to from entry(s) in all sourced-to tables.
1747 set<int>::iterator sii;
1748 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1749 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1750 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1752 for(ii=0;ii<tblvars.size();++ii){
1753 if(tblvars[ii]->schema_name == opv->root_name){
1754 tblvars[ii]->set_opview_idx(opviews.size());
1760 opviews.append(opv);
1763 // Analyze the parse trees in this query,
1764 // put them in rootq
1765 // vector<int> curr_list = process_sets[qi];
1768 ////////////////////////////////////////
1771 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1772 for(qj=0;qj<curr_list.size();++qj){
1774 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1776 // Select the current query parse tree
1777 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1779 // if hfta only, try to fetch any missing schemas
1780 // from the registry (using the print_schema program).
1781 // Here I use a hack to avoid analyzing the query -- all referenced
1782 // tables must be in the from clause
1783 // If there is a problem loading any table, just issue a warning,
1785 tablevar_list_t *fm = fta_parse_tree->get_from();
1786 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1787 // iterate over all referenced tables
1789 for(t=0;t<refd_tbls.size();++t){
1790 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1792 if(tbl_ref < 0){ // if this table is not in the Schema
1795 string cmd="print_schema "+refd_tbls[t];
1796 FILE *schema_in = popen(cmd.c_str(), "r");
1797 if(schema_in == NULL){
1798 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1800 string schema_instr;
1801 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1802 schema_instr += tmpstr;
1804 fta_parse_result = new fta_parse_t();
1805 strcpy(tmp_schema_str,schema_instr.c_str());
1806 FtaParser_setstringinput(tmp_schema_str);
1807 if(FtaParserparse()){
1808 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1810 if( fta_parse_result->tables != NULL){
1812 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1813 Schema->add_table(fta_parse_result->tables->get_table(tl));
1816 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1821 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1829 // Analyze the query.
1830 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1832 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1836 stream_query new_sq(qs, Schema);
1837 if(new_sq.error_code){
1838 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1842 // Add it to the Schema
1843 table_def *output_td = new_sq.get_output_tabledef();
1844 Schema->add_table(output_td);
1846 // Create a query plan from the analyzed parse tree.
1847 // If it's a query referenced via FROM, add it to the stream query.
1849 rootq->add_query(new_sq);
1851 rootq = new stream_query(new_sq);
1852 // have the stream query object inherit properties from the analyzed
1853 // hfta_node object.
1854 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1855 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1861 // This stream query has all its parts
1862 // Build and optimize it.
1863 //printf("translate_fta: generating plan.\n");
1864 if(rootq->generate_plan(Schema)){
1865 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1869 // If we've found the query plan head, so now add the output operators
1870 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1871 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1872 multimap<string, int>::iterator mmsi;
1873 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1874 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1875 rootq->add_output_operator(output_specs[(*mmsi).second]);
1881 // Perform query splitting if necessary.
1883 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1886 //for(l=0;l<split_queries.size();++l){
1887 //printf("split query %d is %s\n",l,split_queries[l]->q`uery_name.c_str());
1893 if(split_queries.size() > 0){ // should be at least one component.
1895 // Compute the number of LFTAs.
1896 int n_lfta = split_queries.size();
1897 if(hfta_returned) n_lfta--;
1898 // Check if a schemaId constraint needs to be inserted.
1900 // Process the LFTA components.
1901 for(l=0;l<n_lfta;++l){
1902 if(lfta_names.count(split_queries[l]->query_name) == 0){
1903 // Grab the lfta for global optimization.
1904 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1905 string liface = "_local_";
1906 // string lmach = "";
1907 string lmach = hostname;
1909 liface = tvec[0]->get_interface(); // iface queries have been resolved
1910 lmach = tvec[0]->get_machine();
1912 interface_names.push_back(liface);
1913 machine_names.push_back(lmach);
1916 vector<predicate_t *> schemaid_preds;
1917 for(int irv=0;irv<tvec.size();++irv){
1919 string schema_name = tvec[irv]->get_schema_name();
1920 string rvar_name = tvec[irv]->get_var_name();
1921 int schema_ref = tvec[irv]->get_schema_ref();
1924 // interface_names.push_back(liface);
1925 // machine_names.push_back(lmach);
1927 //printf("Machine is %s\n",lmach.c_str());
1929 // Check if a schemaId constraint needs to be inserted.
1930 if(schema_ref<0){ // can result from some kinds of splits
1931 schema_ref = Schema->get_table_ref(schema_name);
1933 int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
1936 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1938 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1943 if(tvec[irv]->get_interface() != "_local_"){
1944 if(iface->has_multiple_schemas()){
1945 if(schema_id<0){ // invalid schema_id
1946 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1949 vector<string> iface_schemas = iface->get_property("Schemas");
1950 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1951 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1954 // Ensure that in liface, schema_id is used for only one schema
1955 if(schema_of_schemaid.count(liface)==0){
1956 map<int, string> empty_map;
1957 schema_of_schemaid[liface] = empty_map;
1959 if(schema_of_schemaid[liface].count(schema_id)==0){
1960 schema_of_schemaid[liface][schema_id] = schema_name;
1962 if(schema_of_schemaid[liface][schema_id] != schema_name){
1963 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1967 }else{ // single-schema interface
1968 schema_id = -1; // don't generate schema_id predicate
1969 vector<string> iface_schemas = iface->get_property("Schemas");
1970 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1971 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1974 if(iface_schemas.size()>1){
1975 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1983 // If we need to check the schema_id, insert a predicate into the lfta.
1984 // TODO not just schema_id, the full all_schema_ids set.
1986 colref_t *schid_cr = new colref_t("schemaId");
1987 schid_cr->schema_ref = schema_ref;
1988 schid_cr->table_name = rvar_name;
1989 schid_cr->tablevar_ref = 0;
1990 schid_cr->default_table = false;
1991 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1992 data_type *schid_dt = new data_type("uint");
1993 schid_se->dt = schid_dt;
1995 string schid_str = int_to_string(schema_id);
1996 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
1997 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
1998 lit_se->dt = schid_dt;
2000 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2001 vector<cnf_elem *> clist;
2002 make_cnf_from_pr(schid_pr, clist);
2003 analyze_cnf(clist[0]);
2004 clist[0]->cost = 1; // cheap one comparison
2005 // cnf built, now insert it.
2006 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2008 // Specialized processing
2009 // filter join, get two schemaid preds
2010 string node_type = split_queries[l]->query_plan[0]->node_type();
2011 if(node_type == "filter_join"){
2012 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2014 fj->pred_t0.push_back(clist[0]);
2016 fj->pred_t1.push_back(clist[0]);
2018 schemaid_preds.push_back(schid_pr);
2020 // watchlist join, get the first schemaid pred
2021 if(node_type == "watch_join"){
2022 watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2024 fj->pred_t0.push_back(clist[0]);
2025 schemaid_preds.push_back(schid_pr);
2030 // Specialized processing, currently filter join.
2031 if(schemaid_preds.size()>1){
2032 string node_type = split_queries[l]->query_plan[0]->node_type();
2033 if(node_type == "filter_join"){
2034 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2035 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2036 vector<cnf_elem *> clist;
2037 make_cnf_from_pr(filter_pr, clist);
2038 analyze_cnf(clist[0]);
2039 clist[0]->cost = 1; // cheap one comparison
2040 fj->shared_pred.push_back(clist[0]);
2050 // Set the ht size from the recommendation, if there is one in the rec file
2051 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2052 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2056 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2057 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
2058 lfta_list.push_back(split_queries[l]);
2059 lfta_mach_lists[lmach].push_back(split_queries[l]);
2061 // The following is a hack,
2062 // as I should be generating LFTA code through
2063 // the stream_query object.
2065 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2067 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2070 // Create query description to embed in lfta.c
2071 string lfta_schema_str = split_queries[l]->make_schema();
2072 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2074 // get NIC capabilities.
2076 nic_property *nicprop = NULL;
2077 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2078 if(iface_codegen_type.size()){
2079 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2081 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2086 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2089 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2091 // TODO NOTE : I'd like it to be the case that registration_query_names
2092 // are the queries to be registered for subscription.
2093 // but there is some complex bookkeeping here.
2094 registration_query_names.push_back(split_queries[l]->query_name);
2095 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2096 // NOTE: I will assume a 1-1 correspondence between
2097 // mach_query_names[lmach] and lfta_mach_lists[lmach]
2098 // where mach_query_names[lmach][i] contains the index into
2099 // query_names, which names the lfta, and
2100 // mach_query_names[lmach][i] is the stream_query * of the
2101 // corresponding lfta.
2102 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2106 // check if lfta is reusable
2107 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2109 bool lfta_reusable = false;
2110 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2111 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2112 lfta_reusable = true;
2114 lfta_reuse_options.push_back(lfta_reusable);
2116 // LFTA will inherit the liveness timeout specification from the containing query
2117 // it is too conservative as lfta are expected to spend less time per tuple
2120 // extract liveness timeout from query definition
2121 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2122 if (!liveness_timeout) {
2123 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2124 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2125 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2127 lfta_liveness_timeouts.push_back(liveness_timeout);
2129 // Add it to the schema
2130 table_def *td = split_queries[l]->get_output_tabledef();
2131 Schema->append_table(td);
2132 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2137 // If the output is lfta-only, dump out the query name.
2138 if(split_queries.size() == 1 && !hfta_returned){
2139 if(output_query_names ){
2140 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2144 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2149 // output schema summary
2150 if(output_schema_summary){
2151 dump_summary(split_queries[0]);
2157 if(hfta_returned){ // query also has an HFTA component
2158 int hfta_nbr = split_queries.size()-1;
2160 hfta_list.push_back(split_queries[hfta_nbr]);
2162 // report on generated query names
2163 if(output_query_names){
2164 string hfta_name =split_queries[hfta_nbr]->query_name;
2165 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2166 for(l=0;l<hfta_nbr;++l){
2167 string lfta_name =split_queries[l]->query_name;
2168 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2172 // fprintf(stderr,"query names are ");
2173 // for(l=0;l<hfta_nbr;++l){
2174 // if(l>0) fprintf(stderr,",");
2175 // string fta_name =split_queries[l]->query_name;
2176 // fprintf(stderr," %s",fta_name.c_str());
2178 // fprintf(stderr,"\n");
2183 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2184 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2191 //-----------------------------------------------------------------
2192 // Compute and propagate the SE in PROTOCOL fields compute a field.
2193 //-----------------------------------------------------------------
2195 for(i=0;i<lfta_list.size();i++){
2196 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2197 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2199 for(i=0;i<hfta_list.size();i++){
2200 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2201 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2206 //------------------------------------------------------------------------
2207 // Perform individual FTA optimizations
2208 //-----------------------------------------------------------------------
2210 if (partitioned_mode) {
2212 // open partition definition file
2213 string part_fname = config_dir_path + "partition.txt";
2215 FILE* partfd = fopen(part_fname.c_str(), "r");
2217 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2220 PartnParser_setfileinput(partfd);
2221 if (PartnParserparse()) {
2222 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2229 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2231 int num_hfta = hfta_list.size();
2232 for(i=0; i < hfta_list.size(); ++i){
2233 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2236 // Add all new hftas to schema
2237 for(i=num_hfta; i < hfta_list.size(); ++i){
2238 table_def *td = hfta_list[i]->get_output_tabledef();
2239 Schema->append_table(td);
2242 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2246 //------------------------------------------------------------------------
2247 // Do global (cross-fta) optimization
2248 //-----------------------------------------------------------------------
2255 set<string> extra_external_libs;
2257 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2260 // build hfta file name, create output
2261 if(numeric_hfta_flname){
2262 sprintf(tmpstr,"hfta_%d",hfta_count);
2263 hfta_names.push_back(tmpstr);
2264 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2266 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2267 hfta_names.push_back(tmpstr);
2268 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2270 FILE *hfta_fl = fopen(tmpstr,"w");
2271 if(hfta_fl == NULL){
2272 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2275 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2277 // If there is a field verifier, warn about
2278 // lack of compatibility
2279 // NOTE : this code assumes that visible non-lfta queries
2280 // are those at the root of a stream query.
2281 string hfta_comment;
2283 string hfta_namespace;
2284 if(hfta_list[i]->defines.count("comment")>0)
2285 hfta_comment = hfta_list[i]->defines["comment"];
2286 if(hfta_list[i]->defines.count("Comment")>0)
2287 hfta_comment = hfta_list[i]->defines["Comment"];
2288 if(hfta_list[i]->defines.count("COMMENT")>0)
2289 hfta_comment = hfta_list[i]->defines["COMMENT"];
2290 if(hfta_list[i]->defines.count("title")>0)
2291 hfta_title = hfta_list[i]->defines["title"];
2292 if(hfta_list[i]->defines.count("Title")>0)
2293 hfta_title = hfta_list[i]->defines["Title"];
2294 if(hfta_list[i]->defines.count("TITLE")>0)
2295 hfta_title = hfta_list[i]->defines["TITLE"];
2296 if(hfta_list[i]->defines.count("namespace")>0)
2297 hfta_namespace = hfta_list[i]->defines["namespace"];
2298 if(hfta_list[i]->defines.count("Namespace")>0)
2299 hfta_namespace = hfta_list[i]->defines["Namespace"];
2300 if(hfta_list[i]->defines.count("NAMESPACE")>0)
2301 hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2303 if(field_verifier != NULL){
2305 if(hfta_comment == "")
2306 warning_str += "\tcomment not found.\n";
2308 // Obsolete stuff that Carsten wanted
2309 // if(hfta_title == "")
2310 // warning_str += "\ttitle not found.\n";
2311 // if(hfta_namespace == "")
2312 // warning_str += "\tnamespace not found.\n";
2315 // There is a get_tbl_keys method implemented for qp_nodes,
2316 // integrate it into stream_query, then call it to find keys,
2317 // and annotate fields with their key-ness.
2318 // If there is a "keys" property in the defines block, override anything returned
2319 // from the automated analysis
2321 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2323 for(fi=0;fi<flds.size();fi++){
2324 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2326 if(warning_str != "")
2327 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2328 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2331 // Get the fields in this query
2332 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2334 // do key processing
2335 string hfta_keys_s = "";
2336 if(hfta_list[i]->defines.count("keys")>0)
2337 hfta_keys_s = hfta_list[i]->defines["keys"];
2338 if(hfta_list[i]->defines.count("Keys")>0)
2339 hfta_keys_s = hfta_list[i]->defines["Keys"];
2340 if(hfta_list[i]->defines.count("KEYS")>0)
2341 hfta_keys_s = hfta_list[i]->defines["KEYS"];
2342 string xtra_keys_s = "";
2343 if(hfta_list[i]->defines.count("extra_keys")>0)
2344 xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2345 if(hfta_list[i]->defines.count("Extra_Keys")>0)
2346 xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2347 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2348 xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2350 vector<string> hfta_keys;
2351 vector<string> partial_keys;
2352 vector<string> xtra_keys;
2353 if(hfta_keys_s==""){
2354 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2355 if(xtra_keys_s.size()>0){
2356 xtra_keys = split_string(xtra_keys_s, ',');
2358 for(int xi=0;xi<xtra_keys.size();++xi){
2359 if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2360 hfta_keys.push_back(xtra_keys[xi]);
2364 hfta_keys = split_string(hfta_keys_s, ',');
2366 // validate that all of the keys exist in the output.
2367 // (exit on error, as it's a bad specification)
2368 vector<string> missing_keys;
2369 for(int ki=0;ki<hfta_keys.size(); ++ki){
2371 for(fi=0;fi<flds.size();++fi){
2372 if(hfta_keys[ki] == flds[fi]->get_name())
2376 missing_keys.push_back(hfta_keys[ki]);
2378 if(missing_keys.size()>0){
2379 fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2380 for(int hi=0; hi<missing_keys.size(); ++hi){
2381 fprintf(stderr," %s", missing_keys[hi].c_str());
2383 fprintf(stderr,"\n");
2387 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2388 if(hfta_comment != "")
2389 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2390 if(hfta_title != "")
2391 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2392 if(hfta_namespace != "")
2393 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2394 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2395 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2397 // write info about fields to qtree.xml
2399 for(fi=0;fi<flds.size();fi++){
2400 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2401 if(flds[fi]->get_modifier_list()->size()){
2402 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2404 fprintf(qtree_output," />\n");
2407 for(int hi=0;hi<hfta_keys.size();++hi){
2408 fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2410 for(int hi=0;hi<partial_keys.size();++hi){
2411 fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2413 for(int hi=0;hi<xtra_keys.size();++hi){
2414 fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2418 // extract liveness timeout from query definition
2419 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2420 if (!liveness_timeout) {
2421 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2422 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2423 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2425 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2427 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2429 for(itv=0;itv<tmp_tv.size();++itv){
2430 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2432 string ifrs = hfta_list[i]->collect_refd_ifaces();
2434 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2436 fprintf(qtree_output,"\t</HFTA>\n");
2440 // debug only -- do code generation to catch generation-time errors.
2441 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2444 hfta_count++; // for hfta file names with numeric suffixes
2446 hfta_list[i]->get_external_libs(extra_external_libs);
2450 string ext_lib_string;
2451 set<string>::iterator ssi_el;
2452 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2453 ext_lib_string += (*ssi_el)+" ";
2457 // Report on the set of operator views
2458 for(i=0;i<opviews.size();++i){
2459 opview_entry *opve = opviews.get_entry(i);
2460 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2461 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2462 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2463 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2464 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2466 if (!opve->liveness_timeout) {
2467 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2468 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2469 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2471 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2473 for(j=0;j<opve->subq_names.size();j++)
2474 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2475 fprintf(qtree_output,"\t</UDOP>\n");
2479 //-----------------------------------------------------------------
2481 // Create interface-specific meta code files.
2482 // first, open and parse the interface resources file.
2483 ifaces_db = new ifq_t();
2485 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2486 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2487 ifx_fname.c_str(), ierr.c_str());
2491 map<string, vector<stream_query *> >::iterator svsi;
2492 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2493 string lmach = (*svsi).first;
2495 // For this machine, create a set of lftas per interface.
2496 vector<stream_query *> mach_lftas = (*svsi).second;
2497 map<string, vector<stream_query *> > lfta_iface_lists;
2499 for(li=0;li<mach_lftas.size();++li){
2500 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2501 string lfta_iface = "_local_";
2503 string lfta_iface = tvec[0]->get_interface();
2505 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2508 map<string, vector<stream_query *> >::iterator lsvsi;
2509 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2511 string liface = (*lsvsi).first;
2512 vector<stream_query *> iface_lftas = (*lsvsi).second;
2513 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2514 if(iface_codegen_type.size()){
2515 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2517 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2520 string mcs = generate_nic_code(iface_lftas, nicprop);
2523 mcf_flnm = lmach + "_"+liface+".mcf";
2525 mcf_flnm = hostname + "_"+liface+".mcf";
2527 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2528 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2531 fprintf(mcf_fl,"%s",mcs.c_str());
2533 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2534 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2543 //-----------------------------------------------------------------
2546 // Find common filter predicates in the LFTAs.
2547 // in addition generate structs to store the temporal attributes unpacked by prefilter
2549 map<string, vector<stream_query *> >::iterator ssqi;
2550 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2552 string lmach = (*ssqi).first;
2553 bool packed_return = false;
2557 // The LFTAs of this machine.
2558 vector<stream_query *> mach_lftas = (*ssqi).second;
2559 // break up on a per-interface basis.
2560 map<string, vector<stream_query *> > lfta_iface_lists;
2561 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2563 for(li=0;li<mach_lftas.size();++li){
2564 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2565 string lfta_iface = "_local_";
2567 lfta_iface = tvec[0]->get_interface();
2569 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2570 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2574 // Are the return values "packed"?
2575 // This should be done on a per-interface basis.
2576 // But this is defunct code for gs-lite
2577 for(li=0;li<mach_lftas.size();++li){
2578 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2579 string liface = "_local_";
2581 liface = tvec[0]->get_interface();
2583 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2584 if(iface_codegen_type.size()){
2585 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2586 packed_return = true;
2592 // Separate lftas by interface, collect results on a per-interface basis.
2594 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2595 map<string, vector<cnf_set *> > prefilter_preds;
2596 set<unsigned int> pred_ids; // this can be global for all interfaces
2597 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2598 string liface = (*mvsi).first;
2599 vector<cnf_set *> empty_list;
2600 prefilter_preds[liface] = empty_list;
2601 if(! packed_return){
2602 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2605 // get NIC capabilities. (Is this needed?)
2606 nic_property *nicprop = NULL;
2607 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2608 if(iface_codegen_type.size()){
2609 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2611 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2618 // Now that we know the prefilter preds, generate the lfta code.
2619 // Do this for all lftas in this machine.
2620 for(li=0;li<mach_lftas.size();++li){
2621 set<unsigned int> subsumed_preds;
2622 set<unsigned int>::iterator sii;
2624 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2626 if((pid>>16) == li){
2627 subsumed_preds.insert(pid & 0xffff);
2631 string lfta_schema_str = mach_lftas[li]->make_schema();
2632 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2633 nic_property *nicprop = NULL; // no NIC properties?
2634 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2638 // generate structs to store the temporal attributes
2639 // unpacked by prefilter
2640 col_id_set temp_cids;
2641 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2642 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2644 // Compute the lfta bit signatures and the lfta colrefs
2645 // do this on a per-interface basis
2647 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2649 map<string, vector<long long int> > lfta_sigs; // used again later
2650 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2651 string liface = (*mvsi).first;
2652 vector<long long int> empty_list;
2653 lfta_sigs[liface] = empty_list;
2655 vector<col_id_set> lfta_cols;
2656 vector<int> lfta_snap_length;
2657 for(li=0;li<lfta_iface_lists[liface].size();++li){
2658 unsigned long long int mask=0, bpos=1;
2660 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2661 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2665 lfta_sigs[liface].push_back(mask);
2666 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2667 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2670 //for(li=0;li<mach_lftas.size();++li){
2671 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2672 //col_id_set::iterator tcisi;
2673 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2674 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2679 // generate the prefilter
2680 // Do this on a per-interface basis, except for the #define
2682 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2683 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2685 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2690 // Generate interface parameter lookup function
2691 lfta_val[lmach] += "// lookup interface properties by name\n";
2692 lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
2693 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2694 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2696 // collect a list of interface names used by queries running on this host
2697 set<std::string> iface_names;
2698 for(i=0;i<mach_query_names[lmach].size();i++){
2699 int mi = mach_query_names[lmach][i];
2700 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2702 if(interface_names[mi]=="")
2703 iface_names.insert("DEFAULTDEV");
2705 iface_names.insert(interface_names[mi]);
2708 // generate interface property lookup code for every interface
2709 set<std::string>::iterator sir;
2710 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2711 if (sir == iface_names.begin())
2712 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2714 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2716 // iterate through interface properties
2717 vector<string> iface_properties;
2718 if(*sir!="_local_"){ // dummy watchlist interface, don't process.
2719 iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2722 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2725 if (iface_properties.empty())
2726 lfta_val[lmach] += "\t\treturn NULL;\n";
2728 for (int i = 0; i < iface_properties.size(); ++i) {
2730 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2732 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2734 // combine all values for the interface property using comma separator
2735 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2736 lfta_val[lmach] += "\t\t\treturn \"";
2737 for (int j = 0; j < vals.size(); ++j) {
2738 lfta_val[lmach] += vals[j];
2739 if (j != vals.size()-1)
2740 lfta_val[lmach] += ",";
2742 lfta_val[lmach] += "\";\n";
2744 lfta_val[lmach] += "\t\t} else\n";
2745 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2748 lfta_val[lmach] += "\t} else\n";
2749 lfta_val[lmach] += "\t\treturn NULL;\n";
2750 lfta_val[lmach] += "}\n\n";
2753 // Generate a full list of FTAs for clearinghouse reference
2754 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2755 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2758 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2759 string liface = (*mvsi).first;
2760 if(liface != "_local_"){ // these don't register themselves
2761 vector<stream_query *> lfta_list = (*mvsi).second;
2762 for(i=0;i<lfta_list.size();i++){
2763 int mi = lfta_iface_qname_ix[liface][i];
2764 if(first) first = false;
2765 else lfta_val[lmach] += ", ";
2766 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2770 // for (i = 0; i < registration_query_names.size(); ++i) {
2772 // lfta_val[lmach] += ", ";
2773 // lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2776 for (i = 0; i < hfta_list.size(); ++i) {
2777 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2779 lfta_val[lmach] += ", NULL};\n\n";
2782 // Add the initialization function to lfta.c
2783 // Change to accept the interface name, and
2784 // set the prefilter function accordingly.
2785 // see the example in demo/err2
2786 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2787 lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
2789 // for(i=0;i<mach_query_names[lmach].size();i++)
2790 // int mi = mach_query_names[lmach][i];
2791 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2793 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2794 string liface = (*mvsi).first;
2795 vector<stream_query *> lfta_list = (*mvsi).second;
2796 for(i=0;i<lfta_list.size();i++){
2797 stream_query *lfta_sq = lfta_list[i];
2798 int mi = lfta_iface_qname_ix[liface][i];
2800 if(liface == "_local_"){
2801 // Don't register an init function, do the init code inline
2802 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2803 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2807 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2809 string this_iface = "DEFAULTDEV";
2810 if(interface_names[mi]!="")
2811 this_iface = '"'+interface_names[mi]+'"';
2812 lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2813 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2814 // if(interface_names[mi]=="")
2815 // lfta_val[lmach]+="DEFAULTDEV";
2817 // lfta_val[lmach]+='"'+interface_names[mi]+'"';
2818 lfta_val[lmach] += this_iface;
2821 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2822 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2824 sprintf(tmpstr,",%d",snap_lengths[mi]);
2825 lfta_val[lmach] += tmpstr;
2827 // unsigned long long int mask=0, bpos=1;
2829 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2830 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2832 // bpos = bpos << 1;
2836 // sprintf(tmpstr,",%lluull",mask);
2837 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2838 lfta_val[lmach]+=tmpstr;
2840 lfta_val[lmach] += ",0ull";
2843 lfta_val[lmach] += ");\n";
2847 // End of lfta prefilter stuff
2848 // --------------------------------------------------
2850 // If there is a field verifier, warn about
2851 // lack of compatibility
2852 string lfta_comment;
2854 string lfta_namespace;
2855 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2856 if(ldefs.count("comment")>0)
2857 lfta_comment = lfta_sq->defines["comment"];
2858 if(ldefs.count("Comment")>0)
2859 lfta_comment = lfta_sq->defines["Comment"];
2860 if(ldefs.count("COMMENT")>0)
2861 lfta_comment = lfta_sq->defines["COMMENT"];
2862 if(ldefs.count("title")>0)
2863 lfta_title = lfta_sq->defines["title"];
2864 if(ldefs.count("Title")>0)
2865 lfta_title = lfta_sq->defines["Title"];
2866 if(ldefs.count("TITLE")>0)
2867 lfta_title = lfta_sq->defines["TITLE"];
2868 if(ldefs.count("NAMESPACE")>0)
2869 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2870 if(ldefs.count("Namespace")>0)
2871 lfta_namespace = lfta_sq->defines["Namespace"];
2872 if(ldefs.count("namespace")>0)
2873 lfta_namespace = lfta_sq->defines["namespace"];
2875 string lfta_ht_size;
2876 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2877 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2878 if(ldefs.count("aggregate_slots")>0){
2879 lfta_ht_size = ldefs["aggregate_slots"];
2882 // NOTE : I'm assuming that visible lftas do not start with _fta.
2883 // -- will fail for non-visible simple selection queries.
2884 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2886 if(lfta_comment == "")
2887 warning_str += "\tcomment not found.\n";
2888 // Obsolete stuff that carsten wanted
2889 // if(lfta_title == "")
2890 // warning_str += "\ttitle not found.\n";
2891 // if(lfta_namespace == "")
2892 // warning_str += "\tnamespace not found.\n";
2894 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2896 for(fi=0;fi<flds.size();fi++){
2897 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2899 if(warning_str != "")
2900 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2901 registration_query_names[mi].c_str(),warning_str.c_str());
2905 // Create qtree output
2906 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2907 if(lfta_comment != "")
2908 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2909 if(lfta_title != "")
2910 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2911 if(lfta_namespace != "")
2912 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2913 if(lfta_ht_size != "")
2914 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2916 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2918 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2919 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
2920 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2921 for(int t=0;t<itbls.size();++t){
2922 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2924 // fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2925 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2926 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2927 // write info about fields to qtree.xml
2928 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2930 for(fi=0;fi<flds.size();fi++){
2931 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2932 if(flds[fi]->get_modifier_list()->size()){
2933 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2935 fprintf(qtree_output," />\n");
2937 fprintf(qtree_output,"\t</LFTA>\n");
2943 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2944 string liface = (*mvsi).first;
2946 " if (!strcmp(device, \""+liface+"\")) \n"
2947 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2951 " if(lfta_prefilter == NULL){\n"
2952 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2959 lfta_val[lmach] += "}\n\n";
2961 if(!(debug_only || hfta_only) ){
2964 lfta_flnm = lmach + "_lfta.c";
2966 lfta_flnm = hostname + "_lfta.c";
2967 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2968 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2971 fprintf(lfta_out,"%s",lfta_header.c_str());
2972 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2973 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2978 // Say what are the operators which must execute
2979 if(opviews.size()>0)
2980 fprintf(stderr,"The queries use the following external operators:\n");
2981 for(i=0;i<opviews.size();++i){
2982 opview_entry *opv = opviews.get_entry(i);
2983 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2987 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2988 machine_names, schema_file_name,
2990 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2993 fprintf(qtree_output,"</QueryNodes>\n");
2998 ////////////////////////////////////////////////////////////
3000 void generate_makefile(vector<string> &input_file_names, int nfiles,
3001 vector<string> &hfta_names, opview_set &opviews,
3002 vector<string> &machine_names,
3003 string schema_file_name,
3004 vector<string> &interface_names,
3005 ifq_t *ifdb, string &config_dir_path,
3008 map<string, vector<int> > &rts_hload
3012 if(config_dir_path != ""){
3013 config_dir_path = "-C "+config_dir_path;
3017 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3018 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3020 // if(libz_exists && !libast_exists){
3021 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
3025 // Get set of operator executable files to run
3027 set<string>::iterator ssi;
3028 for(i=0;i<opviews.size();++i){
3029 opview_entry *opv = opviews.get_entry(i);
3030 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
3033 FILE *outfl = fopen("Makefile", "w");
3035 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
3040 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
3041 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3045 fprintf(outfl," -DLFTA_STATS");
3047 // Gather the set of interfaces
3048 // Also, gather "base interface names" for use in computing
3049 // the hash splitting to virtual interfaces.
3050 // TODO : must update to hanndle machines
3052 set<string> base_vifaces; // base interfaces of virtual interfaces
3053 map<string, string> ifmachines;
3054 map<string, string> ifattrs;
3055 for(i=0;i<interface_names.size();++i){
3056 ifaces.insert(interface_names[i]);
3057 ifmachines[interface_names[i]] = machine_names[i];
3059 size_t Xpos = interface_names[i].find_last_of("X");
3060 if(Xpos!=string::npos){
3061 string iface = interface_names[i].substr(0,Xpos);
3062 base_vifaces.insert(iface);
3064 // get interface attributes and add them to the list
3067 // Do we need to include protobuf libraries?
3068 // TODO Move to the interface library: get the libraries to include
3069 // for an interface type
3071 bool use_proto = false;
3072 bool use_bsa = false;
3073 bool use_kafka = false;
3076 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3077 string ifnm = (*ssi);
3078 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3079 for(int ift_i=0;ift_i<ift.size();ift_i++){
3080 if(ift[ift_i]=="PROTO"){
3084 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3085 for(int ift_i=0;ift_i<ift.size();ift_i++){
3086 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3090 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3091 for(int ift_i=0;ift_i<ift.size();ift_i++){
3092 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3102 for(i=0;i<hfta_names.size();++i)
3103 fprintf(outfl," %s",hfta_names[i].c_str());
3107 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
3108 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3110 fprintf(outfl,"-L. ");
3112 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3114 fprintf(outfl,"-lgscppads -lpads ");
3116 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3118 fprintf(outfl, " -lpz -lz -lbz ");
3119 if(libz_exists && libast_exists)
3120 fprintf(outfl," -last ");
3122 fprintf(outfl, " -ldll -ldl ");
3124 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3126 fprintf(outfl, " -lbsa_stream ");
3128 fprintf(outfl, " -lrdkafka ");
3129 fprintf(outfl," -lgscpaux");
3131 fprintf(outfl," -fprofile-arcs");
3136 "lfta.o: %s_lfta.c\n"
3137 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3139 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3140 for(i=0;i<nfiles;++i)
3141 fprintf(outfl," %s",input_file_names[i].c_str());
3143 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3145 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3147 for(i=0;i<nfiles;++i)
3148 fprintf(outfl," %s",input_file_names[i].c_str());
3149 fprintf(outfl,"\n");
3151 for(i=0;i<hfta_names.size();++i)
3154 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3157 "\t$(CPP) -o %s.o -c %s.cc\n"
3160 hfta_names[i].c_str(), hfta_names[i].c_str(),
3161 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3162 hfta_names[i].c_str(), hfta_names[i].c_str(),
3163 hfta_names[i].c_str(), hfta_names[i].c_str()
3168 "packet_schema.txt:\n"
3169 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3171 "external_fcns.def:\n"
3172 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3175 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3176 for(i=0;i<hfta_names.size();++i)
3177 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3178 fprintf(outfl,"\n");
3184 // Gather the set of interfaces
3185 // TODO : must update to hanndle machines
3186 // TODO : lookup interface attributes and add them as a parameter to rts process
3187 outfl = fopen("runit", "w");
3189 fprintf(stderr,"Can't open runit for write, exiting.\n");
3197 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3199 "if [ ! -f gshub.log ]\n"
3201 "\techo \"Failed to start bin/gshub.py\"\n"
3204 "ADDR=`cat gshub.log`\n"
3205 "ps opgid= $! >> gs.pids\n"
3206 "./rts $ADDR default ").c_str(), outfl);
3209 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3210 string ifnm = (*ssi);
3211 // suppress internal _local_ interface
3212 if (ifnm == "_local_")
3214 fprintf(outfl, "%s ",ifnm.c_str());
3215 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3216 for(j=0;j<ifv.size();++j)
3217 fprintf(outfl, "%s ",ifv[j].c_str());
3219 fprintf(outfl, " &\n");
3220 fprintf(outfl, "echo $! >> gs.pids\n");
3221 for(i=0;i<hfta_names.size();++i)
3222 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3224 for(j=0;j<opviews.opview_list.size();++j){
3225 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3229 system("chmod +x runit");
3231 outfl = fopen("stopit", "w");
3233 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3237 fprintf(outfl,"#!/bin/sh\n"
3239 "if [ ! -f gs.pids ]\n"
3243 "for pgid in `cat gs.pids`\n"
3245 "kill -TERM -$pgid\n"
3248 "for pgid in `cat gs.pids`\n"
3255 system("chmod +x stopit");
3257 //-----------------------------------------------
3259 /* For now disable support for virtual interfaces
3260 outfl = fopen("set_vinterface_hash.bat", "w");
3262 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3266 // The format should be determined by an entry in the ifres.xml file,
3267 // but for now hardcode the only example I have.
3268 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3269 if(rts_hload.count((*ssi))){
3270 string iface_name = (*ssi);
3271 string iface_number = "";
3272 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3273 if(isdigit(iface_name[j])){
3274 iface_number = iface_name[j];
3275 if(j>0 && isdigit(iface_name[j-1]))
3276 iface_number = iface_name[j-1] + iface_number;
3280 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3281 vector<int> halloc = rts_hload[iface_name];
3283 for(j=0;j<halloc.size();++j){
3286 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3287 prev_limit = halloc[j];
3289 fprintf(outfl,"\n");
3293 system("chmod +x set_vinterface_hash.bat");
3297 // Code for implementing a local schema
3299 table_list qpSchema;
3301 // Load the schemas of any LFTAs.
3303 for(l=0;l<hfta_nbr;++l){
3304 stream_query *sq0 = split_queries[l];
3305 table_def *td = sq0->get_output_tabledef();
3306 qpSchema.append_table(td);
3308 // load the schemas of any other ref'd tables.
3310 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3312 for(ti=0;ti<input_tbl_names.size();++ti){
3313 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3315 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3317 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3320 qpSchema.append_table(Schema->get_table(tbl_ref));
3325 // Functions related to parsing.
3328 static int split_string(char *instr,char sep, char **words,int max_words){
3334 words[nwords++] = str;
3335 while( (loc = strchr(str,sep)) != NULL){
3338 if(nwords >= max_words){
3339 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3340 nwords = max_words-1;
3342 words[nwords++] = str;