1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include<unistd.h> // for gethostname
19 #include "parse_fta.h"
20 #include "parse_schema.h"
21 #include "parse_ext_fcns.h"
22 #include"analyze_fta.h"
23 #include"query_plan.h"
24 #include"generate_lfta_code.h"
25 #include"stream_query.h"
26 #include"generate_utils.h"
28 #include"generate_nic_code.h"
39 #include <sys/types.h>
45 // to verify that some files exist.
46 #include <sys/types.h>
49 #include "parse_partn.h"
51 #include "print_plan.h"
53 // Interface to the xml parser
56 #include"field_list.h"
// Globals shared with the generated XML parser (reads field_list.xml;
// see the parse in main() where xmlParser_setfileinput() is called).
58 extern int xmlParserparse(void);
59 extern FILE *xmlParserin;
60 extern int xmlParserdebug;
// Scratch lists filled during parsing: parallel attribute-name and
// attribute-value vectors for the element currently being parsed.
62 std::vector<std::string> xml_attr_vec;
63 std::vector<std::string> xml_val_vec;
// Current attribute/value pair being assembled by the parser.
64 std::string xml_a, xml_v;
// Root of the parsed field-list XML; set in main() only when
// field_list.xml is found, otherwise stays NULL.
65 xml_t *xml_leaves = NULL;
67 // Interface to the field list verifier.
// Constructed from xml_leaves when field_list.xml is present; NULL otherwise.
68 field_list *field_verifier = NULL;
// Size of the scratch buffer (tmpstr in main()) used for fgets() reads
// and file-name manipulation.
70 #define TMPSTRLEN 1000
// Path separator used when extracting a query name from an input file path.
73 #define PATH_DELIM '/'
// Scratch buffer for schema text.
// NOTE(review): fixed 10000-byte capacity with no bounds check visible
// at this point in the file -- confirm writers stay within it.
76 char tmp_schema_str[10000];
78 // maximum delay between two heartbeats produced
79 // by a UDOP. Used when it's not explicitly
80 // provided in the udop definition
81 #define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
83 // Default lfta hash table size, must be a power of 2.
// (May be overridden per-lfta via lfta_htsize.cfg; see main().)
84 int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
86 // Interface to FTA definition lexer and parser ...
// (used for both table-schema files and query files; see main()).
88 extern int FtaParserparse(void);
89 extern FILE *FtaParserin;
90 extern int FtaParserdebug;
// Result of the most recent FtaParserparse() call, plus any variable
// definitions gathered during that parse.
92 fta_parse_t *fta_parse_result;
93 var_defs_t *fta_parse_defines;
97 // Interface to external function lexer and parser ...
// (reads external_fcns.def; the parsed list lands in Ext_fcns).
99 extern int Ext_fcnsParserparse(void);
100 extern FILE *Ext_fcnsParserin;
101 extern int Ext_fcnsParserdebug;
103 ext_fcn_list *Ext_fcns;
106 // Interface to partition definition parser
// partn_parse_result stays NULL until a partition file is parsed.
107 extern int PartnParserparse();
108 partn_def_list_t *partn_parse_result = NULL;
116 // forward declaration of local utility function
// NOTE(review): additional parameters and the closing ");" fall on
// lines elided from this listing (original lines 123-127).
117 void generate_makefile(vector<string> &input_file_names, int nfiles,
118 vector<string> &hfta_names, opview_set &opviews,
119 vector<string> &machine_names,
120 string schema_file_name,
121 vector<string> &interface_names,
122 ifq_t *ifdb, string &config_dir_path,
125 map<string, vector<int> > &rts_hload
128 //static int split_string(char *instr,char sep, char **words,int max_words);
// Output stream for schema_summary.txt; opened in main() only when the
// -f flag is given, otherwise stays NULL (see dump_summary below).
131 FILE *schema_summary_output = NULL; // summary of each query's output schema
133 // Dump schema summary
// Writes three lines describing one query to schema_summary_output:
//   1. the query name,
//   2. its output field names, '|'-separated,
//   3. the corresponding field types, '|'-separated, in the same order.
// Assumes schema_summary_output was opened in main() (-f flag).
// NOTE(review): the loop-index declaration and the for-loop closing
// braces fall on lines elided from this listing (original 136-152).
134 void dump_summary(stream_query *str){
135 fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
137 table_def *sch = str->get_output_tabledef();
139 vector<field_entry *> flds = sch->get_fields();
// Field names, separated by '|'.
141 for(f=0;f<flds.size();++f){
142 if(f>0) fprintf(schema_summary_output,"|");
143 fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
145 fprintf(schema_summary_output,"\n");
// Field types, same order and separator as the names.
146 for(f=0;f<flds.size();++f){
147 if(f>0) fprintf(schema_summary_output,"|");
148 fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
150 fprintf(schema_summary_output,"\n");
154 string hostname; // name of current host; may be overridden via -h.
156 bool generate_stats = false; // set by -S: enable LFTA statistics (alters Makefile).
157 string root_path = "../.."; // path to the root of GS-lite; overridden via -R.
160 int main(int argc, char **argv){
161 char tmpstr[TMPSTRLEN];
165 set<int>::iterator si;
167 vector<string> registration_query_names; // for lfta.c registration
168 map<string, vector<int> > mach_query_names; // list queries of machine
169 vector<int> snap_lengths; // for lfta.c registration
170 vector<string> interface_names; // for lfta.c registration
171 vector<string> machine_names; // machine of interface
172 vector<bool> lfta_reuse_options; // for lfta.c registration
173 vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
174 vector<string> hfta_names; // hfta cource code names, for
175 // creating make file.
176 vector<string> qnames; // ensure unique names
177 map<string, int> lfta_names; // keep track of unique lftas.
180 // set these to 1 to debug the parser
182 Ext_fcnsParserdebug = 0;
184 FILE *lfta_out; // lfta.c output.
185 FILE *fta_in; // input file
186 FILE *table_schemas_in; // source tables definition file
187 FILE *query_name_output; // query names
188 FILE *qtree_output; // interconnections of query nodes
190 // -------------------------------
191 // Handling of Input Arguments
192 // -------------------------------
193 char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
194 const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
195 "\t[-B] : debug only (don't create output files)\n"
196 "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
197 "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
198 "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
199 "\t[-C] : use <config directory> for definition files\n"
200 "\t[-l] : use <library directory> for library queries\n"
201 "\t[-N] : output query names in query_names.txt\n"
202 "\t[-H] : create HFTA only (no schema_file)\n"
203 "\t[-Q] : use query name for hfta suffix\n"
204 "\t[-M] : generate make file and runit, stopit scripts\n"
205 "\t[-S] : enable LFTA statistics (alters Makefile).\n"
206 "\t[-f] : Output schema summary to schema_summary.txt\n"
207 "\t[-P] : link with PADS\n"
208 "\t[-h] : override host name.\n"
209 "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
210 "\t[-R] : path to root of GS-lite\n"
213 // parameters gathered from command line processing
214 string external_fcns_path;
215 // string internal_fcn_path;
216 string config_dir_path;
217 string library_path = "./";
218 vector<string> input_file_names;
219 string schema_file_name;
220 bool debug_only = false;
221 bool hfta_only = false;
222 bool output_query_names = false;
223 bool output_schema_summary=false;
224 bool numeric_hfta_flname = true;
225 bool create_makefile = false;
226 bool distributed_mode = false;
227 bool partitioned_mode = false;
228 bool use_live_hosts_file = false;
229 bool use_pads = false;
230 bool clean_make = false;
231 int n_virtual_interfaces = 1;
234 while((chopt = getopt(argc,argv,optstr)) != -1){
240 distributed_mode = true;
243 partitioned_mode = true;
246 use_live_hosts_file = true;
250 config_dir_path = string(optarg) + string("/");
254 library_path = string(optarg) + string("/");
257 output_query_names = true;
260 numeric_hfta_flname = false;
263 if(schema_file_name == ""){
268 output_schema_summary=true;
271 create_makefile=true;
292 n_virtual_interfaces = atoi(optarg);
293 if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
294 fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
295 n_virtual_interfaces = 1;
300 fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
301 fprintf(stderr,"%s\n", usage_str);
304 fprintf(stderr, "Argument was %c\n", optopt);
305 fprintf(stderr,"Invalid arguments\n");
306 fprintf(stderr,"%s\n", usage_str);
312 for (int i = 0; i < argc; ++i) {
313 if((schema_file_name == "") && !hfta_only){
314 schema_file_name = argv[i];
316 input_file_names.push_back(argv[i]);
320 if(input_file_names.size() == 0){
321 fprintf(stderr,"%s\n", usage_str);
326 string clean_cmd = "rm Makefile hfta_*.cc";
327 int clean_ret = system(clean_cmd.c_str());
329 fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
334 nic_prop_db *npdb = new nic_prop_db(config_dir_path);
336 // Open globally used file names.
338 // prepend config directory to schema file
339 schema_file_name = config_dir_path + schema_file_name;
340 external_fcns_path = config_dir_path + string("external_fcns.def");
341 string ifx_fname = config_dir_path + string("ifres.xml");
343 // Find interface query file(s).
345 gethostname(tmpstr,TMPSTRLEN);
348 hostname_len = strlen(tmpstr);
349 string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
350 vector<string> ifq_fls;
352 ifq_fls.push_back(ifq_fname);
355 // Get the field list, if it exists
356 string flist_fl = config_dir_path + "field_list.xml";
358 if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
359 fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
360 xml_leaves = new xml_t();
361 xmlParser_setfileinput(flf_in);
362 if(xmlParserparse()){
363 fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
365 field_verifier = new field_list(xml_leaves);
370 if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
371 fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
377 if(!(debug_only || hfta_only)){
378 if((lfta_out = fopen("lfta.c","w")) == NULL){
379 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
385 // Get the output specification file.
387 // query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
388 string ospec_fl = "output_spec.cfg";
390 vector<ospec_str *> output_specs;
391 multimap<string, int> qname_to_ospec;
392 if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
395 while(fgets(tmpstr,TMPSTRLEN,osp_in)){
397 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
399 // make operator type lowercase
401 for(tmpc=flds[1];*tmpc!='\0';++tmpc)
402 *tmpc = tolower(*tmpc);
404 ospec_str *tmp_ospec = new ospec_str();
405 tmp_ospec->query = flds[0];
406 tmp_ospec->operator_type = flds[1];
407 tmp_ospec->operator_param = flds[2];
408 tmp_ospec->output_directory = flds[3];
409 tmp_ospec->bucketwidth = atoi(flds[4]);
410 tmp_ospec->partitioning_flds = flds[5];
411 tmp_ospec->n_partitions = atoi(flds[6]);
412 qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
413 output_specs.push_back(tmp_ospec);
415 fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
420 fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
425 string pspec_fl = "hfta_parallelism.cfg";
427 map<string, int> hfta_parallelism;
428 if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
431 while(fgets(tmpstr,TMPSTRLEN,psp_in)){
432 bool good_entry = true;
434 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
436 string hname = flds[0];
437 int par = atoi(flds[1]);
438 if(par <= 0 || par > n_virtual_interfaces){
439 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
442 if(good_entry && n_virtual_interfaces % par != 0){
443 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
447 hfta_parallelism[hname] = par;
451 fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
455 // LFTA hash table sizes
456 string htspec_fl = "lfta_htsize.cfg";
457 FILE *htsp_in = NULL;
458 map<string, int> lfta_htsize;
459 if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
462 while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
463 bool good_entry = true;
465 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
467 string lfta_name = flds[0];
468 int htsz = atoi(flds[1]);
470 lfta_htsize[lfta_name] = htsz;
472 fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
477 fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
480 // LFTA virtual interface hash split
481 string rtlspec_fl = "rts_load.cfg";
483 map<string, vector<int> > rts_hload;
484 if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
489 while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
490 bool good_entry = true;
494 int nflds = split_string(tmpstr,',',flds,MAXFLDS);
496 iface_name = flds[0];
499 for(j=1;j<nflds;++j){
500 int h = atoi(flds[j]);
504 hload.push_back(cumm_h);
510 rts_hload[iface_name] = hload;
512 fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
519 if(output_query_names){
520 if((query_name_output = fopen("query_names.txt","w")) == NULL){
521 fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
526 if(output_schema_summary){
527 if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
528 fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
533 if((qtree_output = fopen("qtree.xml","w")) == NULL){
534 fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
537 fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
538 fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
539 fprintf(qtree_output,"<QueryNodes>\n");
542 // Get an initial Schema
545 // Parse the table schema definitions.
546 fta_parse_result = new fta_parse_t();
547 FtaParser_setfileinput(table_schemas_in);
548 if(FtaParserparse()){
549 fprintf(stderr,"Table schema parse failed.\n");
552 if(fta_parse_result->parse_type != TABLE_PARSE){
553 fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
556 Schema = fta_parse_result->tables;
558 // Ensure that all schema_ids, if set, are distinct.
559 // Obsolete? There is code elsewhere to ensure that schema IDs are
560 // distinct on a per-interface basis.
564 for(int t=0;t<Schema->size();++t){
565 int sch_id = Schema->get_table(t)->get_schema_id();
567 if(found_ids.find(sch_id) != found_ids.end()){
568 dup_ids.insert(sch_id);
570 found_ids.insert(sch_id);
573 if(dup_ids.size()>0){
574 fprintf(stderr, "Error, the schema has duplicate schema_ids:");
575 for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
576 fprintf(stderr," %d",(*dit));
577 fprintf(stderr,"\n");
584 // Process schema field inheritance
586 retval = Schema->unroll_tables(err_str);
588 fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
592 // hfta only => we will try to fetch schemas from the registry.
593 // therefore, start off with an empty schema.
594 Schema = new table_list();
598 // Open and parse the external functions file.
599 Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
600 if(Ext_fcnsParserin == NULL){
601 fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
602 Ext_fcns = new ext_fcn_list();
604 if(Ext_fcnsParserparse()){
605 fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
606 Ext_fcns = new ext_fcn_list();
609 if(Ext_fcns->validate_fcns(err_str)){
610 fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
614 // Open and parse the interface resources file.
615 // ifq_t *ifaces_db = new ifq_t();
617 // if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
618 // fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
619 // ifx_fname.c_str(), ierr.c_str());
622 // if(ifaces_db->load_ifqs(ifq_fname, ierr)){
623 // fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
624 // ifq_fname.c_str(), ierr.c_str());
629 // The LFTA code string.
630 // Put the standard preamble here.
631 // NOTE: the hash macros, fcns should go into the run time
632 map<string, string> lfta_val;
633 map<string, string> lfta_prefilter_val;
636 "#include <limits.h>\n"
637 "#include \"rts.h\"\n"
638 "#include \"fta.h\"\n"
639 "#include \"lapp.h\"\n"
640 "#include \"rts_udaf.h\"\n"
641 "#include<stdio.h>\n"
642 "#include <float.h>\n"
643 "#include \"rdtsc.h\"\n"
644 "#include \"watchlist.h\"\n\n"
647 // Get any locally defined parsing headers
649 memset(&glob_result, 0, sizeof(glob_result));
651 // do the glob operation TODO should be from GSROOT
652 int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
653 if(return_value == 0){
655 for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
657 int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
658 lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
662 fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
666 "#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
667 "#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
668 "#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
669 "#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
674 "gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
676 "#define SLOT_FILLED 0x04\n"
677 "#define SLOT_GEN_BITS 0x03\n"
678 "#define SLOT_HASH_BITS 0xfffffff8\n"
679 "#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
680 "#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
681 "#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
684 "#define lfta_BOOL_to_hash(x) (x)\n"
685 "#define lfta_USHORT_to_hash(x) (x)\n"
686 "#define lfta_UINT_to_hash(x) (x)\n"
687 "#define lfta_IP_to_hash(x) (x)\n"
688 "#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
689 "#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
690 "#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
691 "#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
692 "#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
693 "static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
694 " gs_uint32_t i,ret=0,tmp_sum = 0;\n"
695 " for(i=0;i<x.length;++i){\n"
696 " tmp_sum |= (x.data[i]) << (8*(i%4));\n"
702 " if((i%4)!=0) ret ^=tmp_sum;\n"
708 //////////////////////////////////////////////////////////////////
709 ///// Get all of the query parse trees
713 int hfta_count = 0; // for numeric suffixes to hfta .cc files
715 //---------------------------
716 // Global info needed for post processing.
718 // Set of operator views ref'd in the query set.
720 // lftas on a per-machine basis.
721 map<string, vector<stream_query *> > lfta_mach_lists;
722 int nfiles = input_file_names.size();
723 vector<stream_query *> hfta_list; // list of hftas.
724 map<string, stream_query *> sq_map; // map from query name to stream query.
727 //////////////////////////////////////////
729 // Open and parse the interface resources file.
730 ifq_t *ifaces_db = new ifq_t();
732 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
733 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
734 ifx_fname.c_str(), ierr.c_str());
737 if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
738 fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
739 ifq_fls[0].c_str(), ierr.c_str());
743 map<string, string> qname_to_flname; // for detecting duplicate query names
747 // Parse the files to create a vector of parse trees.
748 // Load qnodes with information to perform a topo sort
749 // based on query dependencies.
750 vector<query_node *> qnodes; // for topo sort.
751 map<string,int> name_node_map; // map query name to qnodes entry
752 for(i=0;i<input_file_names.size();i++){
754 if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
755 fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
758 fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
760 // Parse the FTA query
761 fta_parse_result = new fta_parse_t();
762 FtaParser_setfileinput(fta_in);
763 if(FtaParserparse()){
764 fprintf(stderr,"FTA parse failed.\n");
767 if(fta_parse_result->parse_type != QUERY_PARSE){
768 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
772 // returns a list of parse trees
773 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
774 for(p=0;p<qlist.size();++p){
775 table_exp_t *fta_parse_tree = qlist[p];
776 // query_parse_trees.push_back(fta_parse_tree);
778 // compute the default name -- extract from query name
779 strcpy(tmpstr,input_file_names[i].c_str());
780 char *qname = strrchr(tmpstr,PATH_DELIM);
785 char *qname_end = strchr(qname,'.');
786 if(qname_end != NULL) *qname_end = '\0';
787 string qname_str = qname;
788 string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
790 // Determine visibility. Should I be attaching all of the output methods?
791 if(qname_to_ospec.count(imputed_qname)>0)
792 fta_parse_tree->set_visible(true);
794 fta_parse_tree->set_visible(false);
797 // Create a manipulable representation of the parse tree.
798 // the qnode inherits the visibility assigned to the parse tree.
799 int pos = qnodes.size();
800 qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
801 name_node_map[ qnodes[pos]->name ] = pos;
802 //printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
803 // qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
804 // qfiles.push_back(i);
806 // Check for duplicate query names
807 // NOTE : in hfta-only generation, I should
808 // also check with the names of the registered queries.
809 if(qname_to_flname.count(qnodes[pos]->name) > 0){
810 fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
811 qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
814 if(Schema->find_tbl(qnodes[pos]->name) >= 0){
815 fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
816 qnodes[pos]->name.c_str(), input_file_names[i].c_str());
819 qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
825 // Add the library queries
828 for(pos=0;pos<qnodes.size();++pos){
830 for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
831 string src_tbl = qnodes[pos]->refd_tbls[fi];
832 if(qname_to_flname.count(src_tbl) == 0){
833 int last_sep = src_tbl.find_last_of('/');
834 if(last_sep != string::npos){
835 fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
836 string target_qname = src_tbl.substr(last_sep+1);
837 string qpathname = library_path + src_tbl + ".gsql";
838 if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
839 fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
841 fprintf(stderr,"After exit\n");
843 fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
844 // Parse the FTA query
845 fta_parse_result = new fta_parse_t();
846 FtaParser_setfileinput(fta_in);
847 if(FtaParserparse()){
848 fprintf(stderr,"FTA parse failed.\n");
851 if(fta_parse_result->parse_type != QUERY_PARSE){
852 fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
856 map<string, int> local_query_map;
857 vector<string> local_query_names;
858 vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
859 for(p=0;p<qlist.size();++p){
860 table_exp_t *fta_parse_tree = qlist[p];
861 fta_parse_tree->set_visible(false); // assumed to not produce output
862 string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
863 if(imputed_qname == target_qname)
864 imputed_qname = src_tbl;
865 if(local_query_map.count(imputed_qname)>0){
866 fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
869 local_query_map[ imputed_qname ] = p;
870 local_query_names.push_back(imputed_qname);
873 if(local_query_map.count(src_tbl)==0){
874 fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
878 vector<int> worklist;
879 set<int> added_queries;
880 vector<query_node *> new_qnodes;
881 worklist.push_back(local_query_map[target_qname]);
882 added_queries.insert(local_query_map[target_qname]);
884 int qpos = qnodes.size();
885 for(qq=0;qq<worklist.size();++qq){
886 int q_id = worklist[qq];
887 query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
888 new_qnodes.push_back( new_qnode);
889 vector<string> refd_tbls = new_qnode->refd_tbls;
891 for(ff = 0;ff<refd_tbls.size();++ff){
892 if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
894 if(name_node_map.count(refd_tbls[ff])>0){
895 fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
898 worklist.push_back(local_query_map[refd_tbls[ff]]);
904 for(qq=0;qq<new_qnodes.size();++qq){
905 int qpos = qnodes.size();
906 qnodes.push_back(new_qnodes[qq]);
907 name_node_map[qnodes[qpos]->name ] = qpos;
908 qname_to_flname[qnodes[qpos]->name ] = qpathname;
922 //---------------------------------------
927 string udop_missing_sources;
928 for(i=0;i<qnodes.size();++i){
930 for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
931 int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
933 if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
934 if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
935 int pos = qnodes.size();
936 qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
937 name_node_map[ qnodes[pos]->name ] = pos;
938 qnodes[pos]->is_externally_visible = false; // its visible
939 // Need to mark the source queries as visible.
941 string missing_sources = "";
942 for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
943 string src_tbl = qnodes[pos]->refd_tbls[si];
944 if(name_node_map.count(src_tbl)==0){
945 missing_sources += src_tbl + " ";
948 if(missing_sources != ""){
949 udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
956 if(udop_missing_sources != ""){
957 fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
963 ////////////////////////////////////////////////////////////////////
964 /// Check parse trees to verify that some
965 /// global properties are met :
966 /// if q1 reads from q2, then
967 /// q2 is processed before q1
968 /// q1 can supply q2's parameters
969 /// Verify there is no cycle in the reads-from graph.
971 // Compute an order in which to process the
974 // Start by building the reads-from lists.
977 for(i=0;i<qnodes.size();++i){
979 vector<string> refd_tbls = qnodes[i]->refd_tbls;
980 for(fi = 0;fi<refd_tbls.size();++fi){
981 if(name_node_map.count(refd_tbls[fi])>0){
982 //printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
983 (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
989 // If one query reads the result of another,
990 // check for parameter compatibility. Currently it must
991 // be an exact match. I will move to requiring
992 // containment after re-ordering, but will require
993 // some analysis for code generation which is not
995 //printf("There are %d query nodes.\n",qnodes.size());
998 for(i=0;i<qnodes.size();++i){
999 vector<var_pair_t *> target_params = qnodes[i]->params;
1000 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1001 vector<var_pair_t *> source_params = qnodes[(*si)]->params;
1002 if(target_params.size() != source_params.size()){
1003 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1007 for(p=0;p<target_params.size();++p){
1008 if(! (target_params[p]->name == source_params[p]->name &&
1009 target_params[p]->val == source_params[p]->val ) ){
1010 fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
1019 // Start by counting inedges.
1020 for(i=0;i<qnodes.size();++i){
1021 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1022 qnodes[(*si)]->n_consumers++;
1026 // The roots are the nodes with indegree zero.
1028 for(i=0;i<qnodes.size();++i){
1029 if(qnodes[i]->n_consumers == 0){
1030 if(qnodes[i]->is_externally_visible == false){
1031 fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
1037 // Remove the parts of the subtree that produce no output.
1038 set<int> valid_roots;
1039 set<int> discarded_nodes;
1040 set<int> candidates;
1041 while(roots.size() >0){
1042 for(si=roots.begin();si!=roots.end();++si){
1043 if(qnodes[(*si)]->is_externally_visible){
1044 valid_roots.insert((*si));
1046 discarded_nodes.insert((*si));
1047 set<int>::iterator sir;
1048 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1049 qnodes[(*sir)]->n_consumers--;
1050 if(qnodes[(*sir)]->n_consumers == 0)
1051 candidates.insert( (*sir));
1058 roots = valid_roots;
1059 if(discarded_nodes.size()>0){
1060 fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
1062 for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
1063 if(di>0 && (di%8)==0) fprintf(stderr,"\n");
1065 fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
1067 fprintf(stderr,"\n");
1070 // Compute the sources_to set, ignoring discarded nodes.
1071 for(i=0;i<qnodes.size();++i){
1072 if(discarded_nodes.count(i)==0)
1073 for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
1074 qnodes[(*si)]->sources_to.insert(i);
1079 // Find the nodes that are shared by multiple visible subtrees.
1080 // The roots become inferred visible nodes.
1082 // Find the visible nodes.
1083 vector<int> visible_nodes;
1084 for(i=0;i<qnodes.size();i++){
1085 if(qnodes[i]->is_externally_visible){
1086 visible_nodes.push_back(i);
1090 // Find UDOPs referenced by visible nodes.
1092 for(i=0;i<visible_nodes.size();++i){
1093 workq.push_back(visible_nodes[i]);
1095 while(!workq.empty()){
1096 int node = workq.front();
1098 set<int>::iterator children;
1099 if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
1100 qnodes[node]->is_externally_visible = true;
1101 visible_nodes.push_back(node);
1102 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1103 if(qnodes[(*children)]->is_externally_visible == false){
1104 qnodes[(*children)]->is_externally_visible = true;
1105 visible_nodes.push_back((*children));
1109 for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
1110 workq.push_back((*children));
1117 for(i=0;i<qnodes.size();i++){
1118 qnodes[i]->subtree_roots.clear();
1121 // Walk the tree defined by a visible node, not descending into
1122 // subtrees rooted by a visible node. Mark the node visited with
1123 // the visible node ID.
1124 for(i=0;i<visible_nodes.size();++i){
1126 vroots.insert(visible_nodes[i]);
1127 while(vroots.size()>0){
1128 for(si=vroots.begin();si!=vroots.end();++si){
1129 qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
1131 set<int>::iterator sir;
1132 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1133 if(! qnodes[(*sir)]->is_externally_visible){
1134 candidates.insert( (*sir));
1138 vroots = candidates;
1142 // Find the nodes in multiple visible node subtrees, but with no parent
1143 // that is in multiple visible node subtrees. Mark these as inferred visible nodes.
1144 done = true; // until proven otherwise
1145 for(i=0;i<qnodes.size();i++){
1146 if(qnodes[i]->subtree_roots.size()>1){
1147 bool is_new_root = true;
1148 set<int>::iterator sir;
1149 for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
1150 if(qnodes[(*sir)]->subtree_roots.size()>1)
1151 is_new_root = false;
1154 qnodes[i]->is_externally_visible = true;
1155 qnodes[i]->inferred_visible_node = true;
1156 visible_nodes.push_back(i);
1167 // get visible nodes in topo ordering.
1168 // for(i=0;i<qnodes.size();i++){
1169 // qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
1171 vector<int> process_order;
1172 while(roots.size() >0){
1173 for(si=roots.begin();si!=roots.end();++si){
1174 if(discarded_nodes.count((*si))==0){
1175 process_order.push_back( (*si) );
1177 set<int>::iterator sir;
1178 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1179 qnodes[(*sir)]->n_consumers--;
1180 if(qnodes[(*sir)]->n_consumers == 0)
1181 candidates.insert( (*sir));
1189 //printf("process_order.size() =%d\n",process_order.size());
1191 // Search for cyclic dependencies
1193 for(i=0;i<qnodes.size();++i){
1194 if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
1195 if(found_dep.size() != 0) found_dep += ", ";
1196 found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
1199 if(found_dep.size()>0){
1200 fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
1204 // Get a list of query sets, in the order to be processed.
1205 // Start at visible root and do bfs.
1206 // The query set includes queries referenced indirectly,
1207 // as sources for user-defined operators. These are needed
1208 // to ensure that they are added to the schema, but are not part
1209 // of the query tree.
1211 // stream_node_sets contains queries reachable only through the
1212 // FROM clause, so I can tell which queries to add to the stream
1213 // query. (DISABLED, UDOPS are integrated, does this cause problems?)
1215 // NOTE: this code works because in order for data to be
1216 // read by multiple hftas, the node must be externally visible.
1217 // But visible nodes define roots of process sets.
1218 // internally visible nodes can feed data only
1219 // to other nodes in the same query file.
1220 // Therefore, any access can be restricted to a file,
1221 // hfta output sharing is done only on roots
1222 // never on interior nodes.
1227 // Compute the base collection of hftas.
1228 vector<hfta_node *> hfta_sets;
1229 map<string, int> hfta_name_map;
1230 // vector< vector<int> > process_sets;
1231 // vector< set<int> > stream_node_sets;
1232 reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
1233 // i.e. process leaves 1st.
1234 for(i=0;i<process_order.size();++i){
1235 if(qnodes[process_order[i]]->is_externally_visible == true){
1236 //printf("Visible.\n");
1237 int root = process_order[i];
1238 hfta_node *hnode = new hfta_node();
1239 hnode->name = qnodes[root]-> name;
1240 hnode->source_name = qnodes[root]-> name;
1241 hnode->is_udop = qnodes[root]->is_udop;
1242 hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
1244 vector<int> proc_list; proc_list.push_back(root);
1245 // Ensure that nodes are added only once.
1246 set<int> proc_set; proc_set.insert(root);
1247 roots.clear(); roots.insert(root);
1249 while(roots.size()>0){
1250 for(si=roots.begin();si!=roots.end();++si){
1251 //printf("Processing root %d\n",(*si));
1252 set<int>::iterator sir;
1253 for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
1254 //printf("reads fom %d\n",(*sir));
1255 if(qnodes[(*sir)]->is_externally_visible==false){
1256 candidates.insert( (*sir) );
1257 if(proc_set.count( (*sir) )==0){
1258 proc_set.insert( (*sir) );
1259 proc_list.push_back( (*sir) );
1268 reverse(proc_list.begin(), proc_list.end());
1269 hnode->query_node_indices = proc_list;
1270 hfta_name_map[hnode->name] = hfta_sets.size();
1271 hfta_sets.push_back(hnode);
1275 // Compute the reads_from / sources_to graphs for the hftas.
1277 for(i=0;i<hfta_sets.size();++i){
1278 hfta_node *hnode = hfta_sets[i];
1279 for(q=0;q<hnode->query_node_indices.size();q++){
1280 query_node *qnode = qnodes[hnode->query_node_indices[q]];
1281 for(s=0;s<qnode->refd_tbls.size();++s){
1282 if(hfta_name_map.count(qnode->refd_tbls[s])){
1283 int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
1284 hnode->reads_from.insert(other_hfta);
1285 hfta_sets[other_hfta]->sources_to.insert(i);
1291 // Compute a topological sort of the hfta_sets.
1293 vector<int> hfta_topsort;
1295 int hnode_srcs[hfta_sets.size()];
1296 for(i=0;i<hfta_sets.size();++i){
1298 if(hfta_sets[i]->sources_to.size() == 0)
1302 while(! workq.empty()){
1303 int node = workq.front();
1305 hfta_topsort.push_back(node);
1306 set<int>::iterator stsi;
1307 for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
1308 int parent = (*stsi);
1309 hnode_srcs[parent]++;
1310 if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
1311 workq.push_back(parent);
1316 // Decorate hfta nodes with the level of parallelism given as input.
1318 map<string, int>::iterator msii;
1319 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1320 string hfta_name = (*msii).first;
1321 int par = (*msii).second;
1322 if(hfta_name_map.count(hfta_name) > 0){
1323 hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
1325 fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
1329 // Propagate levels of parallelism: children should have a level of parallelism
1330 // as large as any of its parents. Adjust children upwards to compensate.
1331 // Start at parents and adjust children, auto-propagation will occur.
1333 for(i=hfta_sets.size()-1;i>=0;i--){
1334 set<int>::iterator stsi;
1335 for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
1336 if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
1337 hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
1342 // Before all the name mangling, check if there are any output_spec.cfg
1343 // or hfta_parallelism.cfg entries that do not have a matching query.
1345 string dangling_ospecs = "";
1346 for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
1347 string oq = (*msii).first;
1348 if(hfta_name_map.count(oq) == 0){
1349 dangling_ospecs += " "+(*msii).first;
1352 if(dangling_ospecs!=""){
1353 fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
1356 string dangling_par = "";
1357 for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
1358 string oq = (*msii).first;
1359 if(hfta_name_map.count(oq) == 0){
1360 dangling_par += " "+(*msii).first;
1363 if(dangling_par!=""){
1364 fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
1369 // Replicate parallelized hftas. Do __copyX name mangling. Adjust
1370 // FROM clauses: retarget any name which is an internal node, and
1371 // any which is in hfta_sets (and which is parallelized). Add Merge nodes
1372 // when the source hfta has more parallelism than the target node.
1373 // Add new nodes to qnodes and hfta_nodes wth the appropriate linkages.
1376 int n_original_hfta_sets = hfta_sets.size();
1377 for(i=0;i<n_original_hfta_sets;++i){
1378 if(hfta_sets[i]->n_parallel > 1){
1379 hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
1380 set<string> local_nodes; // names of query nodes in the hfta.
1381 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1382 local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
1385 for(p=0;p<hfta_sets[i]->n_parallel;++p){
1386 string mangler = "__copy"+int_to_string(p);
1387 hfta_node *par_hfta = new hfta_node();
1388 par_hfta->name = hfta_sets[i]->name + mangler;
1389 par_hfta->source_name = hfta_sets[i]->name;
1390 par_hfta->is_udop = hfta_sets[i]->is_udop;
1391 par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
1392 par_hfta->n_parallel = hfta_sets[i]->n_parallel;
1393 par_hfta->parallel_idx = p;
1395 map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
1398 if(hfta_sets[i]->is_udop){
1399 int root = hfta_sets[i]->query_node_indices[0];
1401 string unequal_par_sources;
1402 set<int>::iterator rfsii;
1403 for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
1404 if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
1405 unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
1408 if(unequal_par_sources != ""){
1409 fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
1414 vector<string> new_sources;
1415 for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
1416 new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
1419 query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
1420 new_qn->name += mangler;
1421 new_qn->mangler = mangler;
1422 new_qn->refd_tbls = new_sources;
1423 par_hfta->query_node_indices.push_back(qnodes.size());
1424 par_qnode_map[new_qn->name] = qnodes.size();
1425 name_node_map[ new_qn->name ] = qnodes.size();
1426 qnodes.push_back(new_qn);
1428 // regular query node
1429 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1430 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1431 table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
1432 // rehome the from clause on mangled names.
1433 // create merge nodes as needed for external sources.
1434 for(f=0;f<dup_pt->fm->tlist.size();++f){
1435 if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
1436 dup_pt->fm->tlist[f]->schema_name += mangler;
1437 }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
1438 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1439 int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
1440 if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
1441 dup_pt->fm->tlist[f]->schema_name += mangler;
1443 vector<string> src_tbls;
1444 int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
1446 fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
1449 for(s=0;s<stride;++s){
1450 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
1451 src_tbls.push_back(ext_src_name);
1453 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1454 string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
1455 dup_pt->fm->tlist[f]->schema_name = merge_node_name;
1456 // Make a qnode to represent the new merge node
1457 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1458 qn_pt->refd_tbls = src_tbls;
1459 qn_pt->is_udop = false;
1460 qn_pt->is_externally_visible = false;
1461 qn_pt->inferred_visible_node = false;
1462 par_hfta->query_node_indices.push_back(qnodes.size());
1463 par_qnode_map[merge_node_name] = qnodes.size();
1464 name_node_map[ merge_node_name ] = qnodes.size();
1465 qnodes.push_back(qn_pt);
1469 query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
1470 for(f=0;f<dup_pt->fm->tlist.size();++f){
1471 new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
1473 new_qn->params = qnodes[hqn_idx]->params;
1474 new_qn->is_udop = false;
1475 new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
1476 new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
1477 par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
1478 par_qnode_map[new_qn->name] = qnodes.size();
1479 name_node_map[ new_qn->name ] = qnodes.size();
1480 qnodes.push_back(new_qn);
1483 hfta_name_map[par_hfta->name] = hfta_sets.size();
1484 hfta_sets.push_back(par_hfta);
1487 // This hfta isn't being parallelized, but add merge nodes for any parallelized
1489 if(!hfta_sets[i]->is_udop){
1490 for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
1491 int hqn_idx = hfta_sets[i]->query_node_indices[h];
1492 for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
1493 if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
1494 // Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
1495 int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
1496 if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
1497 vector<string> src_tbls;
1498 for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
1499 string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
1500 src_tbls.push_back(ext_src_name);
1502 table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
1503 string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
1504 qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
1505 // Make a qnode to represent the new merge node
1506 query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
1507 qn_pt->refd_tbls = src_tbls;
1508 qn_pt->is_udop = false;
1509 qn_pt->is_externally_visible = false;
1510 qn_pt->inferred_visible_node = false;
1511 hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
1512 name_node_map[ merge_node_name ] = qnodes.size();
1513 qnodes.push_back(qn_pt);
1522 // Rebuild the reads_from / sources_to lists in the qnodes
1523 for(q=0;q<qnodes.size();++q){
1524 qnodes[q]->reads_from.clear();
1525 qnodes[q]->sources_to.clear();
1527 for(q=0;q<qnodes.size();++q){
1528 for(s=0;s<qnodes[q]->refd_tbls.size();++s){
1529 if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
1530 int rf = name_node_map[qnodes[q]->refd_tbls[s]];
1531 qnodes[q]->reads_from.insert(rf);
1532 qnodes[rf]->sources_to.insert(q);
1537 // Rebuild the reads_from / sources_to lists in hfta_sets
1538 for(q=0;q<hfta_sets.size();++q){
1539 hfta_sets[q]->reads_from.clear();
1540 hfta_sets[q]->sources_to.clear();
1542 for(q=0;q<hfta_sets.size();++q){
1543 for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
1544 int node = hfta_sets[q]->query_node_indices[s];
1545 set<int>::iterator rfsii;
1546 for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
1547 if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
1548 hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
1549 hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
1556 for(q=0;q<qnodes.size();++q){
1557 printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
1558 set<int>::iterator rsii;
1559 for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
1560 printf(" %d",(*rsii));
1561 printf(", and sources-to %d:",qnodes[q]->sources_to.size());
1562 for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
1563 printf(" %d",(*rsii));
1567 for(q=0;q<hfta_sets.size();++q){
1568 if(hfta_sets[q]->do_generation==false)
1570 printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
1571 set<int>::iterator rsii;
1572 for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
1573 printf(" %d",(*rsii));
1574 printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
1575 for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
1576 printf(" %d",(*rsii));
1583 // Re-topo sort the hftas
1584 hfta_topsort.clear();
1586 int hnode_srcs_2[hfta_sets.size()];
1587 for(i=0;i<hfta_sets.size();++i){
1588 hnode_srcs_2[i] = 0;
1589 if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
1594 while(workq.empty() == false){
1595 int node = workq.front();
1597 hfta_topsort.push_back(node);
1598 set<int>::iterator stsii;
1599 for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
1600 int child = (*stsii);
1601 hnode_srcs_2[child]++;
1602 if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
1603 workq.push_back(child);
1608 // Ensure that all of the query_node_indices in hfta_sets are topologically
1609 // sorted, don't rely on assumptions that all transforms maintain some kind of order.
1610 for(i=0;i<hfta_sets.size();++i){
1611 if(hfta_sets[i]->do_generation){
1612 map<int,int> n_accounted;
1613 vector<int> new_order;
1615 vector<int>::iterator vii;
1616 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1617 n_accounted[(*vii)]= 0;
1619 for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
1620 set<int>::iterator rfsii;
1621 for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
1622 if(n_accounted.count((*rfsii)) == 0){
1623 n_accounted[(*vii)]++;
1626 if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
1627 workq.push_back((*vii));
1631 while(workq.empty() == false){
1632 int node = workq.front();
1634 new_order.push_back(node);
1635 set<int>::iterator stsii;
1636 for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
1637 if(n_accounted.count((*stsii))){
1638 n_accounted[(*stsii)]++;
1639 if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
1640 workq.push_back((*stsii));
1645 hfta_sets[i]->query_node_indices = new_order;
1653 /// Global checking is done, start the analysis and translation
1654 /// of the query parse tree in the order specified by process_order
1657 // Get a list of the LFTAs for global lfta optimization
1658 // TODO: separate building operators from spliting lftas,
1659 // that will make optimizations such as predicate pushing easier.
1660 vector<stream_query *> lfta_list;
1661 stream_query *rootq;
1664 map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
1666 for(qi=hfta_topsort.size()-1;qi>=0;--qi){
1668 int hfta_id = hfta_topsort[qi];
1669 vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
1673 // Two possibilities: either it's a UDOP, or it's a collection of queries.
1674 // if(qnodes[curr_list.back()]->is_udop)
1675 if(hfta_sets[hfta_id]->is_udop){
1676 int node_id = curr_list.back();
1677 int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
1678 opview_entry *opv = new opview_entry();
1680 // Many of the UDOP properties aren't currently used.
1681 opv->parent_qname = "no_parent";
1682 opv->root_name = qnodes[node_id]->name;
1683 opv->view_name = qnodes[node_id]->file;
1685 sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
1686 opv->udop_alias = tmpstr;
1687 opv->mangler = qnodes[node_id]->mangler;
1689 if(opv->mangler != ""){
1690 int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
1691 Schema->mangle_subq_names(new_udop_schref,opv->mangler);
1694 // This piece of code makes each hfta which refers to the same udop
1695 // reference a distinct running udop. Do this at query optimization time?
1696 // fmtbl->set_udop_alias(opv->udop_alias);
1698 opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
1699 opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
1701 vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
1703 for(s=0;s<subq.size();++s){
1704 // Validate that the fields match.
1705 subquery_spec *sqs = subq[s];
1706 string subq_name = sqs->name + opv->mangler;
1707 vector<field_entry *> flds = Schema->get_fields(subq_name);
1708 if(flds.size() == 0){
1709 fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
1712 if(flds.size() < sqs->types.size()){
1713 fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
1716 bool failed = false;
1717 for(f=0;f<sqs->types.size();++f){
1718 data_type dte(sqs->types[f],sqs->modifiers[f]);
1719 data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
1720 if(! dte.subsumes_type(&dtf) ){
1721 fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
1725 if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
1726 string pstr = dte.get_temporal_string();
1727 fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
1734 /// Validation done, find the subquery, make a copy of the
1735 /// parse tree, and add it to the return list.
1736 for(q=0;q<qnodes.size();++q)
1737 if(qnodes[q]->name == subq_name)
1739 if(q==qnodes.size()){
1740 fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
1746 // Cross-link to from entry(s) in all sourced-to tables.
1747 set<int>::iterator sii;
1748 for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
1749 //printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
1750 vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
1752 for(ii=0;ii<tblvars.size();++ii){
1753 if(tblvars[ii]->schema_name == opv->root_name){
1754 tblvars[ii]->set_opview_idx(opviews.size());
1760 opviews.append(opv);
1763 // Analyze the parse trees in this query,
1764 // put them in rootq
1765 // vector<int> curr_list = process_sets[qi];
1768 ////////////////////////////////////////
1771 //printf("Process set %d, has %d queries\n",qi,curr_list.size());
1772 for(qj=0;qj<curr_list.size();++qj){
1774 fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
1776 // Select the current query parse tree
1777 table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
1779 // if hfta only, try to fetch any missing schemas
1780 // from the registry (using the print_schema program).
1781 // Here I use a hack to avoid analyzing the query -- all referenced
1782 // tables must be in the from clause
1783 // If there is a problem loading any table, just issue a warning,
1785 tablevar_list_t *fm = fta_parse_tree->get_from();
1786 vector<string> refd_tbls = fm->get_src_tbls(Schema);
1787 // iterate over all referenced tables
1789 for(t=0;t<refd_tbls.size();++t){
1790 int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
1792 if(tbl_ref < 0){ // if this table is not in the Schema
1795 string cmd="print_schema "+refd_tbls[t];
1796 FILE *schema_in = popen(cmd.c_str(), "r");
1797 if(schema_in == NULL){
1798 fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
1800 string schema_instr;
1801 while(fgets(tmpstr,TMPSTRLEN,schema_in)){
1802 schema_instr += tmpstr;
1804 fta_parse_result = new fta_parse_t();
1805 strcpy(tmp_schema_str,schema_instr.c_str());
1806 FtaParser_setstringinput(tmp_schema_str);
1807 if(FtaParserparse()){
1808 fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
1810 if( fta_parse_result->tables != NULL){
1812 for(tl=0;tl<fta_parse_result->tables->size();++tl){
1813 Schema->add_table(fta_parse_result->tables->get_table(tl));
1816 fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
1821 fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
1829 // Analyze the query.
1830 query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
1832 fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
1836 stream_query new_sq(qs, Schema);
1837 if(new_sq.error_code){
1838 fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
1842 // Add it to the Schema
1843 table_def *output_td = new_sq.get_output_tabledef();
1844 Schema->add_table(output_td);
1846 // Create a query plan from the analyzed parse tree.
1847 // If it's a query referenced via FROM, add it to the stream query.
1849 rootq->add_query(new_sq);
1851 rootq = new stream_query(new_sq);
1852 // have the stream query object inherit properties form the analyzed
1853 // hfta_node object.
1854 rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
1855 rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
1861 // This stream query has all its parts
1862 // Build and optimize it.
1863 //printf("translate_fta: generating plan.\n");
1864 if(rootq->generate_plan(Schema)){
1865 fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
1869 // We've found the query plan head, so now add the output operators
1870 if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
1871 pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
1872 multimap<string, int>::iterator mmsi;
1873 oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
1874 for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
1875 rootq->add_output_operator(output_specs[(*mmsi).second]);
1881 // Perform query splitting if necessary.
1883 vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
1886 //for(l=0;l<split_queries.size();++l){
1887 //printf("split query %d is %s\n",l,split_queries[l]->q`uery_name.c_str());
1893 if(split_queries.size() > 0){ // should be at least one component.
1895 // Compute the number of LFTAs.
1896 int n_lfta = split_queries.size();
1897 if(hfta_returned) n_lfta--;
1898 // Check if a schemaId constraint needs to be inserted.
1900 // Process the LFTA components.
1901 for(l=0;l<n_lfta;++l){
1902 if(lfta_names.count(split_queries[l]->query_name) == 0){
1903 // Grab the lfta for global optimization.
1904 vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
1905 string liface = "_local_";
1906 // string lmach = "";
1907 string lmach = hostname;
1909 liface = tvec[0]->get_interface(); // iface queries have been resolved
1910 if(tvec[0]->get_machine() != ""){
1911 lmach = tvec[0]->get_machine();
1913 fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
1916 interface_names.push_back(liface);
1917 machine_names.push_back(lmach);
1920 vector<predicate_t *> schemaid_preds;
1921 for(int irv=0;irv<tvec.size();++irv){
1923 string schema_name = tvec[irv]->get_schema_name();
1924 string rvar_name = tvec[irv]->get_var_name();
1925 int schema_ref = tvec[irv]->get_schema_ref();
1928 // interface_names.push_back(liface);
1929 // machine_names.push_back(lmach);
1931 //printf("Machine is %s\n",lmach.c_str());
1933 // Check if a schemaId constraint needs to be inserted.
1934 if(schema_ref<0){ // can result from some kinds of splits
1935 schema_ref = Schema->get_table_ref(schema_name);
1937 int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
1940 iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
1942 fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
1947 if(tvec[irv]->get_interface() != "_local_"){
1948 if(iface->has_multiple_schemas()){
1949 if(schema_id<0){ // invalid schema_id
1950 fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
1953 vector<string> iface_schemas = iface->get_property("Schemas");
1954 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1955 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1958 // Ensure that in liface, schema_id is used for only one schema
1959 if(schema_of_schemaid.count(liface)==0){
1960 map<int, string> empty_map;
1961 schema_of_schemaid[liface] = empty_map;
1963 if(schema_of_schemaid[liface].count(schema_id)==0){
1964 schema_of_schemaid[liface][schema_id] = schema_name;
1966 if(schema_of_schemaid[liface][schema_id] != schema_name){
1967 fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
1971 }else{ // single-schema interface
1972 schema_id = -1; // don't generate schema_id predicate
1973 vector<string> iface_schemas = iface->get_property("Schemas");
1974 if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
1975 fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
1978 if(iface_schemas.size()>1){
1979 fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
1987 // If we need to check the schema_id, insert a predicate into the lfta.
1988 // TODO not just schema_id, the full all_schema_ids set.
1990 colref_t *schid_cr = new colref_t("schemaId");
1991 schid_cr->schema_ref = schema_ref;
1992 schid_cr->table_name = rvar_name;
1993 schid_cr->tablevar_ref = 0;
1994 schid_cr->default_table = false;
1995 scalarexp_t *schid_se = new scalarexp_t(schid_cr);
1996 data_type *schid_dt = new data_type("uint");
1997 schid_se->dt = schid_dt;
1999 string schid_str = int_to_string(schema_id);
2000 literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
2001 scalarexp_t *lit_se = new scalarexp_t(schid_lit);
2002 lit_se->dt = schid_dt;
2004 predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
2005 vector<cnf_elem *> clist;
2006 make_cnf_from_pr(schid_pr, clist);
2007 analyze_cnf(clist[0]);
2008 clist[0]->cost = 1; // cheap one comparison
2009 // cnf built, now insert it.
2010 split_queries[l]->query_plan[0]->append_to_where(clist[0]);
2012 // Specialized processing
2013 // filter join, get two schemaid preds
2014 string node_type = split_queries[l]->query_plan[0]->node_type();
2015 if(node_type == "filter_join"){
2016 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2018 fj->pred_t0.push_back(clist[0]);
2020 fj->pred_t1.push_back(clist[0]);
2022 schemaid_preds.push_back(schid_pr);
2024 // watchlist join, get the first schemaid pred
2025 if(node_type == "watch_join"){
2026 watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
2028 fj->pred_t0.push_back(clist[0]);
2029 schemaid_preds.push_back(schid_pr);
2034 // Specialized processing, currently filter join.
2035 if(schemaid_preds.size()>1){
2036 string node_type = split_queries[l]->query_plan[0]->node_type();
2037 if(node_type == "filter_join"){
2038 filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
2039 predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
2040 vector<cnf_elem *> clist;
2041 make_cnf_from_pr(filter_pr, clist);
2042 analyze_cnf(clist[0]);
2043 clist[0]->cost = 1; // cheap one comparison
2044 fj->shared_pred.push_back(clist[0]);
2054 // Set the ht size from the recommendation, if there is one in the rec file
2055 if(lfta_htsize.count(split_queries[l]->query_name)>0){
2056 split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
2060 lfta_names[split_queries[l]->query_name] = lfta_list.size();
2061 split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
2062 lfta_list.push_back(split_queries[l]);
2063 lfta_mach_lists[lmach].push_back(split_queries[l]);
2065 // The following is a hack,
2066 // as I should be generating LFTA code through
2067 // the stream_query object.
2069 split_queries[l]->query_plan[0]->bind_to_schema(Schema);
2071 // split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
2074 // Create query description to embed in lfta.c
2075 string lfta_schema_str = split_queries[l]->make_schema();
2076 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2078 // get NIC capabilities.
2080 nic_property *nicprop = NULL;
2081 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2082 if(iface_codegen_type.size()){
2083 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2085 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2090 lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
2093 snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
2095 // TODO NOTE : I'd like it to be the case that registration_query_names
2096 // are the queries to be registered for subscription.
2097 // but there is some complex bookkeeping here.
2098 registration_query_names.push_back(split_queries[l]->query_name);
2099 mach_query_names[lmach].push_back(registration_query_names.size()-1);
2100 // NOTE: I will assume a 1-1 correspondence between
2101 // mach_query_names[lmach] and lfta_mach_lists[lmach]
2102 // where mach_query_names[lmach][i] contains the index into
2103 // query_names, which names the lfta, and
2104 // mach_query_names[lmach][i] is the stream_query * of the
2105 // corresponding lfta.
2106 // Later, lfta_iface_qnames are the query names matching lfta_iface_lists
2110 // check if lfta is reusable
2111 // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
2113 bool lfta_reusable = false;
2114 if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
2115 split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
2116 lfta_reusable = true;
2118 lfta_reuse_options.push_back(lfta_reusable);
2120 // LFTA will inherit the liveness timeout specification from the containing query
2121 // it is too conservative as lfta are expected to spend less time per tuple
2124 // extract liveness timeout from query definition
2125 int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
2126 if (!liveness_timeout) {
2127 // fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
2128 // split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2129 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2131 lfta_liveness_timeouts.push_back(liveness_timeout);
2133 // Add it to the schema
2134 table_def *td = split_queries[l]->get_output_tabledef();
2135 Schema->append_table(td);
2136 //printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
2141 // If the output is lfta-only, dump out the query name.
2142 if(split_queries.size() == 1 && !hfta_returned){
2143 if(output_query_names ){
2144 fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
2148 fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
2153 // output schema summary
2154 if(output_schema_summary){
2155 dump_summary(split_queries[0]);
2161 if(hfta_returned){ // query also has an HFTA component
2162 int hfta_nbr = split_queries.size()-1;
2164 hfta_list.push_back(split_queries[hfta_nbr]);
2166 // report on generated query names
2167 if(output_query_names){
2168 string hfta_name =split_queries[hfta_nbr]->query_name;
2169 fprintf(query_name_output,"%s H\n",hfta_name.c_str());
2170 for(l=0;l<hfta_nbr;++l){
2171 string lfta_name =split_queries[l]->query_name;
2172 fprintf(query_name_output,"%s L\n",lfta_name.c_str());
2176 // fprintf(stderr,"query names are ");
2177 // for(l=0;l<hfta_nbr;++l){
2178 // if(l>0) fprintf(stderr,",");
2179 // string fta_name =split_queries[l]->query_name;
2180 // fprintf(stderr," %s",fta_name.c_str());
2182 // fprintf(stderr,"\n");
2187 fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
2188 fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
2195 //-----------------------------------------------------------------
2196 // Compute and propagate the SE in PROTOCOL fields compute a field.
2197 //-----------------------------------------------------------------
2199 for(i=0;i<lfta_list.size();i++){
2200 lfta_list[i]->generate_protocol_se(sq_map, Schema);
2201 sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
2203 for(i=0;i<hfta_list.size();i++){
2204 hfta_list[i]->generate_protocol_se(sq_map, Schema);
2205 sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
2210 //------------------------------------------------------------------------
2211 // Perform individual FTA optimizations
2212 //-----------------------------------------------------------------------
2214 if (partitioned_mode) {
2216 // open partition definition file
2217 string part_fname = config_dir_path + "partition.txt";
2219 FILE* partfd = fopen(part_fname.c_str(), "r");
2221 fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
2224 PartnParser_setfileinput(partfd);
2225 if (PartnParserparse()) {
2226 fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
2233 print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2235 int num_hfta = hfta_list.size();
2236 for(i=0; i < hfta_list.size(); ++i){
2237 hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
2240 // Add all new hftas to schema
2241 for(i=num_hfta; i < hfta_list.size(); ++i){
2242 table_def *td = hfta_list[i]->get_output_tabledef();
2243 Schema->append_table(td);
2246 print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
2250 //------------------------------------------------------------------------
2251 // Do global (cross-fta) optimization
2252 //-----------------------------------------------------------------------
2259 set<string> extra_external_libs;
2261 for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
2264 // build hfta file name, create output
2265 if(numeric_hfta_flname){
2266 sprintf(tmpstr,"hfta_%d",hfta_count);
2267 hfta_names.push_back(tmpstr);
2268 sprintf(tmpstr,"hfta_%d.cc",hfta_count);
2270 sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
2271 hfta_names.push_back(tmpstr);
2272 sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
2274 FILE *hfta_fl = fopen(tmpstr,"w");
2275 if(hfta_fl == NULL){
2276 fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
2279 fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
2281 // If there is a field verifier, warn about
2282 // lack of compatibility
2283 // NOTE : this code assumes that visible non-lfta queries
2284 // are those at the root of a stream query.
2285 string hfta_comment;
2287 string hfta_namespace;
2288 if(hfta_list[i]->defines.count("comment")>0)
2289 hfta_comment = hfta_list[i]->defines["comment"];
2290 if(hfta_list[i]->defines.count("Comment")>0)
2291 hfta_comment = hfta_list[i]->defines["Comment"];
2292 if(hfta_list[i]->defines.count("COMMENT")>0)
2293 hfta_comment = hfta_list[i]->defines["COMMENT"];
2294 if(hfta_list[i]->defines.count("title")>0)
2295 hfta_title = hfta_list[i]->defines["title"];
2296 if(hfta_list[i]->defines.count("Title")>0)
2297 hfta_title = hfta_list[i]->defines["Title"];
2298 if(hfta_list[i]->defines.count("TITLE")>0)
2299 hfta_title = hfta_list[i]->defines["TITLE"];
2300 if(hfta_list[i]->defines.count("namespace")>0)
2301 hfta_namespace = hfta_list[i]->defines["namespace"];
2302 if(hfta_list[i]->defines.count("Namespace")>0)
2303 hfta_namespace = hfta_list[i]->defines["Namespace"];
2304 if(hfta_list[i]->defines.count("NAMESPACE")>0)
2305 hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
2307 if(field_verifier != NULL){
2309 if(hfta_comment == "")
2310 warning_str += "\tcomment not found.\n";
2312 // Obsolete stuff that Carsten wanted
2313 // if(hfta_title == "")
2314 // warning_str += "\ttitle not found.\n";
2315 // if(hfta_namespace == "")
2316 // warning_str += "\tnamespace not found.\n";
2319 // There is a get_tbl_keys method implemented for qp_nodes,
2320 // integrate it into stream_query, then call it to find keys,
2321 // and annotate fields with their key-ness.
2322 // If there is a "keys" property in the defines block, override anything returned
2323 // from the automated analysis
2325 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2327 for(fi=0;fi<flds.size();fi++){
2328 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2330 if(warning_str != "")
2331 fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
2332 hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
2335 // Get the fields in this query
2336 vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
2338 // do key processing
2339 string hfta_keys_s = "";
2340 if(hfta_list[i]->defines.count("keys")>0)
2341 hfta_keys_s = hfta_list[i]->defines["keys"];
2342 if(hfta_list[i]->defines.count("Keys")>0)
2343 hfta_keys_s = hfta_list[i]->defines["Keys"];
2344 if(hfta_list[i]->defines.count("KEYS")>0)
2345 hfta_keys_s = hfta_list[i]->defines["KEYS"];
2346 string xtra_keys_s = "";
2347 if(hfta_list[i]->defines.count("extra_keys")>0)
2348 xtra_keys_s = hfta_list[i]->defines["extra_keys"];
2349 if(hfta_list[i]->defines.count("Extra_Keys")>0)
2350 xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
2351 if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
2352 xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
2354 vector<string> hfta_keys;
2355 vector<string> partial_keys;
2356 vector<string> xtra_keys;
2357 if(hfta_keys_s==""){
2358 hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
2359 if(xtra_keys_s.size()>0){
2360 xtra_keys = split_string(xtra_keys_s, ',');
2362 for(int xi=0;xi<xtra_keys.size();++xi){
2363 if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
2364 hfta_keys.push_back(xtra_keys[xi]);
2368 hfta_keys = split_string(hfta_keys_s, ',');
2370 // validate that all of the keys exist in the output.
2371 // (exit on error, as it's a bad specification)
2372 vector<string> missing_keys;
2373 for(int ki=0;ki<hfta_keys.size(); ++ki){
2375 for(fi=0;fi<flds.size();++fi){
2376 if(hfta_keys[ki] == flds[fi]->get_name())
2380 missing_keys.push_back(hfta_keys[ki]);
2382 if(missing_keys.size()>0){
2383 fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2384 for(int hi=0; hi<missing_keys.size(); ++hi){
2385 fprintf(stderr," %s", missing_keys[hi].c_str());
2387 fprintf(stderr,"\n");
2391 fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
2392 if(hfta_comment != "")
2393 fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
2394 if(hfta_title != "")
2395 fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
2396 if(hfta_namespace != "")
2397 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
2398 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
2399 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2401 // write info about fields to qtree.xml
2403 for(fi=0;fi<flds.size();fi++){
2404 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2405 if(flds[fi]->get_modifier_list()->size()){
2406 fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
2408 fprintf(qtree_output," />\n");
2411 for(int hi=0;hi<hfta_keys.size();++hi){
2412 fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
2414 for(int hi=0;hi<partial_keys.size();++hi){
2415 fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
2417 for(int hi=0;hi<xtra_keys.size();++hi){
2418 fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
2422 // extract liveness timeout from query definition
2423 int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
2424 if (!liveness_timeout) {
2425 // fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
2426 // hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2427 liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2429 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
2431 vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
2433 for(itv=0;itv<tmp_tv.size();++itv){
2434 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
2436 string ifrs = hfta_list[i]->collect_refd_ifaces();
2438 fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
2440 fprintf(qtree_output,"\t</HFTA>\n");
2444 // debug only -- do code generation to catch generation-time errors.
2445 hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
2448 hfta_count++; // for hfta file names with numeric suffixes
2450 hfta_list[i]->get_external_libs(extra_external_libs);
2454 string ext_lib_string;
2455 set<string>::iterator ssi_el;
2456 for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
2457 ext_lib_string += (*ssi_el)+" ";
2461 // Report on the set of operator views
2462 for(i=0;i<opviews.size();++i){
2463 opview_entry *opve = opviews.get_entry(i);
2464 fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
2465 fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
2466 fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
2467 fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
2468 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2470 if (!opve->liveness_timeout) {
2471 // fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
2472 // opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
2473 opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
2475 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
2477 for(j=0;j<opve->subq_names.size();j++)
2478 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
2479 fprintf(qtree_output,"\t</UDOP>\n");
2483 //-----------------------------------------------------------------
2485 // Create interface-specific meta code files.
2486 // first, open and parse the interface resources file.
2487 ifaces_db = new ifq_t();
2489 if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
2490 fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
2491 ifx_fname.c_str(), ierr.c_str());
2495 map<string, vector<stream_query *> >::iterator svsi;
2496 for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
2497 string lmach = (*svsi).first;
2499 // For this machine, create a set of lftas per interface.
2500 vector<stream_query *> mach_lftas = (*svsi).second;
2501 map<string, vector<stream_query *> > lfta_iface_lists;
2503 for(li=0;li<mach_lftas.size();++li){
2504 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2505 string lfta_iface = "_local_";
2507 string lfta_iface = tvec[0]->get_interface();
2509 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2512 map<string, vector<stream_query *> >::iterator lsvsi;
2513 for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
2515 string liface = (*lsvsi).first;
2516 vector<stream_query *> iface_lftas = (*lsvsi).second;
2517 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2518 if(iface_codegen_type.size()){
2519 nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2521 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2524 string mcs = generate_nic_code(iface_lftas, nicprop);
2527 mcf_flnm = lmach + "_"+liface+".mcf";
2529 mcf_flnm = hostname + "_"+liface+".mcf";
2531 if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
2532 fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
2535 fprintf(mcf_fl,"%s",mcs.c_str());
2537 //printf("mcs of machine %s, iface %s of type %s is \n%s\n",
2538 //lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
2547 //-----------------------------------------------------------------
2550 // Find common filter predicates in the LFTAs.
2551 // in addition generate structs to store the temporal attributes unpacked by prefilter
2553 map<string, vector<stream_query *> >::iterator ssqi;
2554 for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
2556 string lmach = (*ssqi).first;
2557 bool packed_return = false;
2561 // The LFTAs of this machine.
2562 vector<stream_query *> mach_lftas = (*ssqi).second;
2563 // break up on a per-interface basis.
2564 map<string, vector<stream_query *> > lfta_iface_lists;
2565 map<string, vector<int> > lfta_iface_qname_ix; // need the query name
2567 for(li=0;li<mach_lftas.size();++li){
2568 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2569 string lfta_iface = "_local_";
2571 lfta_iface = tvec[0]->get_interface();
2573 lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
2574 lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
2578 // Are the return values "packed"?
2579 // This should be done on a per-interface basis.
2580 // But this is defunct code for gs-lite
2581 for(li=0;li<mach_lftas.size();++li){
2582 vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
2583 string liface = "_local_";
2585 liface = tvec[0]->get_interface();
2587 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2588 if(iface_codegen_type.size()){
2589 if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
2590 packed_return = true;
2596 // Separate lftas by interface, collect results on a per-interface basis.
2598 vector<cnf_set *> no_preds; // fallback if there is no prefilter
2599 map<string, vector<cnf_set *> > prefilter_preds;
2600 set<unsigned int> pred_ids; // this can be global for all interfaces
2601 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2602 string liface = (*mvsi).first;
2603 vector<cnf_set *> empty_list;
2604 prefilter_preds[liface] = empty_list;
2605 if(! packed_return){
2606 get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
2609 // get NIC capabilities. (Is this needed?)
2610 nic_property *nicprop = NULL;
2611 vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
2612 if(iface_codegen_type.size()){
2613 nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
2615 fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
2622 // Now that we know the prefilter preds, generate the lfta code.
2623 // Do this for all lftas in this machine.
2624 for(li=0;li<mach_lftas.size();++li){
2625 set<unsigned int> subsumed_preds;
2626 set<unsigned int>::iterator sii;
2628 for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
2630 if((pid>>16) == li){
2631 subsumed_preds.insert(pid & 0xffff);
2635 string lfta_schema_str = mach_lftas[li]->make_schema();
2636 string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
2637 nic_property *nicprop = NULL; // no NIC properties?
2638 lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
2642 // generate structs to store the temporal attributes
2643 // unpacked by prefilter
2644 col_id_set temp_cids;
2645 get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
2646 lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
2648 // Compute the lfta bit signatures and the lfta colrefs
2649 // do this on a per-interface basis
2651 lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2653 map<string, vector<long long int> > lfta_sigs; // used again later
2654 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2655 string liface = (*mvsi).first;
2656 vector<long long int> empty_list;
2657 lfta_sigs[liface] = empty_list;
2659 vector<col_id_set> lfta_cols;
2660 vector<int> lfta_snap_length;
2661 for(li=0;li<lfta_iface_lists[liface].size();++li){
2662 unsigned long long int mask=0, bpos=1;
2664 for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
2665 if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
2669 lfta_sigs[liface].push_back(mask);
2670 lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
2671 lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
2674 //for(li=0;li<mach_lftas.size();++li){
2675 //printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);
2676 //col_id_set::iterator tcisi;
2677 //for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
2678 //printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
2683 // generate the prefilter
2684 // Do this on a per-interface basis, except for the #define
2686 // lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
2687 lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2689 lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
2694 // Generate interface parameter lookup function
2695 lfta_val[lmach] += "// lookup interface properties by name\n";
2696 lfta_val[lmach] += "// multiple values of given property returned in the same string separated by commas\n";
2697 lfta_val[lmach] += "// returns NULL if given property does not exist\n";
2698 lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
2700 // collect a list of interface names used by queries running on this host
2701 set<std::string> iface_names;
2702 for(i=0;i<mach_query_names[lmach].size();i++){
2703 int mi = mach_query_names[lmach][i];
2704 stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2706 if(interface_names[mi]=="")
2707 iface_names.insert("DEFAULTDEV");
2709 iface_names.insert(interface_names[mi]);
2712 // generate interface property lookup code for every interface
2713 set<std::string>::iterator sir;
2714 for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
2715 if (sir == iface_names.begin())
2716 lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2718 lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
2720 // iterate through interface properties
2721 vector<string> iface_properties;
2722 if(*sir!="_local_"){ // dummy watchlist interface, don't process.
2723 iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
2726 fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
2729 if (iface_properties.empty())
2730 lfta_val[lmach] += "\t\treturn NULL;\n";
2732 for (int i = 0; i < iface_properties.size(); ++i) {
2734 lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2736 lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
2738 // combine all values for the interface property using comma separator
2739 vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
2740 lfta_val[lmach] += "\t\t\treturn \"";
2741 for (int j = 0; j < vals.size(); ++j) {
2742 lfta_val[lmach] += vals[j];
2743 if (j != vals.size()-1)
2744 lfta_val[lmach] += ",";
2746 lfta_val[lmach] += "\";\n";
2748 lfta_val[lmach] += "\t\t} else\n";
2749 lfta_val[lmach] += "\t\t\treturn NULL;\n";
2752 lfta_val[lmach] += "\t} else\n";
2753 lfta_val[lmach] += "\t\treturn NULL;\n";
2754 lfta_val[lmach] += "}\n\n";
2757 // Generate a full list of FTAs for clearinghouse reference
2758 lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
2759 lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
2762 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2763 string liface = (*mvsi).first;
2764 if(liface != "_local_"){ // these don't register themselves
2765 vector<stream_query *> lfta_list = (*mvsi).second;
2766 for(i=0;i<lfta_list.size();i++){
2767 int mi = lfta_iface_qname_ix[liface][i];
2768 if(first) first = false;
2769 else lfta_val[lmach] += ", ";
2770 lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
2774 // for (i = 0; i < registration_query_names.size(); ++i) {
2776 // lfta_val[lmach] += ", ";
2777 // lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
2780 for (i = 0; i < hfta_list.size(); ++i) {
2781 lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
2783 lfta_val[lmach] += ", NULL};\n\n";
2786 // Add the initialization function to lfta.c
2787 // Change to accept the interface name, and
2788 // set the prefilter function accordingly.
2789 // see the example in demo/err2
2790 lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
2791 lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
2793 // for(i=0;i<mach_query_names[lmach].size();i++)
2794 // int mi = mach_query_names[lmach][i];
2795 // stream_query *lfta_sq = lfta_mach_lists[lmach][i];
2797 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2798 string liface = (*mvsi).first;
2799 vector<stream_query *> lfta_list = (*mvsi).second;
2800 for(i=0;i<lfta_list.size();i++){
2801 stream_query *lfta_sq = lfta_list[i];
2802 int mi = lfta_iface_qname_ix[liface][i];
2804 if(liface == "_local_"){
2805 // Don't register an init function, do the init code inline
2806 lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
2807 lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
2811 fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
2813 string this_iface = "DEFAULTDEV";
2814 if(interface_names[mi]!="")
2815 this_iface = '"'+interface_names[mi]+'"';
2816 lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
2817 lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
2818 // if(interface_names[mi]=="")
2819 // lfta_val[lmach]+="DEFAULTDEV";
2821 // lfta_val[lmach]+='"'+interface_names[mi]+'"';
2822 lfta_val[lmach] += this_iface;
2825 lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
2826 +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
2828 sprintf(tmpstr,",%d",snap_lengths[mi]);
2829 lfta_val[lmach] += tmpstr;
2831 // unsigned long long int mask=0, bpos=1;
2833 // for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
2834 // if(prefilter_preds[f_pos]->lfta_id.count(i))
2836 // bpos = bpos << 1;
2840 // sprintf(tmpstr,",%lluull",mask);
2841 sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
2842 lfta_val[lmach]+=tmpstr;
2844 lfta_val[lmach] += ",0ull";
2847 lfta_val[lmach] += ");\n";
2851 // End of lfta prefilter stuff
2852 // --------------------------------------------------
2854 // If there is a field verifier, warn about
2855 // lack of compatibility
2856 string lfta_comment;
2858 string lfta_namespace;
2859 map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
2860 if(ldefs.count("comment")>0)
2861 lfta_comment = lfta_sq->defines["comment"];
2862 if(ldefs.count("Comment")>0)
2863 lfta_comment = lfta_sq->defines["Comment"];
2864 if(ldefs.count("COMMENT")>0)
2865 lfta_comment = lfta_sq->defines["COMMENT"];
2866 if(ldefs.count("title")>0)
2867 lfta_title = lfta_sq->defines["title"];
2868 if(ldefs.count("Title")>0)
2869 lfta_title = lfta_sq->defines["Title"];
2870 if(ldefs.count("TITLE")>0)
2871 lfta_title = lfta_sq->defines["TITLE"];
2872 if(ldefs.count("NAMESPACE")>0)
2873 lfta_namespace = lfta_sq->defines["NAMESPACE"];
2874 if(ldefs.count("Namespace")>0)
2875 lfta_namespace = lfta_sq->defines["Namespace"];
2876 if(ldefs.count("namespace")>0)
2877 lfta_namespace = lfta_sq->defines["namespace"];
2879 string lfta_ht_size;
2880 if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
2881 lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
2882 if(ldefs.count("aggregate_slots")>0){
2883 lfta_ht_size = ldefs["aggregate_slots"];
2886 // NOTE : I'm assuming that visible lftas do not start with _fta.
2887 // -- will fail for non-visible simple selection queries.
2888 if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
2890 if(lfta_comment == "")
2891 warning_str += "\tcomment not found.\n";
2892 // Obsolete stuff that carsten wanted
2893 // if(lfta_title == "")
2894 // warning_str += "\ttitle not found.\n";
2895 // if(lfta_namespace == "")
2896 // warning_str += "\tnamespace not found.\n";
2898 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2900 for(fi=0;fi<flds.size();fi++){
2901 field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
2903 if(warning_str != "")
2904 fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
2905 registration_query_names[mi].c_str(),warning_str.c_str());
2909 // Create qtree output
2910 fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
2911 if(lfta_comment != "")
2912 fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
2913 if(lfta_title != "")
2914 fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
2915 if(lfta_namespace != "")
2916 fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
2917 if(lfta_ht_size != "")
2918 fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
2920 fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
2922 fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
2923 fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
2924 std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
2925 for(int t=0;t<itbls.size();++t){
2926 fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
2928 // fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
2929 fprintf(qtree_output,"\t\t<Rate value='100' />\n");
2930 fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
2931 // write info about fields to qtree.xml
2932 vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
2934 for(fi=0;fi<flds.size();fi++){
2935 fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
2936 if(flds[fi]->get_modifier_list()->size()){
2937 fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
2939 fprintf(qtree_output," />\n");
2941 fprintf(qtree_output,"\t</LFTA>\n");
2947 for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
2948 string liface = (*mvsi).first;
2950 " if (!strcmp(device, \""+liface+"\")) \n"
2951 " lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
2955 " if(lfta_prefilter == NULL){\n"
2956 " fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
2963 lfta_val[lmach] += "}\n\n";
2965 if(!(debug_only || hfta_only) ){
2968 lfta_flnm = lmach + "_lfta.c";
2970 lfta_flnm = hostname + "_lfta.c";
2971 if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
2972 fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
2975 fprintf(lfta_out,"%s",lfta_header.c_str());
2976 fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
2977 fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
2982 // Say what are the operators which must execute
2983 if(opviews.size()>0)
2984 fprintf(stderr,"The queries use the following external operators:\n");
2985 for(i=0;i<opviews.size();++i){
2986 opview_entry *opv = opviews.get_entry(i);
2987 fprintf(stderr,"\t%s\n",opv->view_name.c_str());
2991 generate_makefile(input_file_names, nfiles, hfta_names, opviews,
2992 machine_names, schema_file_name,
2994 ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
2997 fprintf(qtree_output,"</QueryNodes>\n");
3002 ////////////////////////////////////////////////////////////
// ------------------------------------------------------------------
// generate_makefile() : emit the build/run/stop scripts for a compiled
// query set into the current directory:
//   Makefile - builds the lfta runtime ("rts") and one executable per hfta
//   runit    - starts gshub.py, rts (with per-interface arguments), every
//              hfta, and any external-operator (UDOP) processes
//   stopit   - signals the process groups recorded in gs.pids
// A set_vinterface_hash.bat generator exists at the bottom but is
// currently disabled (commented out).
//
// Parameters:
//   input_file_names/nfiles - query source files (Makefile prerequisites)
//   hfta_names       - names of the hfta executables to build/run
//   opviews          - external operator views; their exec files are run by runit
//   machine_names    - machine owning each interface (parallel to interface_names)
//   schema_file_name - packet schema file, a prerequisite of the lfta C file
//   ifdb             - interface-property database, queried per interface
//   config_dir_path  - if non-empty, rewritten to the "-C <dir>" flag
//                      (NOTE(review): mutates the caller's string - by-ref param)
//   rts_hload        - per-interface hash-load split for virtual interfaces
//
// NOTE(review): this listing is elided (several statements and closing
// braces are not shown); variables used below without a visible
// declaration (root_path, ifaces, hostname, erri, err_str, extra_libs,
// op_fls, i, j, sb, ...) come from the elided portions of the function.
3004 void generate_makefile(vector<string> &input_file_names, int nfiles,
3005 vector<string> &hfta_names, opview_set &opviews,
3006 vector<string> &machine_names,
3007 string schema_file_name,
3008 vector<string> &interface_names,
3009 ifq_t *ifdb, string &config_dir_path,
3012 map<string, vector<int> > &rts_hload
// Fold a non-empty config directory into the flag form expected by
// translate_fta ("-C <dir>").
3016 if(config_dir_path != ""){
3017 config_dir_path = "-C "+config_dir_path;
// Optional link libraries: only added to the link line below if the
// archives actually exist under <root>/lib (stat() succeeds).
3021 bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
3022 bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
3024 // if(libz_exists && !libast_exists){
3025 // fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a does not.\n");
3029 // Get set of operator executable files to run
3031 set<string>::iterator ssi;
3032 for(i=0;i<opviews.size();++i){
3033 opview_entry *opv = opviews.get_entry(i);
3034 if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
// ---------------- Makefile ----------------
3037 FILE *outfl = fopen("Makefile", "w");
3039 fprintf(stderr,"Can't open Makefile for write, exiting.\n");
// Compiler definitions: hfta code is C++ (CPP), lfta runtime code is C (CC).
3044 ("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
3045 "CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
3049 fprintf(outfl," -DLFTA_STATS");
3051 // Gather the set of interfaces
3052 // Also, gather "base interface names" for use in computing
3053 // the hash splitting to virtual interfaces.
3054 // TODO : must update to handle machines
3056 set<string> base_vifaces; // base interfaces of virtual interfaces
3057 map<string, string> ifmachines;
3058 map<string, string> ifattrs;
3059 for(i=0;i<interface_names.size();++i){
3060 ifaces.insert(interface_names[i]);
3061 ifmachines[interface_names[i]] = machine_names[i];
// A virtual interface name contains an 'X'; the prefix before the last
// 'X' is recorded as its base (physical) interface.
3063 size_t Xpos = interface_names[i].find_last_of("X");
3064 if(Xpos!=string::npos){
3065 string iface = interface_names[i].substr(0,Xpos);
3066 base_vifaces.insert(iface);
3068 // get interface attributes and add them to the list
3071 // Do we need to include protobuf libraries?
3072 // TODO Move to the interface library: get the libraries to include
3073 // for an interface type
3075 bool use_proto = false;
3076 bool use_bsa = false;
3077 bool use_kafka = false;
// Inspect each interface's properties to decide which optional
// libraries (protobuf-c, bsa_stream, rdkafka) the rts link line needs.
3080 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3081 string ifnm = (*ssi);
3082 vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
3083 for(int ift_i=0;ift_i<ift.size();ift_i++){
3084 if(ift[ift_i]=="PROTO"){
3088 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
3089 for(int ift_i=0;ift_i<ift.size();ift_i++){
3090 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
3094 ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
3095 for(int ift_i=0;ift_i<ift.size();ift_i++){
3096 if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
// Default target also depends on every hfta executable.
3106 for(i=0;i<hfta_names.size();++i)
3107 fprintf(outfl," %s",hfta_names[i].c_str());
// Link rule for the lfta runtime executable "rts".
3111 "rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
3112 "\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
3114 fprintf(outfl,"-L. ");
3116 ("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
3118 fprintf(outfl,"-lgscppads -lpads ");
3120 "-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
3122 fprintf(outfl, " -lpz -lz -lbz ");
3123 if(libz_exists && libast_exists)
3124 fprintf(outfl," -last ");
3126 fprintf(outfl, " -ldll -ldl ");
3128 fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
3130 fprintf(outfl, " -lbsa_stream ");
3132 fprintf(outfl, " -lrdkafka ");
3133 fprintf(outfl," -lgscpaux");
3135 fprintf(outfl," -fprofile-arcs");
// Rules to compile lfta.o and to regenerate <hostname>_lfta.c by
// re-running translate_fta on the same schema and query inputs.
3140 "lfta.o: %s_lfta.c\n"
3141 "\t$(CC) -o lfta.o -c %s_lfta.c\n"
3143 "%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
3144 for(i=0;i<nfiles;++i)
3145 fprintf(outfl," %s",input_file_names[i].c_str());
3147 fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3149 fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
3151 for(i=0;i<nfiles;++i)
3152 fprintf(outfl," %s",input_file_names[i].c_str());
3153 fprintf(outfl,"\n");
// One compile+link rule per hfta executable; the %s placeholders are all
// filled with the hfta's own name, and extra_libs appends query-specific
// libraries to the link line.
3155 for(i=0;i<hfta_names.size();++i)
3158 "\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
3161 "\t$(CPP) -o %s.o -c %s.cc\n"
3164 hfta_names[i].c_str(), hfta_names[i].c_str(),
3165 hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
3166 hfta_names[i].c_str(), hfta_names[i].c_str(),
3167 hfta_names[i].c_str(), hfta_names[i].c_str()
// Convenience rules: symlink the packet schema and external-function
// definitions from the installation, plus the clean target.
3172 "packet_schema.txt:\n"
3173 "\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
3175 "external_fcns.def:\n"
3176 "\tln -s "+root_path+"/cfg/external_fcns.def .\n"
3179 "\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
3180 for(i=0;i<hfta_names.size();++i)
3181 fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
3182 fprintf(outfl,"\n");
// ---------------- runit ----------------
3188 // Gather the set of interfaces
3189 // TODO : must update to handle machines
3190 // TODO : lookup interface attributes and add them as a parameter to rts process
3191 outfl = fopen("runit", "w");
3193 fprintf(stderr,"Can't open runit for write, exiting.\n");
// runit: launch gshub.py in the background, read its address from
// gshub.log, record process groups in gs.pids, then start rts.
3201 +root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
3203 "if [ ! -f gshub.log ]\n"
3205 "\techo \"Failed to start bin/gshub.py\"\n"
3208 "ADDR=`cat gshub.log`\n"
3209 "ps opgid= $! >> gs.pids\n"
3210 "./rts $ADDR default ").c_str(), outfl);
// rts arguments: each interface name followed by that interface's
// "Command" values from the interface database.
3213 for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
3214 string ifnm = (*ssi);
3215 // suppress internal _local_ interface
3216 if (ifnm == "_local_")
3218 fprintf(outfl, "%s ",ifnm.c_str());
3219 vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
3220 for(j=0;j<ifv.size();++j)
3221 fprintf(outfl, "%s ",ifv[j].c_str());
3223 fprintf(outfl, " &\n");
3224 fprintf(outfl, "echo $! >> gs.pids\n");
// Launch every hfta, then every external operator (UDOP) process.
3225 for(i=0;i<hfta_names.size();++i)
3226 fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
3228 for(j=0;j<opviews.opview_list.size();++j){
3229 fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
3233 system("chmod +x runit");
// ---------------- stopit ----------------
// stopit: kill -TERM each process group in gs.pids; a second pass over
// gs.pids follows (its kill line is elided in this listing).
3235 outfl = fopen("stopit", "w");
3237 fprintf(stderr,"Can't open stopit for write, exiting.\n");
3241 fprintf(outfl,"#!/bin/sh\n"
3243 "if [ ! -f gs.pids ]\n"
3247 "for pgid in `cat gs.pids`\n"
3249 "kill -TERM -$pgid\n"
3252 "for pgid in `cat gs.pids`\n"
3259 system("chmod +x stopit");
3261 //-----------------------------------------------
3263 /* For now disable support for virtual interfaces
3264 outfl = fopen("set_vinterface_hash.bat", "w");
3266 fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
3270 // The format should be determined by an entry in the ifres.xml file,
3271 // but for now hardcode the only example I have.
3272 for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
3273 if(rts_hload.count((*ssi))){
3274 string iface_name = (*ssi);
3275 string iface_number = "";
3276 for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
3277 if(isdigit(iface_name[j])){
3278 iface_number = iface_name[j];
3279 if(j>0 && isdigit(iface_name[j-1]))
3280 iface_number = iface_name[j-1] + iface_number;
3284 fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
3285 vector<int> halloc = rts_hload[iface_name];
3287 for(j=0;j<halloc.size();++j){
3290 fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
3291 prev_limit = halloc[j];
3293 fprintf(outfl,"\n");
3297 system("chmod +x set_vinterface_hash.bat");
3301 // Code for implementing a local schema
// Build a per-query-file schema (qpSchema): first the output table of
// each lfta-side split query, then any other tables referenced by the
// hfta-side query, resolved against the global Schema.
// NOTE(review): the enclosing function header is not visible in this
// listing; hfta_nbr, split_queries, Schema, input_file_names and the
// loop indices (l, ti, i) are declared in that elided context.
3303 table_list qpSchema;
3305 // Load the schemas of any LFTAs.
3307 for(l=0;l<hfta_nbr;++l){
3308 stream_query *sq0 = split_queries[l];
3309 table_def *td = sq0->get_output_tabledef();
3310 qpSchema.append_table(td);
3312 // load the schemas of any other ref'd tables.
3314 vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
3316 for(ti=0;ti<input_tbl_names.size();++ti){
// Prefer a table already registered in qpSchema; otherwise fall back to
// the global Schema, reporting an error if it is found in neither.
3317 int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
3319 tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
3321 fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
3324 qpSchema.append_table(Schema->get_table(tbl_ref));
3329 // Functions related to parsing.
3332 static int split_string(char *instr,char sep, char **words,int max_words){
3338 words[nwords++] = str;
3339 while( (loc = strchr(str,sep)) != NULL){
3342 if(nwords >= max_words){
3343 fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
3344 nwords = max_words-1;
3346 words[nwords++] = str;