-/* ------------------------------------------------
-Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#include<unistd.h> // for gethostname
-
-#include <string>
-#include "parse_fta.h"
-#include "parse_schema.h"
-#include "parse_ext_fcns.h"
-#include"analyze_fta.h"
-#include"query_plan.h"
-#include"generate_lfta_code.h"
-#include"stream_query.h"
-#include"generate_utils.h"
-#include"nic_def.h"
-#include"generate_nic_code.h"
-
-#include <stdlib.h>
-#include <stdio.h>
-#include<ctype.h>
-#include<glob.h>
-#include<string.h>
-
-#include<list>
-
-// for the scandir
- #include <sys/types.h>
- #include <dirent.h>
-
-
-#include<errno.h>
-
-// to verify that some files exist.
- #include <sys/types.h>
- #include <sys/stat.h>
-
-#include "parse_partn.h"
-
-#include "print_plan.h"
-
-// Interface to the xml parser
-
-#include"xml_t.h"
-#include"field_list.h"
-
-extern int xmlParserparse(void);
-extern FILE *xmlParserin;
-extern int xmlParserdebug;
-
-std::vector<std::string> xml_attr_vec;
-std::vector<std::string> xml_val_vec;
-std::string xml_a, xml_v;
-xml_t *xml_leaves = NULL;
-
-// Interface to the field list verifier
-field_list *field_verifier = NULL;
-
-#define TMPSTRLEN 1000
-
-#ifndef PATH_DELIM
- #define PATH_DELIM '/'
-#endif
-
-char tmp_schema_str[10000];
-
-// maximum delay between two hearbeats produced
-// by UDOP. Used when its not explicity
-// provided in udop definition
-#define DEFAULT_UDOP_LIVENESS_TIMEOUT 5
-
-// Default lfta hash table size, must be power of 2.
-int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;
-
-// Interface to FTA definition lexer and parser ...
-
-extern int FtaParserparse(void);
-extern FILE *FtaParserin;
-extern int FtaParserdebug;
-
-fta_parse_t *fta_parse_result;
-var_defs_t *fta_parse_defines;
-
-
-
-// Interface to external function lexer and parser ...
-
-extern int Ext_fcnsParserparse(void);
-extern FILE *Ext_fcnsParserin;
-extern int Ext_fcnsParserdebug;
-
-ext_fcn_list *Ext_fcns;
-
-
-// Interface to partition definition parser
-extern int PartnParserparse();
-partn_def_list_t *partn_parse_result = NULL;
-
-
-
-using namespace std;
-//extern int errno;
-
-
-// forward delcaration of local utility function
-void generate_makefile(vector<string> &input_file_names, int nfiles,
- vector<string> &hfta_names, opview_set &opviews,
- vector<string> &machine_names,
- string schema_file_name,
- vector<string> &interface_names,
- ifq_t *ifdb, string &config_dir_path,
- bool use_pads,
- string extra_libs,
- map<string, vector<int> > &rts_hload
- );
-
-//static int split_string(char *instr,char sep, char **words,int max_words);
-#define MAXFLDS 100
-
- FILE *schema_summary_output = NULL; // query names
-
-// Dump schema summary
-void dump_summary(stream_query *str){
- fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
-
- table_def *sch = str->get_output_tabledef();
-
- vector<field_entry *> flds = sch->get_fields();
- int f;
- for(f=0;f<flds.size();++f){
- if(f>0) fprintf(schema_summary_output,"|");
- fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
- }
- fprintf(schema_summary_output,"\n");
- for(f=0;f<flds.size();++f){
- if(f>0) fprintf(schema_summary_output,"|");
- fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
- }
- fprintf(schema_summary_output,"\n");
-}
-
-// Globals
-string hostname; // name of current host.
-int hostname_len;
-bool generate_stats = false;
-string root_path = "../..";
-
-
-int main(int argc, char **argv){
- char tmpstr[TMPSTRLEN];
- string err_str;
- int q,s,h,f;
-
- set<int>::iterator si;
-
- vector<string> query_names; // for lfta.c registration
- map<string, vector<int> > mach_query_names; // list queries of machine
- vector<int> snap_lengths; // for lfta.c registration
- vector<string> interface_names; // for lfta.c registration
- vector<string> machine_names; // machine of interface
- vector<bool> lfta_reuse_options; // for lfta.c registration
- vector<int> lfta_liveness_timeouts; // fot qtree.xml generation
- vector<string> hfta_names; // hfta cource code names, for
- // creating make file.
- vector<string> qnames; // ensure unique names
- map<string, int> lfta_names; // keep track of unique lftas.
-
-
-// set these to 1 to debug the parser
- FtaParserdebug = 0;
- Ext_fcnsParserdebug = 0;
-
- FILE *lfta_out; // lfta.c output.
- FILE *fta_in; // input file
- FILE *table_schemas_in; // source tables definition file
- FILE *query_name_output; // query names
- FILE *qtree_output; // interconnections of query nodes
-
- // -------------------------------
- // Handling of Input Arguments
- // -------------------------------
- char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";
- const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"
- "\t[-B] : debug only (don't create output files)\n"
- "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"
- "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"
- "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"
- "\t[-C] : use <config directory> for definition files\n"
- "\t[-l] : use <library directory> for library queries\n"
- "\t[-N] : output query names in query_names.txt\n"
- "\t[-H] : create HFTA only (no schema_file)\n"
- "\t[-Q] : use query name for hfta suffix\n"
- "\t[-M] : generate make file and runit, stopit scripts\n"
- "\t[-S] : enable LFTA statistics (alters Makefile).\n"
- "\t[-f] : Output schema summary to schema_summary.txt\n"
- "\t[-P] : link with PADS\n"
- "\t[-h] : override host name.\n"
- "\t[-c] : clean out Makefile and hfta_*.cc first.\n"
- "\t[-R] : path to root of GS-lite\n"
-;
-
-// parameters gathered from command line processing
- string external_fcns_path;
-// string internal_fcn_path;
- string config_dir_path;
- string library_path = "./";
- vector<string> input_file_names;
- string schema_file_name;
- bool debug_only = false;
- bool hfta_only = false;
- bool output_query_names = false;
- bool output_schema_summary=false;
- bool numeric_hfta_flname = true;
- bool create_makefile = false;
- bool distributed_mode = false;
- bool partitioned_mode = false;
- bool use_live_hosts_file = false;
- bool use_pads = false;
- bool clean_make = false;
- int n_virtual_interfaces = 1;
-
- char chopt;
- while((chopt = getopt(argc,argv,optstr)) != -1){
- switch(chopt){
- case 'B':
- debug_only = true;
- break;
- case 'D':
- distributed_mode = true;
- break;
- case 'p':
- partitioned_mode = true;
- break;
- case 'L':
- use_live_hosts_file = true;
- break;
- case 'C':
- if(optarg != NULL)
- config_dir_path = string(optarg) + string("/");
- break;
- case 'l':
- if(optarg != NULL)
- library_path = string(optarg) + string("/");
- break;
- case 'N':
- output_query_names = true;
- break;
- case 'Q':
- numeric_hfta_flname = false;
- break;
- case 'H':
- if(schema_file_name == ""){
- hfta_only = true;
- }
- break;
- case 'f':
- output_schema_summary=true;
- break;
- case 'M':
- create_makefile=true;
- break;
- case 'S':
- generate_stats=true;
- break;
- case 'P':
- use_pads = true;
- break;
- case 'c':
- clean_make = true;
- break;
- case 'h':
- if(optarg != NULL)
- hostname = optarg;
- break;
- case 'R':
- if(optarg != NULL)
- root_path = optarg;
- break;
- case 'n':
- if(optarg != NULL){
- n_virtual_interfaces = atoi(optarg);
- if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){
- fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);
- n_virtual_interfaces = 1;
- }
- }
- break;
- case '?':
- fprintf(stderr,"Error, argument %c not recognized.\n",optopt);
- fprintf(stderr,"%s\n", usage_str);
- exit(1);
- default:
- fprintf(stderr, "Argument was %c\n", optopt);
- fprintf(stderr,"Invalid arguments\n");
- fprintf(stderr,"%s\n", usage_str);
- exit(1);
- }
- }
- argc -= optind;
- argv += optind;
- for (int i = 0; i < argc; ++i) {
- if((schema_file_name == "") && !hfta_only){
- schema_file_name = argv[i];
- }else{
- input_file_names.push_back(argv[i]);
- }
- }
-
- if(input_file_names.size() == 0){
- fprintf(stderr,"%s\n", usage_str);
- exit(1);
- }
-
- if(clean_make){
- string clean_cmd = "rm Makefile hfta_*.cc";
- int clean_ret = system(clean_cmd.c_str());
- if(clean_ret){
- fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);
- }
- }
-
-
- nic_prop_db *npdb = new nic_prop_db(config_dir_path);
-
-// Open globally used file names.
-
- // prepend config directory to schema file
- schema_file_name = config_dir_path + schema_file_name;
- external_fcns_path = config_dir_path + string("external_fcns.def");
- string ifx_fname = config_dir_path + string("ifres.xml");
-
-// Find interface query file(s).
- if(hostname == ""){
- gethostname(tmpstr,TMPSTRLEN);
- hostname = tmpstr;
- }
- hostname_len = strlen(tmpstr);
- string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");
- vector<string> ifq_fls;
-
- ifq_fls.push_back(ifq_fname);
-
-
-// Get the field list, if it exists
- string flist_fl = config_dir_path + "field_list.xml";
- FILE *flf_in = NULL;
- if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {
- fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());
- xml_leaves = new xml_t();
- xmlParser_setfileinput(flf_in);
- if(xmlParserparse()){
- fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());
- }else{
- field_verifier = new field_list(xml_leaves);
- }
- }
-
- if(!hfta_only){
- if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {
- fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));
- exit(1);
- }
- }
-
-/*
- if(!(debug_only || hfta_only)){
- if((lfta_out = fopen("lfta.c","w")) == NULL){
- fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
- exit(1);
- }
- }
-*/
-
-// Get the output specification file.
-// format is
-// query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions
- string ospec_fl = "output_spec.cfg";
- FILE *osp_in = NULL;
- vector<ospec_str *> output_specs;
- multimap<string, int> qname_to_ospec;
- if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {
- char *flds[MAXFLDS];
- int o_lineno = 0;
- while(fgets(tmpstr,TMPSTRLEN,osp_in)){
- o_lineno++;
- int nflds = split_string(tmpstr,',',flds,MAXFLDS);
- if(nflds == 7){
-// make operator type lowercase
- char *tmpc;
- for(tmpc=flds[1];*tmpc!='\0';++tmpc)
- *tmpc = tolower(*tmpc);
-
- ospec_str *tmp_ospec = new ospec_str();
- tmp_ospec->query = flds[0];
- tmp_ospec->operator_type = flds[1];
- tmp_ospec->operator_param = flds[2];
- tmp_ospec->output_directory = flds[3];
- tmp_ospec->bucketwidth = atoi(flds[4]);
- tmp_ospec->partitioning_flds = flds[5];
- tmp_ospec->n_partitions = atoi(flds[6]);
- qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
- output_specs.push_back(tmp_ospec);
- }else{
- fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
- }
- }
- fclose(osp_in);
- }else{
- fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");
- exit(1);
- }
-
-// hfta parallelism
- string pspec_fl = "hfta_parallelism.cfg";
- FILE *psp_in = NULL;
- map<string, int> hfta_parallelism;
- if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){
- char *flds[MAXFLDS];
- int o_lineno = 0;
- while(fgets(tmpstr,TMPSTRLEN,psp_in)){
- bool good_entry = true;
- o_lineno++;
- int nflds = split_string(tmpstr,',',flds,MAXFLDS);
- if(nflds == 2){
- string hname = flds[0];
- int par = atoi(flds[1]);
- if(par <= 0 || par > n_virtual_interfaces){
- fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
- good_entry = false;
- }
- if(good_entry && n_virtual_interfaces % par != 0){
- fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);
- good_entry = false;
- }
- if(good_entry)
- hfta_parallelism[hname] = par;
- }
- }
- }else{
- fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());
- }
-
-
-// LFTA hash table sizes
- string htspec_fl = "lfta_htsize.cfg";
- FILE *htsp_in = NULL;
- map<string, int> lfta_htsize;
- if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){
- char *flds[MAXFLDS];
- int o_lineno = 0;
- while(fgets(tmpstr,TMPSTRLEN,htsp_in)){
- bool good_entry = true;
- o_lineno++;
- int nflds = split_string(tmpstr,',',flds,MAXFLDS);
- if(nflds == 2){
- string lfta_name = flds[0];
- int htsz = atoi(flds[1]);
- if(htsz>0){
- lfta_htsize[lfta_name] = htsz;
- }else{
- fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);
- }
- }
- }
- }else{
- fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());
- }
-
-// LFTA vitual interface hash split
- string rtlspec_fl = "rts_load.cfg";
- FILE *rtl_in = NULL;
- map<string, vector<int> > rts_hload;
- if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){
- char *flds[MAXFLDS];
- int r_lineno = 0;
- string iface_name;
- vector<int> hload;
- while(fgets(tmpstr,TMPSTRLEN,rtl_in)){
- bool good_entry = true;
- r_lineno++;
- iface_name = "";
- hload.clear();
- int nflds = split_string(tmpstr,',',flds,MAXFLDS);
- if(nflds >1){
- iface_name = flds[0];
- int cumm_h = 0;
- int j;
- for(j=1;j<nflds;++j){
- int h = atoi(flds[j]);
- if(h<=0)
- good_entry = false;
- cumm_h += h;
- hload.push_back(cumm_h);
- }
- }else{
- good_entry = false;
- }
- if(good_entry){
- rts_hload[iface_name] = hload;
- }else{
- fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());
- }
- }
- }
-
-
-
- if(output_query_names){
- if((query_name_output = fopen("query_names.txt","w")) == NULL){
- fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));
- exit(1);
- }
- }
-
- if(output_schema_summary){
- if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){
- fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));
- exit(1);
- }
- }
-
- if((qtree_output = fopen("qtree.xml","w")) == NULL){
- fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));
- exit(1);
- }
- fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");
- fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");
- fprintf(qtree_output,"<QueryNodes>\n");
-
-
-// Get an initial Schema
- table_list *Schema;
- if(!hfta_only){
-// Parse the table schema definitions.
- fta_parse_result = new fta_parse_t();
- FtaParser_setfileinput(table_schemas_in);
- if(FtaParserparse()){
- fprintf(stderr,"Table schema parse failed.\n");
- exit(1);
- }
- if(fta_parse_result->parse_type != TABLE_PARSE){
- fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());
- exit(1);
- }
- Schema = fta_parse_result->tables;
-
-// Process schema field inheritance
- int retval;
- retval = Schema->unroll_tables(err_str);
- if(retval){
- fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );
- exit(1);
- }
- }else{
-// hfta only => we will try to fetch schemas from the registry.
-// therefore, start off with an empty schema.
- Schema = new table_list();
- }
-
-
-// Open and parse the external functions file.
- Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");
- if(Ext_fcnsParserin == NULL){
- fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");
- Ext_fcns = new ext_fcn_list();
- }else{
- if(Ext_fcnsParserparse()){
- fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");
- Ext_fcns = new ext_fcn_list();
- }
- }
- if(Ext_fcns->validate_fcns(err_str)){
- fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());
- exit(1);
- }
-
-// Open and parse the interface resources file.
-// ifq_t *ifaces_db = new ifq_t();
-// string ierr;
-// if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
-// fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
-// ifx_fname.c_str(), ierr.c_str());
-// exit(1);
-// }
-// if(ifaces_db->load_ifqs(ifq_fname, ierr)){
-// fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
-// ifq_fname.c_str(), ierr.c_str());
-// exit(1);
-// }
-
-
-// The LFTA code string.
-// Put the standard preamble here.
-// NOTE: the hash macros, fcns should go into the run time
- map<string, string> lfta_val;
- map<string, string> lfta_prefilter_val;
-
- string lfta_header =
-"#include <limits.h>\n\n"
-"#include \"rts.h\"\n"
-"#include \"fta.h\"\n"
-"#include \"lapp.h\"\n"
-"#include \"rts_udaf.h\"\n\n"
-;
-// Get any locally defined parsing headers
- glob_t glob_result;
- memset(&glob_result, 0, sizeof(glob_result));
-
- // do the glob operation
- int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
- if(return_value == 0){
- for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
- char *flds[1000];
- int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
- lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
- }
- }else{
- fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
- }
-
-/*
-"#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"
-"#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"
-"#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"
-"#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"
-*/
-
- lfta_header +=
-"\n"
-"gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"
-"\n"
-"#define SLOT_FILLED 0x04\n"
-"#define SLOT_GEN_BITS 0x03\n"
-"#define SLOT_HASH_BITS 0xfffffff8\n"
-"#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"
-"#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"
-"#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"
-"\n\n"
-
-"#define lfta_BOOL_to_hash(x) (x)\n"
-"#define lfta_USHORT_to_hash(x) (x)\n"
-"#define lfta_UINT_to_hash(x) (x)\n"
-"#define lfta_IP_to_hash(x) (x)\n"
-"#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"
-"#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"
-"#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
-"#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"
-"#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"
-"static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"
-" gs_uint32_t i,ret=0,tmp_sum = 0;\n"
-" for(i=0;i<x.length;++i){\n"
-" tmp_sum |= (x.data[i]) << (8*(i%4));\n"
-" if((i%4) == 3){\n"
-" ret ^= tmp_sum;\n"
-" tmp_sum = 0;\n"
-" }\n"
-" }\n"
-" if((i%4)!=0) ret ^=tmp_sum;\n"
-" return(ret);\n"
-"}\n\n\n";
-
-
-
-//////////////////////////////////////////////////////////////////
-///// Get all of the query parse trees
-
-
- int i,p;
- int hfta_count = 0; // for numeric suffixes to hfta .cc files
-
-//---------------------------
-// Global info needed for post processing.
-
-// Set of operator views ref'd in the query set.
- opview_set opviews;
-// lftas on a per-machine basis.
- map<string, vector<stream_query *> > lfta_mach_lists;
- int nfiles = input_file_names.size();
- vector<stream_query *> hfta_list; // list of hftas.
- map<string, stream_query *> sq_map; // map from query name to stream query.
-
-
-//////////////////////////////////////////
-
-// Open and parse the interface resources file.
- ifq_t *ifaces_db = new ifq_t();
- string ierr;
- if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
- fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
- ifx_fname.c_str(), ierr.c_str());
- exit(1);
- }
- if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){
- fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",
- ifq_fls[0].c_str(), ierr.c_str());
- exit(1);
- }
-
- map<string, string> qname_to_flname; // for detecting duplicate query names
-
-
-
-// Parse the files to create a vector of parse trees.
-// Load qnodes with information to perform a topo sort
-// based on query dependencies.
- vector<query_node *> qnodes; // for topo sort.
- map<string,int> name_node_map; // map query name to qnodes entry
- for(i=0;i<input_file_names.size();i++){
-
- if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {
- fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));
- continue;
- }
-fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());
-
-// Parse the FTA query
- fta_parse_result = new fta_parse_t();
- FtaParser_setfileinput(fta_in);
- if(FtaParserparse()){
- fprintf(stderr,"FTA parse failed.\n");
- exit(1);
- }
- if(fta_parse_result->parse_type != QUERY_PARSE){
- fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
- exit(1);
- }
-
-// returns a list of parse trees
- vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
- for(p=0;p<qlist.size();++p){
- table_exp_t *fta_parse_tree = qlist[p];
-// query_parse_trees.push_back(fta_parse_tree);
-
-// compute the default name -- extract from query name
- strcpy(tmpstr,input_file_names[i].c_str());
- char *qname = strrchr(tmpstr,PATH_DELIM);
- if(qname == NULL)
- qname = tmpstr;
- else
- qname++;
- char *qname_end = strchr(qname,'.');
- if(qname_end != NULL) *qname_end = '\0';
- string qname_str = qname;
- string imputed_qname = impute_query_name(fta_parse_tree, qname_str);
-
-// Deternmine visibility. Should I be attaching all of the output methods?
- if(qname_to_ospec.count(imputed_qname)>0)
- fta_parse_tree->set_visible(true);
- else
- fta_parse_tree->set_visible(false);
-
-
-// Create a manipulable repesentation of the parse tree.
-// the qnode inherits the visibility assigned to the parse tree.
- int pos = qnodes.size();
- qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));
- name_node_map[ qnodes[pos]->name ] = pos;
-//printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);
-// qnames.push_back(impute_query_name(fta_parse_tree, qname_str));
-// qfiles.push_back(i);
-
-// Check for duplicate query names
-// NOTE : in hfta-only generation, I should
-// also check with the names of the registered queries.
- if(qname_to_flname.count(qnodes[pos]->name) > 0){
- fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",
- qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());
- exit(1);
- }
- if(Schema->find_tbl(qnodes[pos]->name) >= 0){
- fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",
- qnodes[pos]->name.c_str(), input_file_names[i].c_str());
- exit(1);
- }
- qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();
-
-
- }
- }
-
-// Add the library queries
-
- int pos;
- for(pos=0;pos<qnodes.size();++pos){
- int fi;
- for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){
- string src_tbl = qnodes[pos]->refd_tbls[fi];
- if(qname_to_flname.count(src_tbl) == 0){
- int last_sep = src_tbl.find_last_of('/');
- if(last_sep != string::npos){
-fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());
- string target_qname = src_tbl.substr(last_sep+1);
- string qpathname = library_path + src_tbl + ".gsql";
- if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {
- fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));
- exit(1);
- fprintf(stderr,"After exit\n");
- }
-fprintf(stderr,"Parsing file %s\n",qpathname.c_str());
-// Parse the FTA query
- fta_parse_result = new fta_parse_t();
- FtaParser_setfileinput(fta_in);
- if(FtaParserparse()){
- fprintf(stderr,"FTA parse failed.\n");
- exit(1);
- }
- if(fta_parse_result->parse_type != QUERY_PARSE){
- fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());
- exit(1);
- }
-
- map<string, int> local_query_map;
- vector<string> local_query_names;
- vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;
- for(p=0;p<qlist.size();++p){
- table_exp_t *fta_parse_tree = qlist[p];
- fta_parse_tree->set_visible(false); // assumed to not produce output
- string imputed_qname = impute_query_name(fta_parse_tree, target_qname);
- if(imputed_qname == target_qname)
- imputed_qname = src_tbl;
- if(local_query_map.count(imputed_qname)>0){
- fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());
- exit(1);
- }
- local_query_map[ imputed_qname ] = p;
- local_query_names.push_back(imputed_qname);
- }
-
- if(local_query_map.count(src_tbl)==0){
- fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());
- exit(1);
- }
-
- vector<int> worklist;
- set<int> added_queries;
- vector<query_node *> new_qnodes;
- worklist.push_back(local_query_map[target_qname]);
- added_queries.insert(local_query_map[target_qname]);
- int qq;
- int qpos = qnodes.size();
- for(qq=0;qq<worklist.size();++qq){
- int q_id = worklist[qq];
- query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );
- new_qnodes.push_back( new_qnode);
- vector<string> refd_tbls = new_qnode->refd_tbls;
- int ff;
- for(ff = 0;ff<refd_tbls.size();++ff){
- if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){
-
- if(name_node_map.count(refd_tbls[ff])>0){
- fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );
- exit(1);
- }else{
- worklist.push_back(local_query_map[refd_tbls[ff]]);
- }
- }
- }
- }
-
- for(qq=0;qq<new_qnodes.size();++qq){
- int qpos = qnodes.size();
- qnodes.push_back(new_qnodes[qq]);
- name_node_map[qnodes[qpos]->name ] = qpos;
- qname_to_flname[qnodes[qpos]->name ] = qpathname;
- }
- }
- }
- }
- }
-
-
-
-
-
-
-
-
-//---------------------------------------
-
-
-// Add the UDOPS.
-
- string udop_missing_sources;
- for(i=0;i<qnodes.size();++i){
- int fi;
- for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){
- int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);
- if(sid >= 0){
- if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){
- if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){
- int pos = qnodes.size();
- qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));
- name_node_map[ qnodes[pos]->name ] = pos;
- qnodes[pos]->is_externally_visible = false; // its visible
- // Need to mark the source queries as visible.
- int si;
- string missing_sources = "";
- for(si=0;si<qnodes[pos]->refd_tbls.size();++si){
- string src_tbl = qnodes[pos]->refd_tbls[si];
- if(name_node_map.count(src_tbl)==0){
- missing_sources += src_tbl + " ";
- }
- }
- if(missing_sources != ""){
- udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";
- }
- }
- }
- }
- }
- }
- if(udop_missing_sources != ""){
- fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());
- exit(1);
- }
-
-
-
-////////////////////////////////////////////////////////////////////
-/// Check parse trees to verify that some
-/// global properties are met :
-/// if q1 reads from q2, then
-/// q2 is processed before q1
-/// q1 can supply q2's parameters
-/// Verify there is no cycle in the reads-from graph.
-
-// Compute an order in which to process the
-// queries.
-
-// Start by building the reads-from lists.
-//
-
- for(i=0;i<qnodes.size();++i){
- int qi, fi;
- vector<string> refd_tbls = qnodes[i]->refd_tbls;
- for(fi = 0;fi<refd_tbls.size();++fi){
- if(name_node_map.count(refd_tbls[fi])>0){
-//printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);
- (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);
- }
- }
- }
-
-
-// If one query reads the result of another,
-// check for parameter compatibility. Currently it must
-// be an exact match. I will move to requiring
-// containment after re-ordering, but will require
-// some analysis for code generation which is not
-// yet in place.
-//printf("There are %d query nodes.\n",qnodes.size());
-
-
- for(i=0;i<qnodes.size();++i){
- vector<var_pair_t *> target_params = qnodes[i]->params;
- for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
- vector<var_pair_t *> source_params = qnodes[(*si)]->params;
- if(target_params.size() != source_params.size()){
- fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
- exit(1);
- }
- int p;
- for(p=0;p<target_params.size();++p){
- if(! (target_params[p]->name == source_params[p]->name &&
- target_params[p]->val == source_params[p]->val ) ){
- fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());
- exit(1);
- }
- }
- }
- }
-
-
-// Find the roots.
-// Start by counting inedges.
- for(i=0;i<qnodes.size();++i){
- for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
- qnodes[(*si)]->n_consumers++;
- }
- }
-
-// The roots are the nodes with indegree zero.
- set<int> roots;
- for(i=0;i<qnodes.size();++i){
- if(qnodes[i]->n_consumers == 0){
- if(qnodes[i]->is_externally_visible == false){
- fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());
- }
- roots.insert(i);
- }
- }
-
-// Remove the parts of the subtree that produce no output.
- set<int> valid_roots;
- set<int> discarded_nodes;
- set<int> candidates;
- while(roots.size() >0){
- for(si=roots.begin();si!=roots.end();++si){
- if(qnodes[(*si)]->is_externally_visible){
- valid_roots.insert((*si));
- }else{
- discarded_nodes.insert((*si));
- set<int>::iterator sir;
- for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
- qnodes[(*sir)]->n_consumers--;
- if(qnodes[(*sir)]->n_consumers == 0)
- candidates.insert( (*sir));
- }
- }
- }
- roots = candidates;
- candidates.clear();
- }
- roots = valid_roots;
- if(discarded_nodes.size()>0){
- fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");
- int di = 0;
- for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){
- if(di>0 && (di%8)==0) fprintf(stderr,"\n");
- di++;
- fprintf(stderr," %s",qnodes[(*si)]->name.c_str());
- }
- fprintf(stderr,"\n");
- }
-
-// Compute the sources_to set, ignoring discarded nodes.
- for(i=0;i<qnodes.size();++i){
- if(discarded_nodes.count(i)==0)
- for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){
- qnodes[(*si)]->sources_to.insert(i);
- }
- }
-
-
-// Find the nodes that are shared by multiple visible subtrees.
-// The roots become inferred visible nodes.
-
-// Find the visible nodes.
- vector<int> visible_nodes;
- for(i=0;i<qnodes.size();i++){
- if(qnodes[i]->is_externally_visible){
- visible_nodes.push_back(i);
- }
- }
-
-// Find UDOPs referenced by visible nodes.
- list<int> workq;
- for(i=0;i<visible_nodes.size();++i){
- workq.push_back(visible_nodes[i]);
- }
- while(!workq.empty()){
- int node = workq.front();
- workq.pop_front();
- set<int>::iterator children;
- if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){
- qnodes[node]->is_externally_visible = true;
- visible_nodes.push_back(node);
- for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
- if(qnodes[(*children)]->is_externally_visible == false){
- qnodes[(*children)]->is_externally_visible = true;
- visible_nodes.push_back((*children));
- }
- }
- }
- for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){
- workq.push_back((*children));
- }
- }
-
- bool done = false;
- while(!done){
-// reset the nodes
- for(i=0;i<qnodes.size();i++){
- qnodes[i]->subtree_roots.clear();
- }
-
-// Walk the tree defined by a visible node, not descending into
-// subtrees rooted by a visible node. Mark the node visited with
-// the visible node ID.
- for(i=0;i<visible_nodes.size();++i){
- set<int> vroots;
- vroots.insert(visible_nodes[i]);
- while(vroots.size()>0){
- for(si=vroots.begin();si!=vroots.end();++si){
- qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);
-
- set<int>::iterator sir;
- for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
- if(! qnodes[(*sir)]->is_externally_visible){
- candidates.insert( (*sir));
- }
- }
- }
- vroots = candidates;
- candidates.clear();
- }
- }
-// Find the nodes in multiple visible node subtrees, but with no parent
-// that is in multiple visible node subtrees. Mark these as inferred visible nodes.
- done = true; // until proven otherwise
- for(i=0;i<qnodes.size();i++){
- if(qnodes[i]->subtree_roots.size()>1){
- bool is_new_root = true;
- set<int>::iterator sir;
- for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){
- if(qnodes[(*sir)]->subtree_roots.size()>1)
- is_new_root = false;
- }
- if(is_new_root){
- qnodes[i]->is_externally_visible = true;
- qnodes[i]->inferred_visible_node = true;
- visible_nodes.push_back(i);
- done = false;
- }
- }
- }
- }
-
-
-
-
-
-// get visible nodes in topo ordering.
-// for(i=0;i<qnodes.size();i++){
-// qnodes[i]->n_consumers = qnodes[i]->sources_to.size();
-// }
- vector<int> process_order;
- while(roots.size() >0){
- for(si=roots.begin();si!=roots.end();++si){
- if(discarded_nodes.count((*si))==0){
- process_order.push_back( (*si) );
- }
- set<int>::iterator sir;
- for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
- qnodes[(*sir)]->n_consumers--;
- if(qnodes[(*sir)]->n_consumers == 0)
- candidates.insert( (*sir));
- }
- }
- roots = candidates;
- candidates.clear();
- }
-
-
-//printf("process_order.size() =%d\n",process_order.size());
-
-// Search for cyclic dependencies
- string found_dep;
- for(i=0;i<qnodes.size();++i){
- if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){
- if(found_dep.size() != 0) found_dep += ", ";
- found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";
- }
- }
- if(found_dep.size()>0){
- fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());
- exit(1);
- }
-
-// Get a list of query sets, in the order to be processed.
-// Start at visible root and do bfs.
-// The query set includes queries referenced indirectly,
-// as sources for user-defined operators. These are needed
-// to ensure that they are added to the schema, but are not part
-// of the query tree.
-
-// stream_node_sets contains queries reachable only through the
-// FROM clause, so I can tell which queries to add to the stream
-// query. (DISABLED, UDOPS are integrated, does this cause problems?)
-
-// NOTE: this code works because in order for data to be
-// read by multiple hftas, the node must be externally visible.
-// But visible nodes define roots of process sets.
-// internally visible nodes can feed data only
-// to other nodes in the same query file.
-// Therefore, any access can be restricted to a file,
-// hfta output sharing is done only on roots
-// never on interior nodes.
-
-
-
-
-// Compute the base collection of hftas.
- vector<hfta_node *> hfta_sets;
- map<string, int> hfta_name_map;
-// vector< vector<int> > process_sets;
-// vector< set<int> > stream_node_sets;
- reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
- // i.e. process leaves 1st.
- for(i=0;i<process_order.size();++i){
- if(qnodes[process_order[i]]->is_externally_visible == true){
-//printf("Visible.\n");
- int root = process_order[i];
- hfta_node *hnode = new hfta_node();
- hnode->name = qnodes[root]-> name;
- hnode->source_name = qnodes[root]-> name;
- hnode->is_udop = qnodes[root]->is_udop;
- hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;
-
- vector<int> proc_list; proc_list.push_back(root);
-// Ensure that nodes are added only once.
- set<int> proc_set; proc_set.insert(root);
- roots.clear(); roots.insert(root);
- candidates.clear();
- while(roots.size()>0){
- for(si=roots.begin();si!=roots.end();++si){
-//printf("Processing root %d\n",(*si));
- set<int>::iterator sir;
- for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){
-//printf("reads fom %d\n",(*sir));
- if(qnodes[(*sir)]->is_externally_visible==false){
- candidates.insert( (*sir) );
- if(proc_set.count( (*sir) )==0){
- proc_set.insert( (*sir) );
- proc_list.push_back( (*sir) );
- }
- }
- }
- }
- roots = candidates;
- candidates.clear();
- }
-
- reverse(proc_list.begin(), proc_list.end());
- hnode->query_node_indices = proc_list;
- hfta_name_map[hnode->name] = hfta_sets.size();
- hfta_sets.push_back(hnode);
- }
- }
-
-// Compute the reads_from / sources_to graphs for the hftas.
-
- for(i=0;i<hfta_sets.size();++i){
- hfta_node *hnode = hfta_sets[i];
- for(q=0;q<hnode->query_node_indices.size();q++){
- query_node *qnode = qnodes[hnode->query_node_indices[q]];
- for(s=0;s<qnode->refd_tbls.size();++s){
- if(hfta_name_map.count(qnode->refd_tbls[s])){
- int other_hfta = hfta_name_map[qnode->refd_tbls[s]];
- hnode->reads_from.insert(other_hfta);
- hfta_sets[other_hfta]->sources_to.insert(i);
- }
- }
- }
- }
-
-// Compute a topological sort of the hfta_sets.
-
- vector<int> hfta_topsort;
- workq.clear();
- int hnode_srcs[hfta_sets.size()];
- for(i=0;i<hfta_sets.size();++i){
- hnode_srcs[i] = 0;
- if(hfta_sets[i]->sources_to.size() == 0)
- workq.push_back(i);
- }
-
- while(! workq.empty()){
- int node = workq.front();
- workq.pop_front();
- hfta_topsort.push_back(node);
- set<int>::iterator stsi;
- for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){
- int parent = (*stsi);
- hnode_srcs[parent]++;
- if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){
- workq.push_back(parent);
- }
- }
- }
-
-// Decorate hfta nodes with the level of parallelism given as input.
-
- map<string, int>::iterator msii;
- for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
- string hfta_name = (*msii).first;
- int par = (*msii).second;
- if(hfta_name_map.count(hfta_name) > 0){
- hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;
- }else{
- fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());
- }
- }
-
-// Propagate levels of parallelism: children should have a level of parallelism
-// as large as any of its parents. Adjust children upwards to compensate.
-// Start at parents and adjust children, auto-propagation will occur.
-
- for(i=hfta_sets.size()-1;i>=0;i--){
- set<int>::iterator stsi;
- for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){
- if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){
- hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;
- }
- }
- }
-
-// Before all the name mangling, check if there are any output_spec.cfg
-// or hfta_parallelism.cfg entries that do not have a matching query.
-
- string dangling_ospecs = "";
- for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){
- string oq = (*msii).first;
- if(hfta_name_map.count(oq) == 0){
- dangling_ospecs += " "+(*msii).first;
- }
- }
- if(dangling_ospecs!=""){
- fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
- }
-
- string dangling_par = "";
- for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){
- string oq = (*msii).first;
- if(hfta_name_map.count(oq) == 0){
- dangling_par += " "+(*msii).first;
- }
- }
- if(dangling_par!=""){
- fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());
- }
-
-
-
-// Replicate parallelized hftas. Do __copyX name mangling. Adjust
-// FROM clauses: retarget any name which is an internal node, and
-// any which is in hfta_sets (and which is parallelized). Add Merge nodes
-// when the source hfta has more parallelism than the target node.
-// Add new nodes to qnodes and hfta_nodes with the appropriate linkages.
-
-
- int n_original_hfta_sets = hfta_sets.size();
- for(i=0;i<n_original_hfta_sets;++i){
- if(hfta_sets[i]->n_parallel > 1){
- hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.
- set<string> local_nodes; // names of query nodes in the hfta.
- for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
- local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);
- }
-
- for(p=0;p<hfta_sets[i]->n_parallel;++p){
- string mangler = "__copy"+int_to_string(p);
- hfta_node *par_hfta = new hfta_node();
- par_hfta->name = hfta_sets[i]->name + mangler;
- par_hfta->source_name = hfta_sets[i]->name;
- par_hfta->is_udop = hfta_sets[i]->is_udop;
- par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;
- par_hfta->n_parallel = hfta_sets[i]->n_parallel;
- par_hfta->parallel_idx = p;
-
- map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.
-
-// Is it a UDOP?
- if(hfta_sets[i]->is_udop){
- int root = hfta_sets[i]->query_node_indices[0];
-
- string unequal_par_sources;
- set<int>::iterator rfsii;
- for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){
- if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){
- unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";
- }
- }
- if(unequal_par_sources != ""){
- fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());
- exit(1);
- }
-
- int rti;
- vector<string> new_sources;
- for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){
- new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);
- }
-
- query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);
- new_qn->name += mangler;
- new_qn->mangler = mangler;
- new_qn->refd_tbls = new_sources;
- par_hfta->query_node_indices.push_back(qnodes.size());
- par_qnode_map[new_qn->name] = qnodes.size();
- name_node_map[ new_qn->name ] = qnodes.size();
- qnodes.push_back(new_qn);
- }else{
-// regular query node
- for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
- int hqn_idx = hfta_sets[i]->query_node_indices[h];
- table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);
-// rehome the from clause on mangled names.
-// create merge nodes as needed for external sources.
- for(f=0;f<dup_pt->fm->tlist.size();++f){
- if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){
- dup_pt->fm->tlist[f]->schema_name += mangler;
- }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){
-// Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
- int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];
- if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){
- dup_pt->fm->tlist[f]->schema_name += mangler;
- }else{
- vector<string> src_tbls;
- int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;
- if(stride == 0){
- fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());
- exit(1);
- }
- for(s=0;s<stride;++s){
- string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);
- src_tbls.push_back(ext_src_name);
- }
- table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
- string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);
- dup_pt->fm->tlist[f]->schema_name = merge_node_name;
-// Make a qnode to represent the new merge node
- query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
- qn_pt->refd_tbls = src_tbls;
- qn_pt->is_udop = false;
- qn_pt->is_externally_visible = false;
- qn_pt->inferred_visible_node = false;
- par_hfta->query_node_indices.push_back(qnodes.size());
- par_qnode_map[merge_node_name] = qnodes.size();
- name_node_map[ merge_node_name ] = qnodes.size();
- qnodes.push_back(qn_pt);
- }
- }
- }
- query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);
- for(f=0;f<dup_pt->fm->tlist.size();++f){
- new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);
- }
- new_qn->params = qnodes[hqn_idx]->params;
- new_qn->is_udop = false;
- new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;
- new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;
- par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());
- par_qnode_map[new_qn->name] = qnodes.size();
- name_node_map[ new_qn->name ] = qnodes.size();
- qnodes.push_back(new_qn);
- }
- }
- hfta_name_map[par_hfta->name] = hfta_sets.size();
- hfta_sets.push_back(par_hfta);
- }
- }else{
-// This hfta isn't being parallelized, but add merge nodes for any parallelized
-// hfta sources.
- if(!hfta_sets[i]->is_udop){
- for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){
- int hqn_idx = hfta_sets[i]->query_node_indices[h];
- for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){
- if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){
-// Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.
- int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];
- if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){
- vector<string> src_tbls;
- for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){
- string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);
- src_tbls.push_back(ext_src_name);
- }
- table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);
- string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);
- qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;
-// Make a qnode to represent the new merge node
- query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);
- qn_pt->refd_tbls = src_tbls;
- qn_pt->is_udop = false;
- qn_pt->is_externally_visible = false;
- qn_pt->inferred_visible_node = false;
- hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());
- name_node_map[ merge_node_name ] = qnodes.size();
- qnodes.push_back(qn_pt);
- }
- }
- }
- }
- }
- }
- }
-
-// Rebuild the reads_from / sources_to lists in the qnodes
- for(q=0;q<qnodes.size();++q){
- qnodes[q]->reads_from.clear();
- qnodes[q]->sources_to.clear();
- }
- for(q=0;q<qnodes.size();++q){
- for(s=0;s<qnodes[q]->refd_tbls.size();++s){
- if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){
- int rf = name_node_map[qnodes[q]->refd_tbls[s]];
- qnodes[q]->reads_from.insert(rf);
- qnodes[rf]->sources_to.insert(q);
- }
- }
- }
-
-// Rebuild the reads_from / sources_to lists in hfta_sets
- for(q=0;q<hfta_sets.size();++q){
- hfta_sets[q]->reads_from.clear();
- hfta_sets[q]->sources_to.clear();
- }
- for(q=0;q<hfta_sets.size();++q){
- for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){
- int node = hfta_sets[q]->query_node_indices[s];
- set<int>::iterator rfsii;
- for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){
- if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){
- hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);
- hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);
- }
- }
- }
- }
-
-/*
-for(q=0;q<qnodes.size();++q){
- printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());
- set<int>::iterator rsii;
- for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)
- printf(" %d",(*rsii));
- printf(", and sources-to %d:",qnodes[q]->sources_to.size());
- for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)
- printf(" %d",(*rsii));
- printf("\n");
-}
-
-for(q=0;q<hfta_sets.size();++q){
- if(hfta_sets[q]->do_generation==false)
- continue;
- printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());
- set<int>::iterator rsii;
- for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)
- printf(" %d",(*rsii));
- printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());
- for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)
- printf(" %d",(*rsii));
- printf("\n");
-}
-*/
-
-
-
-// Re-topo sort the hftas
- hfta_topsort.clear();
- workq.clear();
- int hnode_srcs_2[hfta_sets.size()];
- for(i=0;i<hfta_sets.size();++i){
- hnode_srcs_2[i] = 0;
- if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){
- workq.push_back(i);
- }
- }
-
- while(workq.empty() == false){
- int node = workq.front();
- workq.pop_front();
- hfta_topsort.push_back(node);
- set<int>::iterator stsii;
- for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){
- int child = (*stsii);
- hnode_srcs_2[child]++;
- if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){
- workq.push_back(child);
- }
- }
- }
-
-// Ensure that all of the query_node_indices in hfta_sets are topologically
-// sorted, don't rely on assumptions that all transforms maintain some kind of order.
- for(i=0;i<hfta_sets.size();++i){
- if(hfta_sets[i]->do_generation){
- map<int,int> n_accounted;
- vector<int> new_order;
- workq.clear();
- vector<int>::iterator vii;
- for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
- n_accounted[(*vii)]= 0;
- }
- for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){
- set<int>::iterator rfsii;
- for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){
- if(n_accounted.count((*rfsii)) == 0){
- n_accounted[(*vii)]++;
- }
- }
- if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){
- workq.push_back((*vii));
- }
- }
-
- while(workq.empty() == false){
- int node = workq.front();
- workq.pop_front();
- new_order.push_back(node);
- set<int>::iterator stsii;
- for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){
- if(n_accounted.count((*stsii))){
- n_accounted[(*stsii)]++;
- if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){
- workq.push_back((*stsii));
- }
- }
- }
- }
- hfta_sets[i]->query_node_indices = new_order;
- }
- }
-
-
-
-
-
-/// Global checking is done, start the analysis and translation
-/// of the query parse tree in the order specified by process_order
-
-
-// Get a list of the LFTAs for global lfta optimization
-// TODO: separate building operators from splitting lftas,
-// that will make optimizations such as predicate pushing easier.
- vector<stream_query *> lfta_list;
-
- stream_query *rootq;
-
- int qi,qj;
-
- for(qi=hfta_topsort.size()-1;qi>=0;--qi){
-
- int hfta_id = hfta_topsort[qi];
- vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;
-
-
-
-// Two possibilities, either its a UDOP, or its a collection of queries.
-// if(qnodes[curr_list.back()]->is_udop)
- if(hfta_sets[hfta_id]->is_udop){
- int node_id = curr_list.back();
- int udop_schref = Schema->find_tbl(qnodes[node_id]->file);
- opview_entry *opv = new opview_entry();
-
-// Many of the UDOP properties aren't currently used.
- opv->parent_qname = "no_parent";
- opv->root_name = qnodes[node_id]->name;
- opv->view_name = qnodes[node_id]->file;
- opv->pos = qi;
- sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());
- opv->udop_alias = tmpstr;
- opv->mangler = qnodes[node_id]->mangler;
-
- if(opv->mangler != ""){
- int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);
- Schema->mangle_subq_names(new_udop_schref,opv->mangler);
- }
-
-// This piece of code makes each hfta which refers to the same udop
-// reference a distinct running udop. Do this at query optimization time?
-// fmtbl->set_udop_alias(opv->udop_alias);
-
- opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));
- opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());
-
- vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);
- int s,f,q;
- for(s=0;s<subq.size();++s){
-// Validate that the fields match.
- subquery_spec *sqs = subq[s];
- string subq_name = sqs->name + opv->mangler;
- vector<field_entry *> flds = Schema->get_fields(subq_name);
- if(flds.size() == 0){
- fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());
- return(1);
- }
- if(flds.size() < sqs->types.size()){
- fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());
- return(1);
- }
- bool failed = false;
- for(f=0;f<sqs->types.size();++f){
- data_type dte(sqs->types[f],sqs->modifiers[f]);
- data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());
- if(! dte.subsumes_type(&dtf) ){
- fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());
- failed = true;
- }
-/*
- if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){
- string pstr = dte.get_temporal_string();
- fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);
- failed = true;
- }
-*/
- }
- if(failed)
- return(1);
-/// Validation done, find the subquery, make a copy of the
-/// parse tree, and add it to the return list.
- for(q=0;q<qnodes.size();++q)
- if(qnodes[q]->name == subq_name)
- break;
- if(q==qnodes.size()){
- fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());
- return(1);
- }
-
- }
-
-// Cross-link to from entry(s) in all sourced-to tables.
- set<int>::iterator sii;
- for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){
-//printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());
- vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();
- int ii;
- for(ii=0;ii<tblvars.size();++ii){
- if(tblvars[ii]->schema_name == opv->root_name){
- tblvars[ii]->set_opview_idx(opviews.size());
- }
-
- }
- }
-
- opviews.append(opv);
- }else{
-
-// Analyze the parse trees in this query,
-// put them in rootq
-// vector<int> curr_list = process_sets[qi];
-
-
-////////////////////////////////////////
-
- rootq = NULL;
-//printf("Process set %d, has %d queries\n",qi,curr_list.size());
- for(qj=0;qj<curr_list.size();++qj){
- i = curr_list[qj];
- fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);
-
-// Select the current query parse tree
- table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;
-
-// if hfta only, try to fetch any missing schemas
-// from the registry (using the print_schema program).
-// Here I use a hack to avoid analyzing the query -- all referenced
-// tables must be in the from clause
-// If there is a problem loading any table, just issue a warning,
-//
- tablevar_list_t *fm = fta_parse_tree->get_from();
- vector<string> refd_tbls = fm->get_src_tbls(Schema);
-// iterate over all referenced tables
- int t;
- for(t=0;t<refd_tbls.size();++t){
- int tbl_ref = Schema->get_table_ref(refd_tbls[t]);
-
- if(tbl_ref < 0){ // if this table is not in the Schema
-
- if(hfta_only){
- string cmd="print_schema "+refd_tbls[t];
- FILE *schema_in = popen(cmd.c_str(), "r");
- if(schema_in == NULL){
- fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());
- }else{
- string schema_instr;
- while(fgets(tmpstr,TMPSTRLEN,schema_in)){
- schema_instr += tmpstr;
- }
- fta_parse_result = new fta_parse_t();
- strcpy(tmp_schema_str,schema_instr.c_str());
- FtaParser_setstringinput(tmp_schema_str);
- if(FtaParserparse()){
- fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());
- }else{
- if( fta_parse_result->tables != NULL){
- int tl;
- for(tl=0;tl<fta_parse_result->tables->size();++tl){
- Schema->add_table(fta_parse_result->tables->get_table(tl));
- }
- }else{
- fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());
- }
- }
- }
- }else{
- fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());
- exit(1);
- }
-
- }
- }
-
-
-// Analyze the query.
- query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);
- if(qs == NULL){
- fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());
- exit(1);
- }
-
- stream_query new_sq(qs, Schema);
- if(new_sq.error_code){
- fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());
- exit(1);
- }
-
-// Add it to the Schema
- table_def *output_td = new_sq.get_output_tabledef();
- Schema->add_table(output_td);
-
-// Create a query plan from the analyzed parse tree.
-// If it's a query referenced via FROM, add it to the stream query.
- if(rootq){
- rootq->add_query(new_sq);
- }else{
- rootq = new stream_query(new_sq);
-// have the stream query object inherit properties from the analyzed
-// hfta_node object.
- rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);
- rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();
- }
-
-
- }
-
-// This stream query has all its parts
-// Build and optimize it.
-//printf("translate_fta: generating plan.\n");
- if(rootq->generate_plan(Schema)){
- fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());
- continue;
- }
-
-// We've found the query plan head, so now add the output operators
- if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){
- pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;
- multimap<string, int>::iterator mmsi;
- oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);
- for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){
- rootq->add_output_operator(output_specs[(*mmsi).second]);
- }
- }
-
-
-
-// Perform query splitting if necessary.
- bool hfta_returned;
- vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);
-
- int l;
-//for(l=0;l<split_queries.size();++l){
-//printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());
-//}
-
-
-
-
- if(split_queries.size() > 0){ // should be at least one component.
-
-// Compute the number of LFTAs.
- int n_lfta = split_queries.size();
- if(hfta_returned) n_lfta--;
-
-
-// Process the LFTA components.
- for(l=0;l<n_lfta;++l){
- if(lfta_names.count(split_queries[l]->query_name) == 0){
-// Grab the lfta for global optimization.
- vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
- string liface = tvec[0]->get_interface();
- string lmach = tvec[0]->get_machine();
- if (lmach == "")
- lmach = hostname;
- interface_names.push_back(liface);
- machine_names.push_back(lmach);
-//printf("Machine is %s\n",lmach.c_str());
-
-// Set the ht size from the recommendation, if there is one in the rec file
- if(lfta_htsize.count(split_queries[l]->query_name)>0){
- split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
- }
-
-
- lfta_names[split_queries[l]->query_name] = lfta_list.size();
- split_queries[l]->set_gid(lfta_list.size()); // set lfta global id
- lfta_list.push_back(split_queries[l]);
- lfta_mach_lists[lmach].push_back(split_queries[l]);
-
-// The following is a hack,
-// as I should be generating LFTA code through
-// the stream_query object.
- split_queries[l]->query_plan[0]->bind_to_schema(Schema);
-// split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
-
-/*
-// Create query description to embed in lfta.c
- string lfta_schema_str = split_queries[l]->make_schema();
- string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
-
-// get NIC capabilities.
- int erri;
- nic_property *nicprop = NULL;
- vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
- if(iface_codegen_type.size()){
- nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
- if(!nicprop){
- fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
- exit(1);
- }
- }
-
- lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
-*/
-
- snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
- query_names.push_back(split_queries[l]->query_name);
- mach_query_names[lmach].push_back(query_names.size()-1);
-// NOTE: I will assume a 1-1 correspondance between
-// mach_query_names[lmach] and lfta_mach_lists[lmach]
-// where mach_query_names[lmach][i] contains the index into
-// query_names, which names the lfta, and
-// mach_query_names[lmach][i] is the stream_query * of the
-// corresponding lfta.
-// Later, lfta_iface_qnames are the query names matching lfta_iface_lists
-
-
-
- // check if lfta is reusable
- // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters
-
- bool lfta_reusable = false;
- if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||
- split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {
- lfta_reusable = true;
- }
- lfta_reuse_options.push_back(lfta_reusable);
-
- // LFTA will inherit the liveness timeout specification from the containing query
- // it is too conservative as lfta are expected to spend less time per tuple
- // then full query
-
- // extract liveness timeout from query definition
- int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());
- if (!liveness_timeout) {
-// fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",
-// split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
- liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
- }
- lfta_liveness_timeouts.push_back(liveness_timeout);
-
-// Add it to the schema
- table_def *td = split_queries[l]->get_output_tabledef();
- Schema->append_table(td);
-//printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());
-
- }
- }
-
-// If the output is lfta-only, dump out the query name.
- if(split_queries.size() == 1 && !hfta_returned){
- if(output_query_names ){
- fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());
- }
-/*
-else{
- fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());
- }
-*/
-
-/*
-// output schema summary
- if(output_schema_summary){
- dump_summary(split_queries[0]);
- }
-*/
- }
-
-
- if(hfta_returned){ // query also has an HFTA component
- int hfta_nbr = split_queries.size()-1;
-
- hfta_list.push_back(split_queries[hfta_nbr]);
-
-// report on generated query names
- if(output_query_names){
- string hfta_name =split_queries[hfta_nbr]->query_name;
- fprintf(query_name_output,"%s H\n",hfta_name.c_str());
- for(l=0;l<hfta_nbr;++l){
- string lfta_name =split_queries[l]->query_name;
- fprintf(query_name_output,"%s L\n",lfta_name.c_str());
- }
- }
-// else{
-// fprintf(stderr,"query names are ");
-// for(l=0;l<hfta_nbr;++l){
-// if(l>0) fprintf(stderr,",");
-// string fta_name =split_queries[l]->query_name;
-// fprintf(stderr," %s",fta_name.c_str());
-// }
-// fprintf(stderr,"\n");
-// }
- }
-
- }else{
- fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());
- fprintf(stderr,"%s\n",rootq->get_error_str().c_str());
- exit(1);
- }
- }
-}
-
-
-//-----------------------------------------------------------------
-// Compute and propagate the SE in PROTOCOL fields compute a field.
-//-----------------------------------------------------------------
-
-for(i=0;i<lfta_list.size();i++){
- lfta_list[i]->generate_protocol_se(sq_map, Schema);
- sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];
-}
-for(i=0;i<hfta_list.size();i++){
- hfta_list[i]->generate_protocol_se(sq_map, Schema);
- sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];
-}
-
-
-
-//------------------------------------------------------------------------
-// Perform individual FTA optimizations
-//-----------------------------------------------------------------------
-
-if (partitioned_mode) {
-
- // open partition definition file
- string part_fname = config_dir_path + "partition.txt";
-
- FILE* partfd = fopen(part_fname.c_str(), "r");
- if (!partfd) {
- fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());
- exit(1);
- }
- PartnParser_setfileinput(partfd);
- if (PartnParserparse()) {
- fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());
- exit(1);
- }
- fclose(partfd);
-}
-
-
-print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
-
-int num_hfta = hfta_list.size();
-for(i=0; i < hfta_list.size(); ++i){
- hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);
-}
-
-// Add all new hftas to schema
-for(i=num_hfta; i < hfta_list.size(); ++i){
- table_def *td = hfta_list[i]->get_output_tabledef();
- Schema->append_table(td);
-}
-
-print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
-
-
-
-//------------------------------------------------------------------------
-// Do global (cross-fta) optimization
-//-----------------------------------------------------------------------
-
-
-
-
-
-
-set<string> extra_external_libs;
-
-for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component
-
- if(! debug_only){
-// build hfta file name, create output
- if(numeric_hfta_flname){
- sprintf(tmpstr,"hfta_%d",hfta_count);
- hfta_names.push_back(tmpstr);
- sprintf(tmpstr,"hfta_%d.cc",hfta_count);
- }else{
- sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());
- hfta_names.push_back(tmpstr);
- sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());
- }
- FILE *hfta_fl = fopen(tmpstr,"w");
- if(hfta_fl == NULL){
- fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);
- exit(1);
- }
- fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());
-
-// If there is a field verifier, warn about
-// lack of compatability
-// NOTE : this code assumes that visible non-lfta queries
-// are those at the root of a stream query.
- string hfta_comment;
- string hfta_title;
- string hfta_namespace;
- if(hfta_list[i]->defines.count("comment")>0)
- hfta_comment = hfta_list[i]->defines["comment"];
- if(hfta_list[i]->defines.count("Comment")>0)
- hfta_comment = hfta_list[i]->defines["Comment"];
- if(hfta_list[i]->defines.count("COMMENT")>0)
- hfta_comment = hfta_list[i]->defines["COMMENT"];
- if(hfta_list[i]->defines.count("title")>0)
- hfta_title = hfta_list[i]->defines["title"];
- if(hfta_list[i]->defines.count("Title")>0)
- hfta_title = hfta_list[i]->defines["Title"];
- if(hfta_list[i]->defines.count("TITLE")>0)
- hfta_title = hfta_list[i]->defines["TITLE"];
- if(hfta_list[i]->defines.count("namespace")>0)
- hfta_namespace = hfta_list[i]->defines["namespace"];
- if(hfta_list[i]->defines.count("Namespace")>0)
- hfta_namespace = hfta_list[i]->defines["Namespace"];
- if(hfta_list[i]->defines.count("Namespace")>0)
- hfta_namespace = hfta_list[i]->defines["Namespace"];
-
- if(field_verifier != NULL){
- string warning_str;
- if(hfta_comment == "")
- warning_str += "\tcomment not found.\n";
- if(hfta_title == "")
- warning_str += "\ttitle not found.\n";
- if(hfta_namespace == "")
- warning_str += "\tnamespace not found.\n";
-
- vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
- int fi;
- for(fi=0;fi<flds.size();fi++){
- field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
- }
- if(warning_str != "")
- fprintf(stderr,"Warning, in HFTA stream %s:\n%s",
- hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
- }
-
- fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
- if(hfta_comment != "")
- fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
- if(hfta_title != "")
- fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());
- if(hfta_namespace != "")
- fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());
- fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);
- fprintf(qtree_output,"\t\t<Rate value='100' />\n");
-
-// write info about fields to qtree.xml
- vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
- int fi;
- for(fi=0;fi<flds.size();fi++){
- fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
- if(flds[fi]->get_modifier_list()->size()){
- fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
- }
- fprintf(qtree_output," />\n");
- }
-
- // extract liveness timeout from query definition
- int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
- if (!liveness_timeout) {
-// fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",
-// hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
- liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
- }
- fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);
-
- vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();
- int itv;
- for(itv=0;itv<tmp_tv.size();++itv){
- fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());
- }
- string ifrs = hfta_list[i]->collect_refd_ifaces();
- if(ifrs != ""){
- fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());
- }
- fprintf(qtree_output,"\t</HFTA>\n");
-
- fclose(hfta_fl);
- }else{
-// debug only -- do code generation to catch generation-time errors.
- hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);
- }
-
- hfta_count++; // for hfta file names with numeric suffixes
-
- hfta_list[i]->get_external_libs(extra_external_libs);
-
- }
-
-string ext_lib_string;
-set<string>::iterator ssi_el;
-for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)
- ext_lib_string += (*ssi_el)+" ";
-
-
-
-// Report on the set of operator views
- for(i=0;i<opviews.size();++i){
- opview_entry *opve = opviews.get_entry(i);
- fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());
- fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());
- fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());
- fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());
- fprintf(qtree_output,"\t\t<Rate value='100' />\n");
-
- if (!opve->liveness_timeout) {
-// fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",
-// opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);
- opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;
- }
- fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);
- int j;
- for(j=0;j<opve->subq_names.size();j++)
- fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());
- fprintf(qtree_output,"\t</UDOP>\n");
- }
-
-
-//-----------------------------------------------------------------
-
-// Create interface-specific meta code files.
-// first, open and parse the interface resources file.
- ifaces_db = new ifq_t();
- ierr = "";
- if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){
- fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",
- ifx_fname.c_str(), ierr.c_str());
- exit(1);
- }
-
- map<string, vector<stream_query *> >::iterator svsi;
- for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){
- string lmach = (*svsi).first;
-
- // For this machine, create a set of lftas per interface.
- vector<stream_query *> mach_lftas = (*svsi).second;
- map<string, vector<stream_query *> > lfta_iface_lists;
- int li;
- for(li=0;li<mach_lftas.size();++li){
- vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string lfta_iface = tvec[0]->get_interface();
- lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
- }
-
- map<string, vector<stream_query *> >::iterator lsvsi;
- for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){
- int erri;
- string liface = (*lsvsi).first;
- vector<stream_query *> iface_lftas = (*lsvsi).second;
- vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
- if(iface_codegen_type.size()){
- nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
- if(!nicprop){
- fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
- exit(1);
- }
- string mcs = generate_nic_code(iface_lftas, nicprop);
- string mcf_flnm;
- if(lmach != "")
- mcf_flnm = lmach + "_"+liface+".mcf";
- else
- mcf_flnm = hostname + "_"+liface+".mcf";
- FILE *mcf_fl ;
- if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){
- fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));
- exit(1);
- }
- fprintf(mcf_fl,"%s",mcs.c_str());
- fclose(mcf_fl);
-//printf("mcs of machine %s, iface %s of type %s is \n%s\n",
-//lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());
- }
- }
-
-
- }
-
-
-
-//-----------------------------------------------------------------
-
-
-// Find common filter predicates in the LFTAs.
-// in addition generate structs to store the temporal attributes unpacked by prefilter
-
- map<string, vector<stream_query *> >::iterator ssqi;
- for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
-
- string lmach = (*ssqi).first;
- bool packed_return = false;
- int li, erri;
-
-
-// The LFTAs of this machine.
- vector<stream_query *> mach_lftas = (*ssqi).second;
-// break up on a per-interface basis.
- map<string, vector<stream_query *> > lfta_iface_lists;
- map<string, vector<int> > lfta_iface_qname_ix; // need the query name
- // for fta_init
- for(li=0;li<mach_lftas.size();++li){
- vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string lfta_iface = tvec[0]->get_interface();
- lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
- lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
- }
-
-
-// Are the return values "packed"?
-// This should be done on a per-interface basis.
-// But this is defunct code for gs-lite
- for(li=0;li<mach_lftas.size();++li){
- vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string liface = tvec[0]->get_interface();
- vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
- if(iface_codegen_type.size()){
- if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
- packed_return = true;
- }
- }
- }
-
-
-// Separate lftas by interface, collect results on a per-interface basis.
-
- vector<cnf_set *> no_preds; // fallback if there is no prefilter
- map<string, vector<cnf_set *> > prefilter_preds;
- set<unsigned int> pred_ids; // this can be global for all interfaces
- for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
- string liface = (*mvsi).first;
- vector<cnf_set *> empty_list;
- prefilter_preds[liface] = empty_list;
- if(! packed_return){
- get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);
- }
-
-// get NIC capabilities. (Is this needed?)
- nic_property *nicprop = NULL;
- vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
- if(iface_codegen_type.size()){
- nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);
- if(!nicprop){
- fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());
- exit(1);
- }
- }
- }
-
-
-// Now that we know the prefilter preds, generate the lfta code.
-// Do this for all lftas in this machine.
- for(li=0;li<mach_lftas.size();++li){
- set<unsigned int> subsumed_preds;
- set<unsigned int>::iterator sii;
-#ifdef PREFILTER_OK
- for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){
- int pid = (*sii);
- if((pid>>16) == li){
- subsumed_preds.insert(pid & 0xffff);
- }
- }
-#endif
- string lfta_schema_str = mach_lftas[li]->make_schema();
- string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);
- nic_property *nicprop = NULL; // no NIC properties?
- lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);
- }
-
-
-// generate structs to store the temporal attributes
-// unpacked by prefilter
- col_id_set temp_cids;
- get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);
- lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);
-
-// Compute the lfta bit signatures and the lfta colrefs
-// do this on a per-interface basis
-#ifdef PREFILTER_OK
- lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
-#endif
- map<string, vector<long long int> > lfta_sigs; // used again later
- for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
- string liface = (*mvsi).first;
- vector<long long int> empty_list;
- lfta_sigs[liface] = empty_list;
-
- vector<col_id_set> lfta_cols;
- vector<int> lfta_snap_length;
- for(li=0;li<lfta_iface_lists[liface].size();++li){
- unsigned long long int mask=0, bpos=1;
- int f_pos;
- for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){
- if(prefilter_preds[liface][f_pos]->lfta_id.count(li))
- mask |= bpos;
- bpos = bpos << 1;
- }
- lfta_sigs[liface].push_back(mask);
- lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
- lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
- }
-
-//for(li=0;li<mach_lftas.size();++li){
-//printf("lfta %d, msak is %llu\n",li,lfta_sigs[li]);
-//col_id_set::iterator tcisi;
-//for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){
-//printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);
-//}
-//}
-
-
-// generate the prefilter
-// Do this on a per-interface basis, except for the #define
-#ifdef PREFILTER_OK
-// lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
- lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
-#else
- lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);
-
-#endif
- }
-
-// Generate interface parameter lookup function
- lfta_val[lmach] += "// lookup interface properties by name\n";
- lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";
- lfta_val[lmach] += "// returns NULL if given property does not exist\n";
- lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";
-
-// collect a lit of interface names used by queries running on this host
- set<std::string> iface_names;
- for(i=0;i<mach_query_names[lmach].size();i++){
- int mi = mach_query_names[lmach][i];
- stream_query *lfta_sq = lfta_mach_lists[lmach][i];
-
- if(interface_names[mi]=="")
- iface_names.insert("DEFAULTDEV");
- else
- iface_names.insert(interface_names[mi]);
- }
-
-// generate interface property lookup code for every interface
- set<std::string>::iterator sir;
- for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {
- if (sir == iface_names.begin())
- lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";
- else
- lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
-
- // iterate through interface properties
- vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
- if (erri) {
- fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
- exit(1);
- }
- if (iface_properties.empty())
- lfta_val[lmach] += "\t\treturn NULL;\n";
- else {
- for (int i = 0; i < iface_properties.size(); ++i) {
- if (i == 0)
- lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
- else
- lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";
-
- // combine all values for the interface property using comma separator
- vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
- for (int j = 0; j < vals.size(); ++j) {
- lfta_val[lmach] += "\t\t\treturn \"" + vals[j];
- if (j != vals.size()-1)
- lfta_val[lmach] += ",";
- }
- lfta_val[lmach] += "\";\n";
- }
- lfta_val[lmach] += "\t\t} else\n";
- lfta_val[lmach] += "\t\t\treturn NULL;\n";
- }
- }
- lfta_val[lmach] += "\t} else\n";
- lfta_val[lmach] += "\t\treturn NULL;\n";
- lfta_val[lmach] += "}\n\n";
-
-
-// Generate a full list of FTAs for clearinghouse reference
- lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
- lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
-
- for (i = 0; i < query_names.size(); ++i) {
- if (i)
- lfta_val[lmach] += ", ";
- lfta_val[lmach] += "\"" + query_names[i] + "\"";
- }
- for (i = 0; i < hfta_list.size(); ++i) {
- lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
- }
- lfta_val[lmach] += ", NULL};\n\n";
-
-
-// Add the initialization function to lfta.c
-// Change to accept the interface name, and
-// set the prefilter function accordingly.
-// see the example in demo/err2
- lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
-
-// for(i=0;i<mach_query_names[lmach].size();i++)
-// int mi = mach_query_names[lmach][i];
-// stream_query *lfta_sq = lfta_mach_lists[lmach][i];
-
- for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
- string liface = (*mvsi).first;
- vector<stream_query *> lfta_list = (*mvsi).second;
- for(i=0;i<lfta_list.size();i++){
- stream_query *lfta_sq = lfta_list[i];
- int mi = lfta_iface_qname_ix[liface][i];
-
- fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
-
- string this_iface = "DEFAULTDEV";
- if(interface_names[mi]!="")
- this_iface = '"'+interface_names[mi]+'"';
- lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
- lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
-// if(interface_names[mi]=="")
-// lfta_val[lmach]+="DEFAULTDEV";
-// else
-// lfta_val[lmach]+='"'+interface_names[mi]+'"';
- lfta_val[lmach] += this_iface;
-
-
- lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
- +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
- +"\n#endif\n";
- sprintf(tmpstr,",%d",snap_lengths[mi]);
- lfta_val[lmach] += tmpstr;
-
-// unsigned long long int mask=0, bpos=1;
-// int f_pos;
-// for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){
-// if(prefilter_preds[f_pos]->lfta_id.count(i))
-// mask |= bpos;
-// bpos = bpos << 1;
-// }
-
-#ifdef PREFILTER_OK
-// sprintf(tmpstr,",%lluull",mask);
- sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);
- lfta_val[lmach]+=tmpstr;
-#else
- lfta_val[lmach] += ",0ull";
-#endif
-
- lfta_val[lmach] += ");\n";
-
-
-
-// End of lfta prefilter stuff
-// --------------------------------------------------
-
-// If there is a field verifier, warn about
-// lack of compatability
- string lfta_comment;
- string lfta_title;
- string lfta_namespace;
- map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();
- if(ldefs.count("comment")>0)
- lfta_comment = lfta_sq->defines["comment"];
- if(ldefs.count("Comment")>0)
- lfta_comment = lfta_sq->defines["Comment"];
- if(ldefs.count("COMMENT")>0)
- lfta_comment = lfta_sq->defines["COMMENT"];
- if(ldefs.count("title")>0)
- lfta_title = lfta_sq->defines["title"];
- if(ldefs.count("Title")>0)
- lfta_title = lfta_sq->defines["Title"];
- if(ldefs.count("TITLE")>0)
- lfta_title = lfta_sq->defines["TITLE"];
- if(ldefs.count("NAMESPACE")>0)
- lfta_namespace = lfta_sq->defines["NAMESPACE"];
- if(ldefs.count("Namespace")>0)
- lfta_namespace = lfta_sq->defines["Namespace"];
- if(ldefs.count("namespace")>0)
- lfta_namespace = lfta_sq->defines["namespace"];
-
- string lfta_ht_size;
- if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")
- lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
- if(ldefs.count("aggregate_slots")>0){
- lfta_ht_size = ldefs["aggregate_slots"];
- }
-
-// NOTE : I'm assuming that visible lftas do not start with _fta.
-// -- will fail for non-visible simple selection queries.
- if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
- string warning_str;
- if(lfta_comment == "")
- warning_str += "\tcomment not found.\n";
- if(lfta_title == "")
- warning_str += "\ttitle not found.\n";
- if(lfta_namespace == "")
- warning_str += "\tnamespace not found.\n";
-
- vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
- int fi;
- for(fi=0;fi<flds.size();fi++){
- field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);
- }
- if(warning_str != "")
- fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
- query_names[mi].c_str(),warning_str.c_str());
- }
-
-
-// Create qtree output
- fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
- if(lfta_comment != "")
- fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
- if(lfta_title != "")
- fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
- if(lfta_namespace != "")
- fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
- if(lfta_ht_size != "")
- fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
- if(lmach != "")
- fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
- else
- fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
- fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
- fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
- fprintf(qtree_output,"\t\t<Rate value='100' />\n");
- fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
-// write info about fields to qtree.xml
- vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
- int fi;
- for(fi=0;fi<flds.size();fi++){
- fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
- if(flds[fi]->get_modifier_list()->size()){
- fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
- }
- fprintf(qtree_output," />\n");
- }
- fprintf(qtree_output,"\t</LFTA>\n");
-
-
- }
- }
-
- for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
- string liface = (*mvsi).first;
- lfta_val[lmach] +=
-" if (!strcmp(device, \""+liface+"\")) \n"
-" lfta_prefilter = &lfta_prefilter_"+liface+"; \n"
-;
- }
- lfta_val[lmach] +=
-" if(lfta_prefilter == NULL){\n"
-" fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"
-" exit(1);\n"
-" }\n"
-;
-
-
-
- lfta_val[lmach] += "}\n\n";
-
- if(!(debug_only || hfta_only) ){
- string lfta_flnm;
- if(lmach != "")
- lfta_flnm = lmach + "_lfta.c";
- else
- lfta_flnm = hostname + "_lfta.c";
- if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){
- fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));
- exit(1);
- }
- fprintf(lfta_out,"%s",lfta_header.c_str());
- fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());
- fprintf(lfta_out,"%s",lfta_val[lmach].c_str());
- fclose(lfta_out);
- }
- }
-
-// Say what are the operators which must execute
- if(opviews.size()>0)
- fprintf(stderr,"The queries use the following external operators:\n");
- for(i=0;i<opviews.size();++i){
- opview_entry *opv = opviews.get_entry(i);
- fprintf(stderr,"\t%s\n",opv->view_name.c_str());
- }
-
- if(create_makefile)
- generate_makefile(input_file_names, nfiles, hfta_names, opviews,
- machine_names, schema_file_name,
- interface_names,
- ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);
-
-
- fprintf(qtree_output,"</QueryNodes>\n");
-
- return(0);
-}
-
-////////////////////////////////////////////////////////////
-
-void generate_makefile(vector<string> &input_file_names, int nfiles,
- vector<string> &hfta_names, opview_set &opviews,
- vector<string> &machine_names,
- string schema_file_name,
- vector<string> &interface_names,
- ifq_t *ifdb, string &config_dir_path,
- bool use_pads,
- string extra_libs,
- map<string, vector<int> > &rts_hload
- ){
- int i,j;
-
- if(config_dir_path != ""){
- config_dir_path = "-C "+config_dir_path;
- }
-
- struct stat sb;
- bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;
- bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;
-
-// if(libz_exists && !libast_exists){
-// fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");
-// exit(1);
-// }
-
-// Get set of operator executable files to run
- set<string> op_fls;
- set<string>::iterator ssi;
- for(i=0;i<opviews.size();++i){
- opview_entry *opv = opviews.get_entry(i);
- if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);
- }
-
- FILE *outfl = fopen("Makefile", "w");
- if(outfl==NULL){
- fprintf(stderr,"Can't open Makefile for write, exiting.\n");
- exit(0);
- }
-
- fputs(
-("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"
-"CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"
-).c_str(), outfl
-);
- if(generate_stats)
- fprintf(outfl," -DLFTA_STATS");
-
-// Gather the set of interfaces
-// Also, gather "base interface names" for use in computing
-// the hash splitting to virtual interfaces.
-// TODO : must update to hanndle machines
- set<string> ifaces;
- set<string> base_vifaces; // base interfaces of virtual interfaces
- map<string, string> ifmachines;
- map<string, string> ifattrs;
- for(i=0;i<interface_names.size();++i){
- ifaces.insert(interface_names[i]);
- ifmachines[interface_names[i]] = machine_names[i];
-
- size_t Xpos = interface_names[i].find_last_of("X");
- if(Xpos!=string::npos){
- string iface = interface_names[i].substr(0,Xpos);
- base_vifaces.insert(iface);
- }
- // get interface attributes and add them to the list
- }
-
-// Do we need to include protobuf libraries?
- bool use_proto = false;
- int erri;
- string err_str;
- for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
- string ifnm = (*ssi);
- vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
- for(int ift_i=0;ift_i<ift.size();ift_i++){
- if(ift[ift_i]=="PROTO"){
- use_proto = true;
- }
- }
- }
-
- fprintf(outfl,
-"\n"
-"\n"
-"all: rts");
- for(i=0;i<hfta_names.size();++i)
- fprintf(outfl," %s",hfta_names[i].c_str());
- fputs(
-("\n"
-"\n"
-"rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"
-"\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);
- if(use_pads)
- fprintf(outfl,"-L. ");
- fputs(
-("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);
- if(use_pads)
- fprintf(outfl,"-lgscppads -lpads ");
- fprintf(outfl,
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
- if(use_pads)
- fprintf(outfl, " -lpz -lz -lbz ");
- if(libz_exists && libast_exists)
- fprintf(outfl," -last ");
- if(use_pads)
- fprintf(outfl, " -ldll -ldl ");
- if(use_proto)
- fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
- fprintf(outfl," -lgscpaux");
-#ifdef GCOV
- fprintf(outfl," -fprofile-arcs");
-#endif
- fprintf(outfl,
-"\n"
-"\n"
-"lfta.o: %s_lfta.c\n"
-"\t$(CC) -o lfta.o -c %s_lfta.c\n"
-"\n"
-"%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
- for(i=0;i<nfiles;++i)
- fprintf(outfl," %s",input_file_names[i].c_str());
- if(hostname == ""){
- fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
- }else{
- fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
- }
- for(i=0;i<nfiles;++i)
- fprintf(outfl," %s",input_file_names[i].c_str());
- fprintf(outfl,"\n");
-
- for(i=0;i<hfta_names.size();++i)
- fprintf(outfl,
-("%s: %s.o\n"
-"\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"
-"\n"
-"%s.o: %s.cc\n"
-"\t$(CPP) -o %s.o -c %s.cc\n"
-"\n"
-"\n").c_str(),
- hfta_names[i].c_str(), hfta_names[i].c_str(),
- hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),
- hfta_names[i].c_str(), hfta_names[i].c_str(),
- hfta_names[i].c_str(), hfta_names[i].c_str()
- );
-
- fprintf(outfl,
-("\n"
-"packet_schema.txt:\n"
-"\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
-"\n"
-"external_fcns.def:\n"
-"\tln -s "+root_path+"/cfg/external_fcns.def .\n"
-"\n"
-"clean:\n"
-"\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
- for(i=0;i<hfta_names.size();++i)
- fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
- fprintf(outfl,"\n");
-
- fclose(outfl);
-
-
-
-// Gather the set of interfaces
-// TODO : must update to hanndle machines
-// TODO : lookup interface attributes and add them as a parameter to rts process
- outfl = fopen("runit", "w");
- if(outfl==NULL){
- fprintf(stderr,"Can't open runit for write, exiting.\n");
- exit(0);
- }
-
-
- fputs(
-("#!/bin/sh\n"
-"./stopit\n"
-+root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"
-"sleep 5\n"
-"if [ ! -f gshub.log ]\n"
-"then\n"
-"\techo \"Failed to start bin/gshub.py\"\n"
-"\texit -1\n"
-"fi\n"
-"ADDR=`cat gshub.log`\n"
-"ps opgid= $! >> gs.pids\n"
-"./rts $ADDR default ").c_str(), outfl);
-// int erri;
-// string err_str;
- for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
- string ifnm = (*ssi);
- fprintf(outfl, "%s ",ifnm.c_str());
- vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
- for(j=0;j<ifv.size();++j)
- fprintf(outfl, "%s ",ifv[j].c_str());
- }
- fprintf(outfl, " &\n");
- fprintf(outfl, "echo $! >> gs.pids\n");
- for(i=0;i<hfta_names.size();++i)
- fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());
-
- for(j=0;j<opviews.opview_list.size();++j){
- fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());
- }
-
- fclose(outfl);
- system("chmod +x runit");
-
- outfl = fopen("stopit", "w");
- if(outfl==NULL){
- fprintf(stderr,"Can't open stopit for write, exiting.\n");
- exit(0);
- }
-
- fprintf(outfl,"#!/bin/sh\n"
-"rm -f gshub.log\n"
-"if [ ! -f gs.pids ]\n"
-"then\n"
-"exit\n"
-"fi\n"
-"for pgid in `cat gs.pids`\n"
-"do\n"
-"kill -TERM -$pgid\n"
-"done\n"
-"sleep 1\n"
-"for pgid in `cat gs.pids`\n"
-"do\n"
-"kill -9 -$pgid\n"
-"done\n"
-"rm gs.pids\n");
-
- fclose(outfl);
- system("chmod +x stopit");
-
-//-----------------------------------------------
-
-/* For now disable support for virtual interfaces
- outfl = fopen("set_vinterface_hash.bat", "w");
- if(outfl==NULL){
- fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");
- exit(0);
- }
-
-// The format should be determined by an entry in the ifres.xml file,
-// but for now hardcode the only example I have.
- for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){
- if(rts_hload.count((*ssi))){
- string iface_name = (*ssi);
- string iface_number = "";
- for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){
- if(isdigit(iface_name[j])){
- iface_number = iface_name[j];
- if(j>0 && isdigit(iface_name[j-1]))
- iface_number = iface_name[j-1] + iface_number;
- }
- }
-
- fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());
- vector<int> halloc = rts_hload[iface_name];
- int prev_limit = 0;
- for(j=0;j<halloc.size();++j){
- if(j>0)
- fprintf(outfl,":");
- fprintf(outfl,"%d-%d",prev_limit,halloc[j]);
- prev_limit = halloc[j];
- }
- fprintf(outfl,"\n");
- }
- }
- fclose(outfl);
- system("chmod +x set_vinterface_hash.bat");
-*/
-}
-
-// Code for implementing a local schema
-/*
- table_list qpSchema;
-
-// Load the schemas of any LFTAs.
- int l;
- for(l=0;l<hfta_nbr;++l){
- stream_query *sq0 = split_queries[l];
- table_def *td = sq0->get_output_tabledef();
- qpSchema.append_table(td);
- }
-// load the schemas of any other ref'd tables.
-// (e.g., hftas)
- vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();
- int ti;
- for(ti=0;ti<input_tbl_names.size();++ti){
- int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());
- if(tbl_ref < 0){
- tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());
- if(tbl_ref < 0){
- fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());
- exit(1);
- }
- qpSchema.append_table(Schema->get_table(tbl_ref));
- }
- }
-*/
-
-// Functions related to parsing.
-
-/*
-static int split_string(char *instr,char sep, char **words,int max_words){
- char *loc;
- char *str;
- int nwords = 0;
-
- str = instr;
- words[nwords++] = str;
- while( (loc = strchr(str,sep)) != NULL){
- *loc = '\0';
- str = loc+1;
- if(nwords >= max_words){
- fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);
- nwords = max_words-1;
- }
- words[nwords++] = str;
- }
-
- return(nwords);
-}
-
-*/
-
+/* ------------------------------------------------\r
+Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#include<unistd.h> // for gethostname\r
+\r
+#include <string>\r
+#include "parse_fta.h"\r
+#include "parse_schema.h"\r
+#include "parse_ext_fcns.h"\r
+#include"analyze_fta.h"\r
+#include"query_plan.h"\r
+#include"generate_lfta_code.h"\r
+#include"stream_query.h"\r
+#include"generate_utils.h"\r
+#include"nic_def.h"\r
+#include"generate_nic_code.h"\r
+\r
+#include <stdlib.h>\r
+#include <stdio.h>\r
+#include<ctype.h>\r
+#include<glob.h>\r
+#include<string.h>\r
+\r
+#include<list>\r
+\r
+// for the scandir\r
+ #include <sys/types.h>\r
+ #include <dirent.h>\r
+\r
+\r
+#include<errno.h>\r
+\r
+// to verify that some files exist.\r
+ #include <sys/types.h>\r
+ #include <sys/stat.h>\r
+\r
+#include "parse_partn.h"\r
+\r
+#include "print_plan.h"\r
+\r
+// Interface to the xml parser\r
+\r
+#include"xml_t.h"\r
+#include"field_list.h"\r
+\r
+extern int xmlParserparse(void);\r
+extern FILE *xmlParserin;\r
+extern int xmlParserdebug;\r
+\r
+std::vector<std::string> xml_attr_vec;\r
+std::vector<std::string> xml_val_vec;\r
+std::string xml_a, xml_v;\r
+xml_t *xml_leaves = NULL;\r
+\r
+// Interface to the field list verifier\r
+field_list *field_verifier = NULL;\r
+\r
+#define TMPSTRLEN 1000\r
+\r
+#ifndef PATH_DELIM\r
+ #define PATH_DELIM '/'\r
+#endif\r
+\r
+char tmp_schema_str[10000];\r
+\r
+// maximum delay between two heartbeats produced\r
+// by UDOP. Used when it's not explicitly\r
+// provided in udop definition\r
+#define DEFAULT_UDOP_LIVENESS_TIMEOUT 5\r
+\r
+// Default lfta hash table size, must be power of 2.\r
+int DEFAULT_LFTA_HASH_TABLE_SIZE = 4096;\r
+\r
+// Interface to FTA definition lexer and parser ...\r
+\r
+extern int FtaParserparse(void);\r
+extern FILE *FtaParserin;\r
+extern int FtaParserdebug;\r
+\r
+fta_parse_t *fta_parse_result;\r
+var_defs_t *fta_parse_defines;\r
+\r
+\r
+\r
+// Interface to external function lexer and parser ...\r
+\r
+extern int Ext_fcnsParserparse(void);\r
+extern FILE *Ext_fcnsParserin;\r
+extern int Ext_fcnsParserdebug;\r
+\r
+ext_fcn_list *Ext_fcns;\r
+\r
+\r
+// Interface to partition definition parser\r
+extern int PartnParserparse();\r
+partn_def_list_t *partn_parse_result = NULL;\r
+\r
+\r
+\r
+using namespace std;\r
+//extern int errno;\r
+\r
+\r
+// forward declaration of local utility function\r
+void generate_makefile(vector<string> &input_file_names, int nfiles,\r
+ vector<string> &hfta_names, opview_set &opviews,\r
+ vector<string> &machine_names,\r
+ string schema_file_name,\r
+ vector<string> &interface_names,\r
+ ifq_t *ifdb, string &config_dir_path,\r
+ bool use_pads,\r
+ string extra_libs,\r
+ map<string, vector<int> > &rts_hload\r
+ );\r
+\r
+//static int split_string(char *instr,char sep, char **words,int max_words);\r
+#define MAXFLDS 100\r
+\r
+ FILE *schema_summary_output = NULL; // query names\r
+\r
+// Dump schema summary\r
+void dump_summary(stream_query *str){\r
+ fprintf(schema_summary_output,"%s\n",str->query_name.c_str());\r
+\r
+ table_def *sch = str->get_output_tabledef();\r
+\r
+ vector<field_entry *> flds = sch->get_fields();\r
+ int f;\r
+ for(f=0;f<flds.size();++f){\r
+ if(f>0) fprintf(schema_summary_output,"|");\r
+ fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());\r
+ }\r
+ fprintf(schema_summary_output,"\n");\r
+ for(f=0;f<flds.size();++f){\r
+ if(f>0) fprintf(schema_summary_output,"|");\r
+ fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());\r
+ }\r
+ fprintf(schema_summary_output,"\n");\r
+}\r
+\r
+// Globals\r
+string hostname; // name of current host.\r
+int hostname_len;\r
+bool generate_stats = false;\r
+string root_path = "../..";\r
+\r
+\r
+int main(int argc, char **argv){\r
+ char tmpstr[TMPSTRLEN];\r
+ string err_str;\r
+ int q,s,h,f;\r
+\r
+ set<int>::iterator si;\r
+\r
+ vector<string> query_names; // for lfta.c registration\r
+ map<string, vector<int> > mach_query_names; // list queries of machine\r
+ vector<int> snap_lengths; // for lfta.c registration\r
+ vector<string> interface_names; // for lfta.c registration\r
+ vector<string> machine_names; // machine of interface\r
+ vector<bool> lfta_reuse_options; // for lfta.c registration\r
+	vector<int> lfta_liveness_timeouts;	// for qtree.xml generation\r
+	vector<string> hfta_names;			// hfta source code names, for\r
+ // creating make file.\r
+ vector<string> qnames; // ensure unique names\r
+ map<string, int> lfta_names; // keep track of unique lftas.\r
+\r
+\r
+// set these to 1 to debug the parser\r
+ FtaParserdebug = 0;\r
+ Ext_fcnsParserdebug = 0;\r
+\r
+ FILE *lfta_out; // lfta.c output.\r
+ FILE *fta_in; // input file\r
+ FILE *table_schemas_in; // source tables definition file\r
+ FILE *query_name_output; // query names\r
+ FILE *qtree_output; // interconnections of query nodes\r
+\r
+ // -------------------------------\r
+ // Handling of Input Arguments\r
+ // -------------------------------\r
+ char optstr[] = "BDpLC:l:HNQMf:PSh:n:cR:";\r
+ const char *usage_str = "Usage: %s [-B] [-D] [-p] [-L] [-N] [-H] [-Q] [-M] [-C <config directory>] [-l <library directory>] [-f] [-P] [-S] [-n n_virtual_interfaces] [-h hostname] [-R root_path] [schema_file] input_file [input file ...]\n"\r
+ "\t[-B] : debug only (don't create output files)\n"\r
+ "\t[-D] : distributed mode (will use cluster.ifq instead of local .ifq file)\n"\r
+ "\t[-p] : partitioned mode (will use partition information in partition.txt to drive query optimization)\n"\r
+ "\t[-L] : use live_hosts.txt file to restrict queries to a set of live hosts\n"\r
+ "\t[-C] : use <config directory> for definition files\n"\r
+ "\t[-l] : use <library directory> for library queries\n"\r
+ "\t[-N] : output query names in query_names.txt\n"\r
+ "\t[-H] : create HFTA only (no schema_file)\n"\r
+ "\t[-Q] : use query name for hfta suffix\n"\r
+ "\t[-M] : generate make file and runit, stopit scripts\n"\r
+ "\t[-S] : enable LFTA statistics (alters Makefile).\n"\r
+ "\t[-f] : Output schema summary to schema_summary.txt\n"\r
+ "\t[-P] : link with PADS\n"\r
+ "\t[-h] : override host name.\n"\r
+ "\t[-c] : clean out Makefile and hfta_*.cc first.\n"\r
+ "\t[-R] : path to root of GS-lite\n"\r
+;\r
+\r
+// parameters gathered from command line processing\r
+ string external_fcns_path;\r
+// string internal_fcn_path;\r
+ string config_dir_path;\r
+ string library_path = "./";\r
+ vector<string> input_file_names;\r
+ string schema_file_name;\r
+ bool debug_only = false;\r
+ bool hfta_only = false;\r
+ bool output_query_names = false;\r
+ bool output_schema_summary=false;\r
+ bool numeric_hfta_flname = true;\r
+ bool create_makefile = false;\r
+ bool distributed_mode = false;\r
+ bool partitioned_mode = false;\r
+ bool use_live_hosts_file = false;\r
+ bool use_pads = false;\r
+ bool clean_make = false;\r
+ int n_virtual_interfaces = 1;\r
+\r
+ char chopt;\r
+ while((chopt = getopt(argc,argv,optstr)) != -1){\r
+ switch(chopt){\r
+ case 'B':\r
+ debug_only = true;\r
+ break;\r
+ case 'D':\r
+ distributed_mode = true;\r
+ break;\r
+ case 'p':\r
+ partitioned_mode = true;\r
+ break;\r
+ case 'L':\r
+ use_live_hosts_file = true;\r
+ break;\r
+ case 'C':\r
+ if(optarg != NULL)\r
+ config_dir_path = string(optarg) + string("/");\r
+ break;\r
+ case 'l':\r
+ if(optarg != NULL)\r
+ library_path = string(optarg) + string("/");\r
+ break;\r
+ case 'N':\r
+ output_query_names = true;\r
+ break;\r
+ case 'Q':\r
+ numeric_hfta_flname = false;\r
+ break;\r
+ case 'H':\r
+ if(schema_file_name == ""){\r
+ hfta_only = true;\r
+ }\r
+ break;\r
+ case 'f':\r
+ output_schema_summary=true;\r
+ break;\r
+ case 'M':\r
+ create_makefile=true;\r
+ break;\r
+ case 'S':\r
+ generate_stats=true;\r
+ break;\r
+ case 'P':\r
+ use_pads = true;\r
+ break;\r
+ case 'c':\r
+ clean_make = true;\r
+ break;\r
+ case 'h':\r
+ if(optarg != NULL)\r
+ hostname = optarg;\r
+ break;\r
+ case 'R':\r
+ if(optarg != NULL)\r
+ root_path = optarg;\r
+ break;\r
+ case 'n':\r
+ if(optarg != NULL){\r
+ n_virtual_interfaces = atoi(optarg);\r
+ if(n_virtual_interfaces < 1 || n_virtual_interfaces > 128){\r
+ fprintf(stderr,"Warning, %d virtual interfaces specified, valid values are between 1 and 128 inclusive\n",n_virtual_interfaces);\r
+ n_virtual_interfaces = 1;\r
+ }\r
+ }\r
+ break;\r
+ case '?':\r
+ fprintf(stderr,"Error, argument %c not recognized.\n",optopt);\r
+ fprintf(stderr,"%s\n", usage_str);\r
+ exit(1);\r
+ default:\r
+ fprintf(stderr, "Argument was %c\n", optopt);\r
+ fprintf(stderr,"Invalid arguments\n");\r
+ fprintf(stderr,"%s\n", usage_str);\r
+ exit(1);\r
+ }\r
+ }\r
+ argc -= optind;\r
+ argv += optind;\r
+ for (int i = 0; i < argc; ++i) {\r
+ if((schema_file_name == "") && !hfta_only){\r
+ schema_file_name = argv[i];\r
+ }else{\r
+ input_file_names.push_back(argv[i]);\r
+ }\r
+ }\r
+\r
+ if(input_file_names.size() == 0){\r
+ fprintf(stderr,"%s\n", usage_str);\r
+ exit(1);\r
+ }\r
+\r
+ if(clean_make){\r
+ string clean_cmd = "rm Makefile hfta_*.cc";\r
+ int clean_ret = system(clean_cmd.c_str());\r
+ if(clean_ret){\r
+ fprintf(stderr,"Warning, return value %d when trying to clean out old source files.\n", clean_ret);\r
+ }\r
+ }\r
+\r
+\r
+ nic_prop_db *npdb = new nic_prop_db(config_dir_path);\r
+\r
+// Open globally used file names.\r
+\r
+ // prepend config directory to schema file\r
+ schema_file_name = config_dir_path + schema_file_name;\r
+ external_fcns_path = config_dir_path + string("external_fcns.def");\r
+ string ifx_fname = config_dir_path + string("ifres.xml");\r
+\r
+// Find interface query file(s).\r
+ if(hostname == ""){\r
+ gethostname(tmpstr,TMPSTRLEN);\r
+ hostname = tmpstr;\r
+ }\r
+ hostname_len = strlen(tmpstr);\r
+ string ifq_fname = config_dir_path + (distributed_mode ? "cluster" : hostname) + string(".ifq");\r
+ vector<string> ifq_fls;\r
+\r
+ ifq_fls.push_back(ifq_fname);\r
+\r
+\r
+// Get the field list, if it exists\r
+ string flist_fl = config_dir_path + "field_list.xml";\r
+ FILE *flf_in = NULL;\r
+ if((flf_in = fopen(flist_fl.c_str(), "r")) != NULL) {\r
+ fprintf(stderr,"Found field list file %s\n",flist_fl.c_str());\r
+ xml_leaves = new xml_t();\r
+ xmlParser_setfileinput(flf_in);\r
+ if(xmlParserparse()){\r
+ fprintf(stderr,"WARNING, could not parse field list file %s\n",flist_fl.c_str());\r
+ }else{\r
+ field_verifier = new field_list(xml_leaves);\r
+ }\r
+ }\r
+\r
+ if(!hfta_only){\r
+ if((table_schemas_in = fopen(schema_file_name.c_str(), "r")) == NULL) {\r
+ fprintf(stderr,"Can't open schema file %s\n%s\n",schema_file_name.c_str(),strerror(errno));\r
+ exit(1);\r
+ }\r
+ }\r
+\r
+/*\r
+ if(!(debug_only || hfta_only)){\r
+ if((lfta_out = fopen("lfta.c","w")) == NULL){\r
+ fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));\r
+ exit(1);\r
+ }\r
+ }\r
+*/\r
+\r
+// Get the output specification file.\r
+// format is\r
+// query, operator, operator_param, directory, bucketwidth, partitioning_se, n_partitions\r
+ string ospec_fl = "output_spec.cfg";\r
+ FILE *osp_in = NULL;\r
+ vector<ospec_str *> output_specs;\r
+ multimap<string, int> qname_to_ospec;\r
+ if((osp_in = fopen(ospec_fl.c_str(), "r")) != NULL) {\r
+ char *flds[MAXFLDS];\r
+ int o_lineno = 0;\r
+ while(fgets(tmpstr,TMPSTRLEN,osp_in)){\r
+ o_lineno++;\r
+ int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
+ if(nflds == 7){\r
+// make operator type lowercase\r
+ char *tmpc;\r
+ for(tmpc=flds[1];*tmpc!='\0';++tmpc)\r
+ *tmpc = tolower(*tmpc);\r
+\r
+ ospec_str *tmp_ospec = new ospec_str();\r
+ tmp_ospec->query = flds[0];\r
+ tmp_ospec->operator_type = flds[1];\r
+ tmp_ospec->operator_param = flds[2];\r
+ tmp_ospec->output_directory = flds[3];\r
+ tmp_ospec->bucketwidth = atoi(flds[4]);\r
+ tmp_ospec->partitioning_flds = flds[5];\r
+ tmp_ospec->n_partitions = atoi(flds[6]);\r
+ qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));\r
+ output_specs.push_back(tmp_ospec);\r
+ }else{\r
+ fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);\r
+ }\r
+ }\r
+ fclose(osp_in);\r
+ }else{\r
+ fprintf(stderr,"output_spec.cfg not found. The query set has no output. exiting.\n");\r
+ exit(1);\r
+ }\r
+\r
+// hfta parallelism\r
+ string pspec_fl = "hfta_parallelism.cfg";\r
+ FILE *psp_in = NULL;\r
+ map<string, int> hfta_parallelism;\r
+ if((psp_in = fopen(pspec_fl.c_str(), "r")) != NULL){\r
+ char *flds[MAXFLDS];\r
+ int o_lineno = 0;\r
+ while(fgets(tmpstr,TMPSTRLEN,psp_in)){\r
+ bool good_entry = true;\r
+ o_lineno++;\r
+ int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
+ if(nflds == 2){\r
+ string hname = flds[0];\r
+ int par = atoi(flds[1]);\r
+ if(par <= 0 || par > n_virtual_interfaces){\r
+ fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must be between 1 and %d\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);\r
+ good_entry = false;\r
+ }\r
+ if(good_entry && n_virtual_interfaces % par != 0){\r
+ fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, parallelism is %d, must divide the number of virtual interfaces (%d), ignoring.\n",o_lineno,pspec_fl.c_str(),par,n_virtual_interfaces);\r
+ good_entry = false;\r
+ }\r
+ if(good_entry)\r
+ hfta_parallelism[hname] = par;\r
+ }\r
+ }\r
+ }else{\r
+ fprintf(stderr,"WARNING, no file %s found, using single copies of hftas.\n",pspec_fl.c_str());\r
+ }\r
+\r
+\r
+// LFTA hash table sizes\r
+ string htspec_fl = "lfta_htsize.cfg";\r
+ FILE *htsp_in = NULL;\r
+ map<string, int> lfta_htsize;\r
+ if((htsp_in = fopen(htspec_fl.c_str(), "r")) != NULL){\r
+ char *flds[MAXFLDS];\r
+ int o_lineno = 0;\r
+ while(fgets(tmpstr,TMPSTRLEN,htsp_in)){\r
+ bool good_entry = true;\r
+ o_lineno++;\r
+ int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
+ if(nflds == 2){\r
+ string lfta_name = flds[0];\r
+ int htsz = atoi(flds[1]);\r
+ if(htsz>0){\r
+ lfta_htsize[lfta_name] = htsz;\r
+ }else{\r
+ fprintf(stderr,"Warning, line %d of %s is incorrectly formatted, htsize is %d, must be larger than 0.\n",o_lineno,htspec_fl.c_str(),htsz);\r
+ }\r
+ }\r
+ }\r
+ }else{\r
+ fprintf(stderr,"WARNING, no file %s found, using default LFTA hash table sizes.\n",htspec_fl.c_str());\r
+ }\r
+\r
+// LFTA virtual interface hash split\r
+ string rtlspec_fl = "rts_load.cfg";\r
+ FILE *rtl_in = NULL;\r
+ map<string, vector<int> > rts_hload;\r
+ if((rtl_in = fopen(rtlspec_fl.c_str(), "r")) != NULL){\r
+ char *flds[MAXFLDS];\r
+ int r_lineno = 0;\r
+ string iface_name;\r
+ vector<int> hload;\r
+ while(fgets(tmpstr,TMPSTRLEN,rtl_in)){\r
+ bool good_entry = true;\r
+ r_lineno++;\r
+ iface_name = "";\r
+ hload.clear();\r
+ int nflds = split_string(tmpstr,',',flds,MAXFLDS);\r
+ if(nflds >1){\r
+ iface_name = flds[0];\r
+ int cumm_h = 0;\r
+ int j;\r
+ for(j=1;j<nflds;++j){\r
+ int h = atoi(flds[j]);\r
+ if(h<=0)\r
+ good_entry = false;\r
+ cumm_h += h;\r
+ hload.push_back(cumm_h);\r
+ }\r
+ }else{\r
+ good_entry = false;\r
+ }\r
+ if(good_entry){\r
+ rts_hload[iface_name] = hload;\r
+ }else{\r
+ fprintf(stderr,"Warning, line %d of %s is malformed, skipping.\n",r_lineno, rtlspec_fl.c_str());\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+\r
+ if(output_query_names){\r
+ if((query_name_output = fopen("query_names.txt","w")) == NULL){\r
+ fprintf(stderr,"Can't open output file %s\n%s\n","query_names.txt",strerror(errno));\r
+ exit(1);\r
+ }\r
+ }\r
+\r
+ if(output_schema_summary){\r
+ if((schema_summary_output = fopen("schema_summary.txt","w")) == NULL){\r
+ fprintf(stderr,"Can't open output file %s\n%s\n","schema_summary.txt",strerror(errno));\r
+ exit(1);\r
+ }\r
+ }\r
+\r
+ if((qtree_output = fopen("qtree.xml","w")) == NULL){\r
+ fprintf(stderr,"Can't open output file %s\n%s\n","qtree.xml",strerror(errno));\r
+ exit(1);\r
+ }\r
+ fprintf(qtree_output,"<?xml version='1.0' encoding='ISO-8859-1'?>\n");\r
+ fprintf(qtree_output,"<?xml-stylesheet type='text/xsl' href='qtree.xsl'?>\n");\r
+ fprintf(qtree_output,"<QueryNodes>\n");\r
+\r
+\r
+// Get an initial Schema\r
+ table_list *Schema;\r
+ if(!hfta_only){\r
+// Parse the table schema definitions.\r
+ fta_parse_result = new fta_parse_t();\r
+ FtaParser_setfileinput(table_schemas_in);\r
+ if(FtaParserparse()){\r
+ fprintf(stderr,"Table schema parse failed.\n");\r
+ exit(1);\r
+ }\r
+ if(fta_parse_result->parse_type != TABLE_PARSE){\r
+ fprintf(stderr,"ERROR, file %s is not a table definition file.\n",schema_file_name.c_str());\r
+ exit(1);\r
+ }\r
+ Schema = fta_parse_result->tables;\r
+\r
+// Process schema field inheritance\r
+ int retval;\r
+ retval = Schema->unroll_tables(err_str);\r
+ if(retval){\r
+ fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );\r
+ exit(1);\r
+ }\r
+ }else{\r
+// hfta only => we will try to fetch schemas from the registry.\r
+// therefore, start off with an empty schema.\r
+ Schema = new table_list();\r
+ }\r
+\r
+\r
+// Open and parse the external functions file.\r
+ Ext_fcnsParserin = fopen(external_fcns_path.c_str(),"r");\r
+ if(Ext_fcnsParserin == NULL){\r
+ fprintf(stderr,"Warning, can't find external functions definition file (external_fcns.def).\n");\r
+ Ext_fcns = new ext_fcn_list();\r
+ }else{\r
+ if(Ext_fcnsParserparse()){\r
+ fprintf(stderr,"Warning, error parsing external functions definition file (external_fcns.def).\n");\r
+ Ext_fcns = new ext_fcn_list();\r
+ }\r
+ }\r
+ if(Ext_fcns->validate_fcns(err_str)){\r
+ fprintf(stderr,"Error processing external functions definition file (external_fcns.def):\n%s\n",err_str.c_str());\r
+ exit(1);\r
+ }\r
+\r
+// Open and parse the interface resources file.\r
+// ifq_t *ifaces_db = new ifq_t();\r
+// string ierr;\r
+// if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){\r
+// fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",\r
+// ifx_fname.c_str(), ierr.c_str());\r
+// exit(1);\r
+// }\r
+// if(ifaces_db->load_ifqs(ifq_fname, ierr)){\r
+// fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",\r
+// ifq_fname.c_str(), ierr.c_str());\r
+// exit(1);\r
+// }\r
+\r
+\r
+// The LFTA code string.\r
+// Put the standard preamble here.\r
+// NOTE: the hash macros, fcns should go into the run time\r
+ map<string, string> lfta_val;\r
+ map<string, string> lfta_prefilter_val;\r
+\r
+ string lfta_header =\r
+"#include <limits.h>\n\n"\r
+"#include \"rts.h\"\n"\r
+"#include \"fta.h\"\n"\r
+"#include \"lapp.h\"\n"\r
+"#include \"rts_udaf.h\"\n\n"\r
+;\r
+// Get any locally defined parsing headers\r
+ glob_t glob_result;\r
+ memset(&glob_result, 0, sizeof(glob_result));\r
+\r
+ // do the glob operation\r
+ int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);\r
+ if(return_value == 0){\r
+ for(size_t i = 0; i < glob_result.gl_pathc; ++i) {\r
+ char *flds[1000];\r
+ int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);\r
+ lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";\r
+ }\r
+ }else{\r
+ fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");\r
+ }\r
+\r
+/*\r
+"#define IS_FILLED(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> ((bucket & 15)<<1)))\n"\r
+"#define IS_NEW(bitmap,bucket) (bitmap[bucket >> 4] & (0x80000000 >> (((bucket & 15) << 1) + 1)))\n"\r
+"#define SET_EMPTY(bitmap,bucket) (bitmap[bucket >> 4] &= (~(0x80000000 >> ((bucket & 15)<<1))))\n"\r
+"#define SET_FILLED_AND_NEW(bitmap,bucket) (bitmap[bucket >> 4] |= (0xC0000000 >> ((bucket & 15)<<1)))\n"\r
+*/\r
+\r
+ lfta_header += \r
+"\n"\r
+"gs_uint64_t (*lfta_prefilter)(void *pkt) = NULL; // set at fta_init\n"\r
+"\n"\r
+"#define SLOT_FILLED 0x04\n"\r
+"#define SLOT_GEN_BITS 0x03\n"\r
+"#define SLOT_HASH_BITS 0xfffffff8\n"\r
+"#define SET_BF_BIT(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num)) + (bf_index)) >> 3] |= (0x80 >> ((((bucket) * (bf_num)) + (bf_index)) & 7)))\n"\r
+"#define IS_BF_SET(bf_table, bf_num, bf_index, bucket) (bf_table[(((bucket) * (bf_num))+(bf_index)) >> 3] & (0x80 >> ((((bucket) * (bf_num))+(bf_index)) & 7)))\n"\r
+"#define SET_BF_EMPTY(bf_table, bf_num, bf_index, bucket) (bf_table[((bucket * bf_num)+bf_index) >> 3] &= (~0x80 >> (((bucket * bf_num)+bf_index) & 7)))\n"\r
+"\n\n"\r
+\r
+"#define lfta_BOOL_to_hash(x) (x)\n"\r
+"#define lfta_USHORT_to_hash(x) (x)\n"\r
+"#define lfta_UINT_to_hash(x) (x)\n"\r
+"#define lfta_IP_to_hash(x) (x)\n"\r
+"#define lfta_IPV6_to_hash(x) ( (x.v[0]) ^ (x.v[1]) ^ (x.v[2]) ^ (x.v[3]))\n"\r
+"#define lfta_INT_to_hash(x) (gs_uint32_t)(x)\n"\r
+"#define lfta_ULLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"\r
+"#define lfta_LLONG_to_hash(x) ( (( (x) >>32)&0xffffffff) ^ ((x)&0xffffffff) )\n"\r
+"#define lfta_FLOAT_to_hash(x) ( (( ((gs_uint64_t)(x)) >>32)&0xffffffff) ^ (((gs_uint64_t)(x))&0xffffffff) )\n"\r
+"static gs_uint32_t lfta_V_STR_to_hash(struct gs_string x){\n"\r
+" gs_uint32_t i,ret=0,tmp_sum = 0;\n"\r
+" for(i=0;i<x.length;++i){\n"\r
+" tmp_sum |= (x.data[i]) << (8*(i%4));\n"\r
+" if((i%4) == 3){\n"\r
+" ret ^= tmp_sum;\n"\r
+" tmp_sum = 0;\n"\r
+" }\n"\r
+" }\n"\r
+" if((i%4)!=0) ret ^=tmp_sum;\n"\r
+" return(ret);\n"\r
+"}\n\n\n";\r
+\r
+\r
+\r
+//////////////////////////////////////////////////////////////////\r
+///// Get all of the query parse trees\r
+\r
+\r
+ int i,p;\r
+ int hfta_count = 0; // for numeric suffixes to hfta .cc files\r
+\r
+//---------------------------\r
+// Global info needed for post processing.\r
+\r
+// Set of operator views ref'd in the query set.\r
+ opview_set opviews;\r
+// lftas on a per-machine basis.\r
+ map<string, vector<stream_query *> > lfta_mach_lists;\r
+ int nfiles = input_file_names.size();\r
+ vector<stream_query *> hfta_list; // list of hftas.\r
+ map<string, stream_query *> sq_map; // map from query name to stream query.\r
+\r
+\r
+//////////////////////////////////////////\r
+\r
+// Open and parse the interface resources file.\r
+ ifq_t *ifaces_db = new ifq_t();\r
+ string ierr;\r
+ if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){\r
+ fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",\r
+ ifx_fname.c_str(), ierr.c_str());\r
+ exit(1);\r
+ }\r
+ if(ifaces_db->load_ifqs(ifq_fls[0], ierr)){\r
+ fprintf(stderr,"ERROR, can't load interface query file %s :\n%s",\r
+ ifq_fls[0].c_str(), ierr.c_str());\r
+ exit(1);\r
+ }\r
+\r
+ map<string, string> qname_to_flname; // for detecting duplicate query names\r
+\r
+\r
+\r
+// Parse the files to create a vector of parse trees.\r
+// Load qnodes with information to perform a topo sort\r
+// based on query dependencies.\r
+ vector<query_node *> qnodes; // for topo sort.\r
+ map<string,int> name_node_map; // map query name to qnodes entry\r
+ for(i=0;i<input_file_names.size();i++){\r
+\r
+ if((fta_in = fopen(input_file_names[i].c_str(), "r")) == NULL) {\r
+ fprintf(stderr,"Can't open input file %s\n%s",input_file_names[i].c_str(),strerror(errno));\r
+ continue;\r
+ }\r
+fprintf(stderr,"Parsing file %s\n",input_file_names[i].c_str());\r
+\r
+// Parse the FTA query\r
+ fta_parse_result = new fta_parse_t();\r
+ FtaParser_setfileinput(fta_in);\r
+ if(FtaParserparse()){\r
+ fprintf(stderr,"FTA parse failed.\n");\r
+ exit(1);\r
+ }\r
+ if(fta_parse_result->parse_type != QUERY_PARSE){\r
+ fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());\r
+ exit(1);\r
+ }\r
+\r
+// returns a list of parse trees\r
+ vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;\r
+ for(p=0;p<qlist.size();++p){\r
+ table_exp_t *fta_parse_tree = qlist[p];\r
+// query_parse_trees.push_back(fta_parse_tree);\r
+\r
+// compute the default name -- extract from query name\r
+ strcpy(tmpstr,input_file_names[i].c_str());\r
+ char *qname = strrchr(tmpstr,PATH_DELIM);\r
+ if(qname == NULL)\r
+ qname = tmpstr;\r
+ else\r
+ qname++;\r
+ char *qname_end = strchr(qname,'.');\r
+ if(qname_end != NULL) *qname_end = '\0';\r
+ string qname_str = qname;\r
+ string imputed_qname = impute_query_name(fta_parse_tree, qname_str);\r
+\r
+//	Determine visibility.  Should I be attaching all of the output methods?\r
+ if(qname_to_ospec.count(imputed_qname)>0)\r
+ fta_parse_tree->set_visible(true);\r
+ else\r
+ fta_parse_tree->set_visible(false);\r
+\r
+\r
+//	Create a manipulable representation of the parse tree.\r
+// the qnode inherits the visibility assigned to the parse tree.\r
+ int pos = qnodes.size();\r
+ qnodes.push_back( new query_node(pos, imputed_qname, input_file_names[i], fta_parse_tree ));\r
+ name_node_map[ qnodes[pos]->name ] = pos;\r
+//printf("name_node_map[ %s ] = %d\n",qnodes[pos]->name.c_str(),pos);\r
+// qnames.push_back(impute_query_name(fta_parse_tree, qname_str));\r
+// qfiles.push_back(i);\r
+\r
+// Check for duplicate query names\r
+// NOTE : in hfta-only generation, I should\r
+// also check with the names of the registered queries.\r
+ if(qname_to_flname.count(qnodes[pos]->name) > 0){\r
+ fprintf(stderr,"ERROR duplicate query name %s in files %s and %s.\n",\r
+ qnodes[pos]->name.c_str(), (qname_to_flname[qnodes[pos]->name]).c_str(), input_file_names[i].c_str());\r
+ exit(1);\r
+ }\r
+ if(Schema->find_tbl(qnodes[pos]->name) >= 0){\r
+ fprintf(stderr,"ERROR duplicate query name %s (file %s) was already defined as a PROTOCOL.\n",\r
+ qnodes[pos]->name.c_str(), input_file_names[i].c_str());\r
+ exit(1);\r
+ }\r
+ qname_to_flname[qnodes[pos]->name] = input_file_names[i].c_str();\r
+\r
+\r
+ }\r
+ }\r
+\r
+// Add the library queries\r
+\r
+ int pos;\r
+ for(pos=0;pos<qnodes.size();++pos){\r
+ int fi;\r
+ for(fi = 0;fi<qnodes[pos]->refd_tbls.size();++fi){\r
+ string src_tbl = qnodes[pos]->refd_tbls[fi];\r
+ if(qname_to_flname.count(src_tbl) == 0){\r
+ int last_sep = src_tbl.find_last_of('/');\r
+ if(last_sep != string::npos){\r
+fprintf(stderr,"Finding library query %s for query %s\n",src_tbl.c_str(),qnodes[pos]->name.c_str());\r
+ string target_qname = src_tbl.substr(last_sep+1);\r
+ string qpathname = library_path + src_tbl + ".gsql";\r
+ if((fta_in = fopen(qpathname.c_str(), "r")) == NULL) {\r
+ fprintf(stderr,"Can't open library file %s, referenced by query %s in file %s\n\t%s\n",qpathname.c_str(),qnodes[pos]->name.c_str(), qname_to_flname[qnodes[pos]->name].c_str(), strerror(errno));\r
+ exit(1);\r
+ fprintf(stderr,"After exit\n");\r
+ }\r
+fprintf(stderr,"Parsing file %s\n",qpathname.c_str());\r
+// Parse the FTA query\r
+ fta_parse_result = new fta_parse_t();\r
+ FtaParser_setfileinput(fta_in);\r
+ if(FtaParserparse()){\r
+ fprintf(stderr,"FTA parse failed.\n");\r
+ exit(1);\r
+ }\r
+ if(fta_parse_result->parse_type != QUERY_PARSE){\r
+ fprintf(stderr,"ERROR, file %s is not a query file.\n",input_file_names[i].c_str());\r
+ exit(1);\r
+ }\r
+\r
+ map<string, int> local_query_map;\r
+ vector<string> local_query_names;\r
+ vector<table_exp_t *> qlist = fta_parse_result->parse_tree_list->qlist;\r
+ for(p=0;p<qlist.size();++p){\r
+ table_exp_t *fta_parse_tree = qlist[p];\r
+ fta_parse_tree->set_visible(false); // assumed to not produce output\r
+ string imputed_qname = impute_query_name(fta_parse_tree, target_qname);\r
+ if(imputed_qname == target_qname)\r
+ imputed_qname = src_tbl;\r
+ if(local_query_map.count(imputed_qname)>0){\r
+ fprintf(stderr,"ERROR, multiple queries named %s in library file %s\n",imputed_qname.c_str(), qpathname.c_str());\r
+ exit(1);\r
+ }\r
+ local_query_map[ imputed_qname ] = p;\r
+ local_query_names.push_back(imputed_qname);\r
+ }\r
+\r
+ if(local_query_map.count(src_tbl)==0){\r
+ fprintf(stderr,"ERROR, library query file %s has no query named %s\n",qpathname.c_str(),target_qname.c_str());\r
+ exit(1);\r
+ }\r
+\r
+ vector<int> worklist;\r
+ set<int> added_queries;\r
+ vector<query_node *> new_qnodes;\r
+ worklist.push_back(local_query_map[target_qname]);\r
+ added_queries.insert(local_query_map[target_qname]);\r
+ int qq;\r
+ int qpos = qnodes.size();\r
+ for(qq=0;qq<worklist.size();++qq){\r
+ int q_id = worklist[qq];\r
+ query_node *new_qnode = new query_node(qpos+qq, local_query_names[q_id], qpathname, qlist[q_id] );\r
+ new_qnodes.push_back( new_qnode);\r
+ vector<string> refd_tbls = new_qnode->refd_tbls;\r
+ int ff;\r
+ for(ff = 0;ff<refd_tbls.size();++ff){\r
+ if(local_query_map.count(refd_tbls[ff])>0 && added_queries.count(local_query_map[refd_tbls[ff]])==0){\r
+\r
+ if(name_node_map.count(refd_tbls[ff])>0){\r
+ fprintf(stderr,"ERROR, query %s occurs both in the regular query set, file %s, and in library file %s\n",refd_tbls[ff].c_str(), qname_to_flname[refd_tbls[ff]].c_str(), qpathname.c_str() );\r
+ exit(1);\r
+ }else{\r
+ worklist.push_back(local_query_map[refd_tbls[ff]]);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ for(qq=0;qq<new_qnodes.size();++qq){\r
+ int qpos = qnodes.size();\r
+ qnodes.push_back(new_qnodes[qq]);\r
+ name_node_map[qnodes[qpos]->name ] = qpos;\r
+ qname_to_flname[qnodes[qpos]->name ] = qpathname;\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+\r
+\r
+\r
+\r
+\r
+\r
+//---------------------------------------\r
+\r
+\r
+// Add the UDOPS.\r
+\r
+ string udop_missing_sources;\r
+ for(i=0;i<qnodes.size();++i){\r
+ int fi;\r
+ for(fi = 0;fi<qnodes[i]->refd_tbls.size();++fi){\r
+ int sid = Schema->find_tbl(qnodes[i]->refd_tbls[fi]);\r
+ if(sid >= 0){\r
+ if(Schema->get_schema_type(sid) == OPERATOR_VIEW_SCHEMA){\r
+ if(name_node_map.count(qnodes[i]->refd_tbls[fi]) == 0){\r
+ int pos = qnodes.size();\r
+ qnodes.push_back( new query_node(pos, qnodes[i]->refd_tbls[fi], Schema));\r
+ name_node_map[ qnodes[pos]->name ] = pos;\r
+ qnodes[pos]->is_externally_visible = false; // its visible\r
+ // Need to mark the source queries as visible.\r
+ int si;\r
+ string missing_sources = "";\r
+ for(si=0;si<qnodes[pos]->refd_tbls.size();++si){\r
+ string src_tbl = qnodes[pos]->refd_tbls[si];\r
+ if(name_node_map.count(src_tbl)==0){\r
+ missing_sources += src_tbl + " ";\r
+ }\r
+ }\r
+ if(missing_sources != ""){\r
+ udop_missing_sources += "\tUDOP "+qnodes[pos]->name+" references undefined tables "+missing_sources+"\n";\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+ if(udop_missing_sources != ""){\r
+ fprintf(stderr,"ERROR, User Defined OPerators reference source tables that are not part of the query set:\n%s",udop_missing_sources.c_str());\r
+ exit(1);\r
+ }\r
+\r
+\r
+\r
+////////////////////////////////////////////////////////////////////\r
+/// Check parse trees to verify that some\r
+/// global properties are met :\r
+/// if q1 reads from q2, then\r
+/// q2 is processed before q1\r
+/// q1 can supply q2's parameters\r
+/// Verify there is no cycle in the reads-from graph.\r
+\r
+// Compute an order in which to process the\r
+// queries.\r
+\r
+// Start by building the reads-from lists.\r
+//\r
+\r
+ for(i=0;i<qnodes.size();++i){\r
+ int qi, fi;\r
+ vector<string> refd_tbls = qnodes[i]->refd_tbls;\r
+ for(fi = 0;fi<refd_tbls.size();++fi){\r
+ if(name_node_map.count(refd_tbls[fi])>0){\r
+//printf("query %d (%s) reads from %s (%d)\n", i, qnodes[i]->name.c_str(),refd_tbls[fi].c_str(),name_node_map[refd_tbls[fi]]);\r
+ (qnodes[i]->reads_from).insert(name_node_map[refd_tbls[fi]]);\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+// If one query reads the result of another,\r
+// check for parameter compatibility. Currently it must\r
+// be an exact match. I will move to requiring\r
+// containment after re-ordering, but will require\r
+// some analysis for code generation which is not\r
+// yet in place.\r
+//printf("There are %d query nodes.\n",qnodes.size());\r
+\r
+\r
+ for(i=0;i<qnodes.size();++i){\r
+ vector<var_pair_t *> target_params = qnodes[i]->params;\r
+ for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){\r
+ vector<var_pair_t *> source_params = qnodes[(*si)]->params;\r
+ if(target_params.size() != source_params.size()){\r
+ fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());\r
+ exit(1);\r
+ }\r
+ int p;\r
+ for(p=0;p<target_params.size();++p){\r
+ if(! (target_params[p]->name == source_params[p]->name &&\r
+ target_params[p]->val == source_params[p]->val ) ){\r
+ fprintf(stderr,"ERROR, query %s (in file %s) reads from %s (in file %s), but they do not have identical parameters.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), qnodes[(*si)]->name.c_str(), qnodes[(*si)]->file.c_str());\r
+ exit(1);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+// Find the roots.\r
+// Start by counting inedges.\r
+ for(i=0;i<qnodes.size();++i){\r
+ for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){\r
+ qnodes[(*si)]->n_consumers++;\r
+ }\r
+ }\r
+\r
+// The roots are the nodes with indegree zero.\r
+ set<int> roots;\r
+ for(i=0;i<qnodes.size();++i){\r
+ if(qnodes[i]->n_consumers == 0){\r
+ if(qnodes[i]->is_externally_visible == false){\r
+ fprintf(stderr,"WARNING: query %s (file %s) is a root query but it isn't visible. Ignored.\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str());\r
+ }\r
+ roots.insert(i);\r
+ }\r
+ }\r
+\r
+// Remove the parts of the subtree that produce no output.\r
+ set<int> valid_roots;\r
+ set<int> discarded_nodes;\r
+ set<int> candidates;\r
+ while(roots.size() >0){\r
+ for(si=roots.begin();si!=roots.end();++si){\r
+ if(qnodes[(*si)]->is_externally_visible){\r
+ valid_roots.insert((*si));\r
+ }else{\r
+ discarded_nodes.insert((*si));\r
+ set<int>::iterator sir;\r
+ for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
+ qnodes[(*sir)]->n_consumers--;\r
+ if(qnodes[(*sir)]->n_consumers == 0)\r
+ candidates.insert( (*sir));\r
+ }\r
+ }\r
+ }\r
+ roots = candidates;\r
+ candidates.clear();\r
+ }\r
+ roots = valid_roots;\r
+ if(discarded_nodes.size()>0){\r
+ fprintf(stderr,"Warning, the following queries were discarded because they produce no output:\n");\r
+ int di = 0;\r
+ for(si=discarded_nodes.begin();si!=discarded_nodes.end();++si){\r
+ if(di>0 && (di%8)==0) fprintf(stderr,"\n");\r
+ di++;\r
+ fprintf(stderr," %s",qnodes[(*si)]->name.c_str());\r
+ }\r
+ fprintf(stderr,"\n");\r
+ }\r
+\r
+// Compute the sources_to set, ignoring discarded nodes.\r
+ for(i=0;i<qnodes.size();++i){\r
+ if(discarded_nodes.count(i)==0)\r
+ for(si=qnodes[i]->reads_from.begin();si!=qnodes[i]->reads_from.end();++si){\r
+ qnodes[(*si)]->sources_to.insert(i);\r
+ }\r
+ }\r
+\r
+\r
+// Find the nodes that are shared by multiple visible subtrees.\r
+// The roots become inferred visible nodes.\r
+\r
+// Find the visible nodes.\r
+ vector<int> visible_nodes;\r
+ for(i=0;i<qnodes.size();i++){\r
+ if(qnodes[i]->is_externally_visible){\r
+ visible_nodes.push_back(i);\r
+ }\r
+ }\r
+\r
+// Find UDOPs referenced by visible nodes.\r
+ list<int> workq;\r
+ for(i=0;i<visible_nodes.size();++i){\r
+ workq.push_back(visible_nodes[i]);\r
+ }\r
+ while(!workq.empty()){\r
+ int node = workq.front();\r
+ workq.pop_front();\r
+ set<int>::iterator children;\r
+ if(qnodes[node]->is_udop && qnodes[node]->is_externally_visible == false){\r
+ qnodes[node]->is_externally_visible = true;\r
+ visible_nodes.push_back(node);\r
+ for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){\r
+ if(qnodes[(*children)]->is_externally_visible == false){\r
+ qnodes[(*children)]->is_externally_visible = true;\r
+ visible_nodes.push_back((*children));\r
+ }\r
+ }\r
+ }\r
+ for(children=qnodes[node]->reads_from.begin();children!=qnodes[node]->reads_from.end();++children){\r
+ workq.push_back((*children));\r
+ }\r
+ }\r
+\r
+ bool done = false;\r
+ while(!done){\r
+// reset the nodes\r
+ for(i=0;i<qnodes.size();i++){\r
+ qnodes[i]->subtree_roots.clear();\r
+ }\r
+\r
+// Walk the tree defined by a visible node, not descending into\r
+// subtrees rooted by a visible node. Mark the node visited with\r
+// the visible node ID.\r
+ for(i=0;i<visible_nodes.size();++i){\r
+ set<int> vroots;\r
+ vroots.insert(visible_nodes[i]);\r
+ while(vroots.size()>0){\r
+ for(si=vroots.begin();si!=vroots.end();++si){\r
+ qnodes[(*si)]->subtree_roots.insert(visible_nodes[i]);\r
+\r
+ set<int>::iterator sir;\r
+ for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
+ if(! qnodes[(*sir)]->is_externally_visible){\r
+ candidates.insert( (*sir));\r
+ }\r
+ }\r
+ }\r
+ vroots = candidates;\r
+ candidates.clear();\r
+ }\r
+ }\r
+// Find the nodes in multiple visible node subtrees, but with no parent\r
+// that is in multiple visible node subtrees. Mark these as inferred visible nodes.\r
+ done = true; // until proven otherwise\r
+ for(i=0;i<qnodes.size();i++){\r
+ if(qnodes[i]->subtree_roots.size()>1){\r
+ bool is_new_root = true;\r
+ set<int>::iterator sir;\r
+ for(sir=qnodes[i]->sources_to.begin(); sir!=qnodes[i]->sources_to.end();++sir){\r
+ if(qnodes[(*sir)]->subtree_roots.size()>1)\r
+ is_new_root = false;\r
+ }\r
+ if(is_new_root){\r
+ qnodes[i]->is_externally_visible = true;\r
+ qnodes[i]->inferred_visible_node = true;\r
+ visible_nodes.push_back(i);\r
+ done = false;\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+\r
+\r
+\r
+// get visible nodes in topo ordering.\r
+// for(i=0;i<qnodes.size();i++){\r
+// qnodes[i]->n_consumers = qnodes[i]->sources_to.size();\r
+// }\r
+ vector<int> process_order;\r
+ while(roots.size() >0){\r
+ for(si=roots.begin();si!=roots.end();++si){\r
+ if(discarded_nodes.count((*si))==0){\r
+ process_order.push_back( (*si) );\r
+ }\r
+ set<int>::iterator sir;\r
+ for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
+ qnodes[(*sir)]->n_consumers--;\r
+ if(qnodes[(*sir)]->n_consumers == 0)\r
+ candidates.insert( (*sir));\r
+ }\r
+ }\r
+ roots = candidates;\r
+ candidates.clear();\r
+ }\r
+\r
+\r
+//printf("process_order.size() =%d\n",process_order.size());\r
+\r
+// Search for cyclic dependencies\r
+ string found_dep;\r
+ for(i=0;i<qnodes.size();++i){\r
+ if(discarded_nodes.count(i)==0 && qnodes[i]->n_consumers > 0){\r
+ if(found_dep.size() != 0) found_dep += ", ";\r
+ found_dep += "query "+qnodes[i]->name+" (file "+qnodes[i]->file+")";\r
+ }\r
+ }\r
+ if(found_dep.size()>0){\r
+ fprintf(stderr,"ERROR, the following queries contain a cyclic reads-from dependency:\n%s\n",found_dep.c_str());\r
+ exit(1);\r
+ }\r
+\r
+// Get a list of query sets, in the order to be processed.\r
+// Start at visible root and do bfs.\r
+// The query set includes queries referenced indirectly,\r
+// as sources for user-defined operators. These are needed\r
+// to ensure that they are added to the schema, but are not part\r
+// of the query tree.\r
+\r
+// stream_node_sets contains queries reachable only through the\r
+// FROM clause, so I can tell which queries to add to the stream\r
+// query. (DISABLED, UDOPS are integrated, does this cause problems?)\r
+\r
+// NOTE: this code works because in order for data to be\r
+// read by multiple hftas, the node must be externally visible.\r
+// But visible nodes define roots of process sets.\r
+// internally visible nodes can feed data only\r
+// to other nodes in the same query file.\r
+// Therefore, any access can be restricted to a file,\r
+// hfta output sharing is done only on roots\r
+// never on interior nodes.\r
+\r
+\r
+\r
+\r
+// Compute the base collection of hftas.\r
+ vector<hfta_node *> hfta_sets;\r
+ map<string, int> hfta_name_map;\r
+// vector< vector<int> > process_sets;\r
+// vector< set<int> > stream_node_sets;\r
+ reverse(process_order.begin(), process_order.end()); // get listing in reverse order.\r
+ // i.e. process leaves 1st.\r
+ for(i=0;i<process_order.size();++i){\r
+ if(qnodes[process_order[i]]->is_externally_visible == true){\r
+//printf("Visible.\n");\r
+ int root = process_order[i];\r
+ hfta_node *hnode = new hfta_node();\r
+ hnode->name = qnodes[root]-> name;\r
+ hnode->source_name = qnodes[root]-> name;\r
+ hnode->is_udop = qnodes[root]->is_udop;\r
+ hnode->inferred_visible_node = qnodes[root]->inferred_visible_node;\r
+\r
+ vector<int> proc_list; proc_list.push_back(root);\r
+// Ensure that nodes are added only once.\r
+ set<int> proc_set; proc_set.insert(root);\r
+ roots.clear(); roots.insert(root);\r
+ candidates.clear();\r
+ while(roots.size()>0){\r
+ for(si=roots.begin();si!=roots.end();++si){\r
+//printf("Processing root %d\n",(*si));\r
+ set<int>::iterator sir;\r
+ for(sir=qnodes[(*si)]->reads_from.begin(); sir!=qnodes[(*si)]->reads_from.end();++sir){\r
+//printf("reads fom %d\n",(*sir));\r
+ if(qnodes[(*sir)]->is_externally_visible==false){\r
+ candidates.insert( (*sir) );\r
+ if(proc_set.count( (*sir) )==0){\r
+ proc_set.insert( (*sir) );\r
+ proc_list.push_back( (*sir) );\r
+ }\r
+ }\r
+ }\r
+ }\r
+ roots = candidates;\r
+ candidates.clear();\r
+ }\r
+\r
+ reverse(proc_list.begin(), proc_list.end());\r
+ hnode->query_node_indices = proc_list;\r
+ hfta_name_map[hnode->name] = hfta_sets.size();\r
+ hfta_sets.push_back(hnode);\r
+ }\r
+ }\r
+\r
+// Compute the reads_from / sources_to graphs for the hftas.\r
+\r
+ for(i=0;i<hfta_sets.size();++i){\r
+ hfta_node *hnode = hfta_sets[i];\r
+ for(q=0;q<hnode->query_node_indices.size();q++){\r
+ query_node *qnode = qnodes[hnode->query_node_indices[q]];\r
+ for(s=0;s<qnode->refd_tbls.size();++s){\r
+ if(hfta_name_map.count(qnode->refd_tbls[s])){\r
+ int other_hfta = hfta_name_map[qnode->refd_tbls[s]];\r
+ hnode->reads_from.insert(other_hfta);\r
+ hfta_sets[other_hfta]->sources_to.insert(i);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+// Compute a topological sort of the hfta_sets.\r
+\r
+ vector<int> hfta_topsort;\r
+ workq.clear();\r
+ int hnode_srcs[hfta_sets.size()];\r
+ for(i=0;i<hfta_sets.size();++i){\r
+ hnode_srcs[i] = 0;\r
+ if(hfta_sets[i]->sources_to.size() == 0)\r
+ workq.push_back(i);\r
+ }\r
+\r
+ while(! workq.empty()){\r
+ int node = workq.front();\r
+ workq.pop_front();\r
+ hfta_topsort.push_back(node);\r
+ set<int>::iterator stsi;\r
+ for(stsi=hfta_sets[node]->reads_from.begin();stsi!=hfta_sets[node]->reads_from.end();++stsi){\r
+ int parent = (*stsi);\r
+ hnode_srcs[parent]++;\r
+ if(hnode_srcs[parent] == hfta_sets[parent]->sources_to.size()){\r
+ workq.push_back(parent);\r
+ }\r
+ }\r
+ }\r
+\r
+// Decorate hfta nodes with the level of parallelism given as input.\r
+\r
+ map<string, int>::iterator msii;\r
+ for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){\r
+ string hfta_name = (*msii).first;\r
+ int par = (*msii).second;\r
+ if(hfta_name_map.count(hfta_name) > 0){\r
+ hfta_sets[ hfta_name_map[hfta_name] ]->n_parallel = par;\r
+ }else{\r
+ fprintf(stderr,"Warning, hfta_parallelism.cfg has an entry for %s, but its not a hfta.\n",hfta_name.c_str());\r
+ }\r
+ }\r
+\r
+// Propagate levels of parallelism: children should have a level of parallelism\r
+// as large as any of its parents. Adjust children upwards to compensate.\r
+// Start at parents and adjust children, auto-propagation will occur.\r
+\r
+ for(i=hfta_sets.size()-1;i>=0;i--){\r
+ set<int>::iterator stsi;\r
+ for(stsi=hfta_sets[i]->reads_from.begin();stsi!=hfta_sets[i]->reads_from.end();++stsi){\r
+ if(hfta_sets[i]->n_parallel > hfta_sets[ (*stsi) ]->n_parallel){\r
+ hfta_sets[ (*stsi) ]->n_parallel = hfta_sets[i]->n_parallel;\r
+ }\r
+ }\r
+ }\r
+\r
+// Before all the name mangling, check if there are any output_spec.cfg\r
+// or hfta_parallelism.cfg entries that do not have a matching query.\r
+\r
+ string dangling_ospecs = "";\r
+ for(msii=qname_to_ospec.begin();msii!=qname_to_ospec.end();++msii){\r
+ string oq = (*msii).first;\r
+ if(hfta_name_map.count(oq) == 0){\r
+ dangling_ospecs += " "+(*msii).first;\r
+ }\r
+ }\r
+ if(dangling_ospecs!=""){\r
+ fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());\r
+ }\r
+\r
+ string dangling_par = "";\r
+ for(msii=hfta_parallelism.begin();msii!=hfta_parallelism.end();++msii){\r
+ string oq = (*msii).first;\r
+ if(hfta_name_map.count(oq) == 0){\r
+ dangling_par += " "+(*msii).first;\r
+ }\r
+ }\r
+ if(dangling_par!=""){\r
+ fprintf(stderr,"WARNING, the following entries in hfta_parallelism.cfg don't have a matching query: %s\n",dangling_par.c_str());\r
+ }\r
+\r
+\r
+\r
+// Replicate parallelized hftas. Do __copyX name mangling. Adjust\r
+// FROM clauses: retarget any name which is an internal node, and\r
+// any which is in hfta_sets (and which is parallelized). Add Merge nodes\r
+// when the source hfta has more parallelism than the target node.\r
+// Add new nodes to qnodes and hfta_nodes with the appropriate linkages.\r
+\r
+\r
+ int n_original_hfta_sets = hfta_sets.size();\r
+ for(i=0;i<n_original_hfta_sets;++i){\r
+ if(hfta_sets[i]->n_parallel > 1){\r
+ hfta_sets[i]->do_generation =false; // set the deletion flag for this entry.\r
+ set<string> local_nodes; // names of query nodes in the hfta.\r
+ for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){\r
+ local_nodes.insert(qnodes[ hfta_sets[i]->query_node_indices[h] ]->name);\r
+ }\r
+\r
+ for(p=0;p<hfta_sets[i]->n_parallel;++p){\r
+ string mangler = "__copy"+int_to_string(p);\r
+ hfta_node *par_hfta = new hfta_node();\r
+ par_hfta->name = hfta_sets[i]->name + mangler;\r
+ par_hfta->source_name = hfta_sets[i]->name;\r
+ par_hfta->is_udop = hfta_sets[i]->is_udop;\r
+ par_hfta->inferred_visible_node = hfta_sets[i]->inferred_visible_node;\r
+ par_hfta->n_parallel = hfta_sets[i]->n_parallel;\r
+ par_hfta->parallel_idx = p;\r
+\r
+ map<string, int> par_qnode_map; // qnode name-to-idx, aids dependency tracking.\r
+\r
+// Is it a UDOP?\r
+ if(hfta_sets[i]->is_udop){\r
+ int root = hfta_sets[i]->query_node_indices[0];\r
+\r
+ string unequal_par_sources;\r
+ set<int>::iterator rfsii;\r
+ for(rfsii=hfta_sets[i]->reads_from.begin();rfsii!=hfta_sets[i]->reads_from.end();++rfsii){\r
+ if(hfta_sets[(*rfsii)]->n_parallel != par_hfta->n_parallel){\r
+ unequal_par_sources = hfta_sets[(*rfsii)]->name+" ("+int_to_string(hfta_sets[(*rfsii)]->n_parallel)+") ";\r
+ }\r
+ }\r
+ if(unequal_par_sources != ""){\r
+ fprintf(stderr,"ERROR, UDOP %s has parallelism %d, but some of its sources have a different parallelism: %s\n",hfta_sets[i]->name.c_str(), hfta_sets[i]->n_parallel, unequal_par_sources.c_str());\r
+ exit(1);\r
+ }\r
+\r
+ int rti;\r
+ vector<string> new_sources;\r
+ for(rti=0;rti<qnodes[root]->refd_tbls.size();++rti){\r
+ new_sources.push_back(qnodes[root]->refd_tbls[rti]+mangler);\r
+ }\r
+\r
+ query_node *new_qn = new query_node(qnodes.size(), qnodes[root]->name, Schema);\r
+ new_qn->name += mangler;\r
+ new_qn->mangler = mangler;\r
+ new_qn->refd_tbls = new_sources;\r
+ par_hfta->query_node_indices.push_back(qnodes.size());\r
+ par_qnode_map[new_qn->name] = qnodes.size();\r
+ name_node_map[ new_qn->name ] = qnodes.size();\r
+ qnodes.push_back(new_qn);\r
+ }else{\r
+// regular query node\r
+ for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){\r
+ int hqn_idx = hfta_sets[i]->query_node_indices[h];\r
+ table_exp_t *dup_pt = dup_table_exp(qnodes[hqn_idx]->parse_tree);\r
+// rehome the from clause on mangled names.\r
+// create merge nodes as needed for external sources.\r
+ for(f=0;f<dup_pt->fm->tlist.size();++f){\r
+ if(local_nodes.count(dup_pt->fm->tlist[f]->schema_name)>0){\r
+ dup_pt->fm->tlist[f]->schema_name += mangler;\r
+ }else if(hfta_name_map.count(dup_pt->fm->tlist[f]->schema_name)>0){\r
+// Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.\r
+ int other_hidx = hfta_name_map[dup_pt->fm->tlist[f]->schema_name];\r
+ if(par_hfta->n_parallel == hfta_sets[other_hidx]->n_parallel){\r
+ dup_pt->fm->tlist[f]->schema_name += mangler;\r
+ }else{\r
+ vector<string> src_tbls;\r
+ int stride = hfta_sets[other_hidx]->n_parallel / par_hfta->n_parallel;\r
+ if(stride == 0){\r
+ fprintf(stderr,"INTERNAL ERROR, parent hfta %s has a larger parallelism than its child %s\n",par_hfta->name.c_str(), hfta_sets[other_hidx]->name.c_str());\r
+ exit(1);\r
+ }\r
+ for(s=0;s<stride;++s){\r
+ string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s+stride*p);\r
+ src_tbls.push_back(ext_src_name);\r
+ }\r
+ table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);\r
+ string merge_node_name = qnodes[hqn_idx]->name+"__copy"+int_to_string(p)+"__merge"+int_to_string(f);\r
+ dup_pt->fm->tlist[f]->schema_name = merge_node_name;\r
+// Make a qnode to represent the new merge node\r
+ query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);\r
+ qn_pt->refd_tbls = src_tbls;\r
+ qn_pt->is_udop = false;\r
+ qn_pt->is_externally_visible = false;\r
+ qn_pt->inferred_visible_node = false;\r
+ par_hfta->query_node_indices.push_back(qnodes.size());\r
+ par_qnode_map[merge_node_name] = qnodes.size();\r
+ name_node_map[ merge_node_name ] = qnodes.size();\r
+ qnodes.push_back(qn_pt);\r
+ }\r
+ }\r
+ }\r
+ query_node *new_qn = new query_node(qnodes.size(),qnodes[hqn_idx]->name+"__copy"+int_to_string(p),qnodes[hqn_idx]->file,dup_pt);\r
+ for(f=0;f<dup_pt->fm->tlist.size();++f){\r
+ new_qn->refd_tbls.push_back(dup_pt->fm->tlist[f]->schema_name);\r
+ }\r
+ new_qn->params = qnodes[hqn_idx]->params;\r
+ new_qn->is_udop = false;\r
+ new_qn->is_externally_visible = qnodes[hqn_idx]->is_externally_visible;\r
+ new_qn->inferred_visible_node = qnodes[hqn_idx]->inferred_visible_node;\r
+ par_hfta->query_node_indices.insert(par_hfta->query_node_indices.begin(),qnodes.size());\r
+ par_qnode_map[new_qn->name] = qnodes.size();\r
+ name_node_map[ new_qn->name ] = qnodes.size();\r
+ qnodes.push_back(new_qn);\r
+ }\r
+ }\r
+ hfta_name_map[par_hfta->name] = hfta_sets.size();\r
+ hfta_sets.push_back(par_hfta);\r
+ }\r
+ }else{\r
+// This hfta isn't being parallelized, but add merge nodes for any parallelized\r
+// hfta sources.\r
+ if(!hfta_sets[i]->is_udop){\r
+ for(h=0;h<hfta_sets[i]->query_node_indices.size();++h){\r
+ int hqn_idx = hfta_sets[i]->query_node_indices[h];\r
+ for(f=0;f<qnodes[hqn_idx]->parse_tree->fm->tlist.size();++f){\r
+ if(hfta_name_map.count(qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name)>0){\r
+// Ref's an external HFTA. No parallelism => leave as is. Else, if level of parallelism of the two hftas is the same, rename by the mangler. Else, there must be more sources, so create a merge node.\r
+ int other_hidx = hfta_name_map[qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name];\r
+ if(hfta_sets[i]->n_parallel != hfta_sets[other_hidx]->n_parallel){\r
+ vector<string> src_tbls;\r
+ for(s=0;s<hfta_sets[other_hidx]->n_parallel;++s){\r
+ string ext_src_name = hfta_sets[other_hidx]->name+"__copy"+int_to_string(s);\r
+ src_tbls.push_back(ext_src_name);\r
+ }\r
+ table_exp_t *merge_pt = table_exp_t::make_deferred_merge(src_tbls);\r
+ string merge_node_name = qnodes[hqn_idx]->name+"__merge"+int_to_string(f);\r
+ qnodes[hqn_idx]->parse_tree->fm->tlist[f]->schema_name = merge_node_name;\r
+// Make a qnode to represent the new merge node\r
+ query_node *qn_pt = new query_node(qnodes.size(),merge_node_name, qnodes[hqn_idx]->file,merge_pt);\r
+ qn_pt->refd_tbls = src_tbls;\r
+ qn_pt->is_udop = false;\r
+ qn_pt->is_externally_visible = false;\r
+ qn_pt->inferred_visible_node = false;\r
+ hfta_sets[i]->query_node_indices.insert(hfta_sets[i]->query_node_indices.begin(),qnodes.size());\r
+ name_node_map[ merge_node_name ] = qnodes.size();\r
+ qnodes.push_back(qn_pt);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+// Rebuild the reads_from / sources_to lists in the qnodes\r
+ for(q=0;q<qnodes.size();++q){\r
+ qnodes[q]->reads_from.clear();\r
+ qnodes[q]->sources_to.clear();\r
+ }\r
+ for(q=0;q<qnodes.size();++q){\r
+ for(s=0;s<qnodes[q]->refd_tbls.size();++s){\r
+ if(name_node_map.count(qnodes[q]->refd_tbls[s])>0){\r
+ int rf = name_node_map[qnodes[q]->refd_tbls[s]];\r
+ qnodes[q]->reads_from.insert(rf);\r
+ qnodes[rf]->sources_to.insert(q);\r
+ }\r
+ }\r
+ }\r
+\r
+// Rebuild the reads_from / sources_to lists in hfta_sets\r
+ for(q=0;q<hfta_sets.size();++q){\r
+ hfta_sets[q]->reads_from.clear();\r
+ hfta_sets[q]->sources_to.clear();\r
+ }\r
+ for(q=0;q<hfta_sets.size();++q){\r
+ for(s=0;s<hfta_sets[q]->query_node_indices.size();++s){\r
+ int node = hfta_sets[q]->query_node_indices[s];\r
+ set<int>::iterator rfsii;\r
+ for(rfsii=qnodes[node]->reads_from.begin();rfsii!=qnodes[node]->reads_from.end();++rfsii){\r
+ if(hfta_name_map.count(qnodes[(*rfsii)]->name)>0){\r
+ hfta_sets[q]->reads_from.insert(hfta_name_map[qnodes[(*rfsii)]->name]);\r
+ hfta_sets[hfta_name_map[qnodes[(*rfsii)]->name]]->sources_to.insert(q);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+/*\r
+for(q=0;q<qnodes.size();++q){\r
+ printf("qnode %d reads-from %d:",q,qnodes[q]->reads_from.size());\r
+ set<int>::iterator rsii;\r
+ for(rsii=qnodes[q]->reads_from.begin();rsii!=qnodes[q]->reads_from.end();++rsii)\r
+ printf(" %d",(*rsii));\r
+ printf(", and sources-to %d:",qnodes[q]->sources_to.size());\r
+ for(rsii=qnodes[q]->sources_to.begin();rsii!=qnodes[q]->sources_to.end();++rsii)\r
+ printf(" %d",(*rsii));\r
+ printf("\n");\r
+}\r
+\r
+for(q=0;q<hfta_sets.size();++q){\r
+ if(hfta_sets[q]->do_generation==false)\r
+ continue;\r
+ printf("hfta %d (%s) reads-from %d:",q,hfta_sets[q]->name.c_str(),hfta_sets[q]->reads_from.size());\r
+ set<int>::iterator rsii;\r
+ for(rsii=hfta_sets[q]->reads_from.begin();rsii!=hfta_sets[q]->reads_from.end();++rsii)\r
+ printf(" %d",(*rsii));\r
+ printf(", and sources-to %d:",hfta_sets[q]->sources_to.size());\r
+ for(rsii=hfta_sets[q]->sources_to.begin();rsii!=hfta_sets[q]->sources_to.end();++rsii)\r
+ printf(" %d",(*rsii));\r
+ printf("\n");\r
+}\r
+*/\r
+\r
+\r
+\r
+// Re-topo sort the hftas\r
+ hfta_topsort.clear();\r
+ workq.clear();\r
+ int hnode_srcs_2[hfta_sets.size()];\r
+ for(i=0;i<hfta_sets.size();++i){\r
+ hnode_srcs_2[i] = 0;\r
+ if(hfta_sets[i]->sources_to.size() == 0 && hfta_sets[i]->do_generation){\r
+ workq.push_back(i);\r
+ }\r
+ }\r
+\r
+ while(workq.empty() == false){\r
+ int node = workq.front();\r
+ workq.pop_front();\r
+ hfta_topsort.push_back(node);\r
+ set<int>::iterator stsii;\r
+ for(stsii=hfta_sets[node]->reads_from.begin();stsii!=hfta_sets[node]->reads_from.end();++stsii){\r
+ int child = (*stsii);\r
+ hnode_srcs_2[child]++;\r
+ if(hnode_srcs_2[child] == hfta_sets[child]->sources_to.size()){\r
+ workq.push_back(child);\r
+ }\r
+ }\r
+ }\r
+\r
+// Ensure that all of the query_node_indices in hfta_sets are topologically\r
+// sorted, don't rely on assumptions that all transforms maintain some kind of order.\r
+ for(i=0;i<hfta_sets.size();++i){\r
+ if(hfta_sets[i]->do_generation){\r
+ map<int,int> n_accounted;\r
+ vector<int> new_order;\r
+ workq.clear();\r
+ vector<int>::iterator vii;\r
+ for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){\r
+ n_accounted[(*vii)]= 0;\r
+ }\r
+ for(vii=hfta_sets[i]->query_node_indices.begin();vii!=hfta_sets[i]->query_node_indices.end();++vii){\r
+ set<int>::iterator rfsii;\r
+ for(rfsii=qnodes[(*vii)]->reads_from.begin();rfsii!=qnodes[(*vii)]->reads_from.end();++rfsii){\r
+ if(n_accounted.count((*rfsii)) == 0){\r
+ n_accounted[(*vii)]++;\r
+ }\r
+ }\r
+ if(n_accounted[(*vii)] == qnodes[(*vii)]->reads_from.size()){\r
+ workq.push_back((*vii));\r
+ }\r
+ }\r
+\r
+ while(workq.empty() == false){\r
+ int node = workq.front();\r
+ workq.pop_front();\r
+ new_order.push_back(node);\r
+ set<int>::iterator stsii;\r
+ for(stsii=qnodes[node]->sources_to.begin();stsii!=qnodes[node]->sources_to.end();++stsii){\r
+ if(n_accounted.count((*stsii))){\r
+ n_accounted[(*stsii)]++;\r
+ if(n_accounted[(*stsii)] == qnodes[(*stsii)]->reads_from.size()){\r
+ workq.push_back((*stsii));\r
+ }\r
+ }\r
+ }\r
+ }\r
+ hfta_sets[i]->query_node_indices = new_order;\r
+ }\r
+ }\r
+\r
+\r
+\r
+\r
+\r
+/// Global checking is done, start the analysis and translation\r
+/// of the query parse tree in the order specified by process_order\r
+\r
+\r
+// Get a list of the LFTAs for global lfta optimization\r
+// TODO: separate building operators from splitting lftas,\r
+// that will make optimizations such as predicate pushing easier.\r
+ vector<stream_query *> lfta_list;\r
+\r
+ stream_query *rootq;\r
+\r
+ int qi,qj;\r
+\r
+ for(qi=hfta_topsort.size()-1;qi>=0;--qi){\r
+\r
+ int hfta_id = hfta_topsort[qi];\r
+ vector<int> curr_list = hfta_sets[hfta_id]->query_node_indices;\r
+\r
+\r
+\r
+// Two possibilities, either its a UDOP, or its a collection of queries.\r
+// if(qnodes[curr_list.back()]->is_udop)\r
+ if(hfta_sets[hfta_id]->is_udop){\r
+ int node_id = curr_list.back();\r
+ int udop_schref = Schema->find_tbl(qnodes[node_id]->file);\r
+ opview_entry *opv = new opview_entry();\r
+\r
+// Many of the UDOP properties aren't currently used.\r
+ opv->parent_qname = "no_parent";\r
+ opv->root_name = qnodes[node_id]->name;\r
+ opv->view_name = qnodes[node_id]->file;\r
+ opv->pos = qi;\r
+ sprintf(tmpstr,"%s_UDOP%d_%s",qnodes[node_id]->name.c_str(),qi,opv->view_name.c_str());\r
+ opv->udop_alias = tmpstr;\r
+ opv->mangler = qnodes[node_id]->mangler;\r
+\r
+ if(opv->mangler != ""){\r
+ int new_udop_schref = Schema->add_duplicate_table(opv->view_name,opv->root_name);\r
+ Schema->mangle_subq_names(new_udop_schref,opv->mangler);\r
+ }\r
+\r
+// This piece of code makes each hfta which refers to the same udop\r
+// reference a distinct running udop. Do this at query optimization time?\r
+// fmtbl->set_udop_alias(opv->udop_alias);\r
+\r
+ opv->exec_fl = Schema->get_op_prop(udop_schref, string("file"));\r
+ opv->liveness_timeout = atoi(Schema->get_op_prop(udop_schref, string("liveness_timeout")).c_str());\r
+\r
+ vector<subquery_spec *> subq = Schema->get_subqueryspecs(udop_schref);\r
+ int s,f,q;\r
+ for(s=0;s<subq.size();++s){\r
+// Validate that the fields match.\r
+ subquery_spec *sqs = subq[s];\r
+ string subq_name = sqs->name + opv->mangler;\r
+ vector<field_entry *> flds = Schema->get_fields(subq_name);\r
+ if(flds.size() == 0){\r
+ fprintf(stderr,"INTERNAL ERROR: the subquery %s of view %s not found in Schema.\n",subq_name.c_str(), opv->view_name.c_str());\r
+ return(1);\r
+ }\r
+ if(flds.size() < sqs->types.size()){\r
+ fprintf(stderr,"ERROR: subquery %s of view %s does not have enough fields (%lu found, %lu expected).\n",subq_name.c_str(), opv->view_name.c_str(),flds.size(), sqs->types.size());\r
+ return(1);\r
+ }\r
+ bool failed = false;\r
+ for(f=0;f<sqs->types.size();++f){\r
+ data_type dte(sqs->types[f],sqs->modifiers[f]);\r
+ data_type dtf(flds[f]->get_type(),flds[f]->get_modifier_list());\r
+ if(! dte.subsumes_type(&dtf) ){\r
+ fprintf(stderr,"ERROR: subquery %s of view %s does not have the correct type for field %d (%s found, %s expected).\n",subq_name.c_str(), opv->view_name.c_str(),f,dtf.to_string().c_str(), dte.to_string().c_str());\r
+ failed = true;\r
+ }\r
+/*\r
+ if(dte.is_temporal() && (dte.get_temporal() != dtf.get_temporal()) ){\r
+ string pstr = dte.get_temporal_string();\r
+ fprintf(stderr,"ERROR: subquery %s of view %s does not have the expected temporal value %s of field %d.\n",sqs->name.c_str(), opv->view_name.c_str(),pstr.c_str(),f);\r
+ failed = true;\r
+ }\r
+*/\r
+ }\r
+ if(failed)\r
+ return(1);\r
+/// Validation done, find the subquery, make a copy of the\r
+/// parse tree, and add it to the return list.\r
+ for(q=0;q<qnodes.size();++q)\r
+ if(qnodes[q]->name == subq_name)\r
+ break;\r
+ if(q==qnodes.size()){\r
+ fprintf(stderr,"INTERNAL ERROR: subquery %s of view %s not found in list of query names.\n",subq_name.c_str(), opv->view_name.c_str());\r
+ return(1);\r
+ }\r
+\r
+ }\r
+\r
+// Cross-link to from entry(s) in all sourced-to tables.\r
+ set<int>::iterator sii;\r
+ for(sii=qnodes[curr_list.back()]->sources_to.begin() ;sii!=qnodes[curr_list.back()]->sources_to.end();++sii){\r
+//printf("\tUDOP %s sources_to %d (%s)\n",hfta_sets[hfta_id]->name.c_str(),(*sii),hfta_sets[(*sii)]->name.c_str());\r
+ vector<tablevar_t *> tblvars = qnodes[(*sii)]->parse_tree->get_from()->get_table_list();\r
+ int ii;\r
+ for(ii=0;ii<tblvars.size();++ii){\r
+ if(tblvars[ii]->schema_name == opv->root_name){\r
+ tblvars[ii]->set_opview_idx(opviews.size());\r
+ }\r
+\r
+ }\r
+ }\r
+\r
+ opviews.append(opv);\r
+ }else{\r
+\r
+// Analyze the parse trees in this query,\r
+// put them in rootq\r
+// vector<int> curr_list = process_sets[qi];\r
+\r
+\r
+////////////////////////////////////////\r
+\r
+ rootq = NULL;\r
+//printf("Process set %d, has %d queries\n",qi,curr_list.size());\r
+ for(qj=0;qj<curr_list.size();++qj){\r
+ i = curr_list[qj];\r
+ fprintf(stderr,"Processing query %s (file %s) is_udop = %d\n",qnodes[i]->name.c_str(), qnodes[i]->file.c_str(),qnodes[i]->is_udop);\r
+\r
+// Select the current query parse tree\r
+ table_exp_t *fta_parse_tree = qnodes[i]->parse_tree;\r
+\r
+// if hfta only, try to fetch any missing schemas\r
+// from the registry (using the print_schema program).\r
+// Here I use a hack to avoid analyzing the query -- all referenced\r
+// tables must be in the from clause\r
+// If there is a problem loading any table, just issue a warning,\r
+//\r
+ tablevar_list_t *fm = fta_parse_tree->get_from();\r
+ vector<string> refd_tbls = fm->get_src_tbls(Schema);\r
+// iterate over all referenced tables\r
+ int t;\r
+ for(t=0;t<refd_tbls.size();++t){\r
+ int tbl_ref = Schema->get_table_ref(refd_tbls[t]);\r
+\r
+ if(tbl_ref < 0){ // if this table is not in the Schema\r
+\r
+ if(hfta_only){\r
+ string cmd="print_schema "+refd_tbls[t];\r
+ FILE *schema_in = popen(cmd.c_str(), "r");\r
+ if(schema_in == NULL){\r
+ fprintf(stderr,"ERROR: cannot execute command %s\n",cmd.c_str());\r
+ }else{\r
+ string schema_instr;\r
+ while(fgets(tmpstr,TMPSTRLEN,schema_in)){\r
+ schema_instr += tmpstr;\r
+ }\r
+ fta_parse_result = new fta_parse_t();\r
+ strcpy(tmp_schema_str,schema_instr.c_str());\r
+ FtaParser_setstringinput(tmp_schema_str);\r
+ if(FtaParserparse()){\r
+ fprintf(stderr,"FTA parse failed on result from command %s.\n",cmd.c_str());\r
+ }else{\r
+ if( fta_parse_result->tables != NULL){\r
+ int tl;\r
+ for(tl=0;tl<fta_parse_result->tables->size();++tl){\r
+ Schema->add_table(fta_parse_result->tables->get_table(tl));\r
+ }\r
+ }else{\r
+ fprintf(stderr,"ERROR command %s returned no tables.\n",cmd.c_str());\r
+ }\r
+ }\r
+ }\r
+ }else{\r
+ fprintf(stderr,"ERROR, query %s (file %s) references stream %s, which is neither a PROTOCOL nor an externally visible query stream.\n", qnodes[i]->name.c_str(), qnodes[i]->file.c_str(), refd_tbls[t].c_str());\r
+ exit(1);\r
+ }\r
+\r
+ }\r
+ }\r
+\r
+\r
+// Analyze the query.\r
+ query_summary_class *qs = analyze_fta(fta_parse_tree, Schema, Ext_fcns, qnodes[i]->name);\r
+ if(qs == NULL){\r
+ fprintf(stderr,"analyze_fta failed on query %s (file %s) \n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str());\r
+ exit(1);\r
+ }\r
+\r
+ stream_query new_sq(qs, Schema);\r
+ if(new_sq.error_code){\r
+ fprintf(stderr,"ERROR, can't create query node for query %s (file %s):\n%s\n",qnodes[i]->name.c_str(),qnodes[i]->file.c_str(),new_sq.err_str.c_str());\r
+ exit(1);\r
+ }\r
+\r
+// Add it to the Schema\r
+ table_def *output_td = new_sq.get_output_tabledef();\r
+ Schema->add_table(output_td);\r
+\r
+// Create a query plan from the analyzed parse tree.\r
+// If it's a query referenced via FROM, add it to the stream query.\r
+ if(rootq){\r
+ rootq->add_query(new_sq);\r
+ }else{\r
+ rootq = new stream_query(new_sq);\r
+// have the stream query object inherit properties from the analyzed\r
+// hfta_node object.\r
+ rootq->set_nparallel(hfta_sets[hfta_id]->n_parallel,hfta_sets[hfta_id]->parallel_idx);\r
+ rootq->n_successors = hfta_sets[hfta_id]->sources_to.size();\r
+ }\r
+\r
+\r
+ }\r
+\r
+// This stream query has all its parts\r
+// Build and optimize it.\r
+//printf("translate_fta: generating plan.\n");\r
+ if(rootq->generate_plan(Schema)){\r
+ fprintf(stderr,"INTERNAL ERROR: Can't generate query plan for query %s.\n",rootq->query_name.c_str());\r
+ continue;\r
+ }\r
+\r
+// If we've found the query plan head, so now add the output operators\r
+ if(qname_to_ospec.count(hfta_sets[hfta_id]->source_name)){\r
+ pair< multimap<string, int>::iterator, multimap<string, int>::iterator > oset;\r
+ multimap<string, int>::iterator mmsi;\r
+ oset = qname_to_ospec.equal_range(hfta_sets[hfta_id]->source_name);\r
+ for(mmsi=oset.first; mmsi!=oset.second; ++mmsi){\r
+ rootq->add_output_operator(output_specs[(*mmsi).second]);\r
+ }\r
+ }\r
+\r
+\r
+\r
+// Perform query splitting if necessary.\r
+ bool hfta_returned;\r
+ vector<stream_query *> split_queries = rootq->split_query(Ext_fcns, Schema, hfta_returned, ifaces_db, n_virtual_interfaces, hfta_sets[hfta_id]->n_parallel, hfta_sets[hfta_id]->parallel_idx);\r
+\r
+ int l;\r
+//for(l=0;l<split_queries.size();++l){\r
+//printf("split query %d is %s\n",l,split_queries[l]->query_name.c_str());\r
+//}\r
+\r
+\r
+\r
+\r
+ if(split_queries.size() > 0){ // should be at least one component.\r
+\r
+// Compute the number of LFTAs.\r
+ int n_lfta = split_queries.size();\r
+ if(hfta_returned) n_lfta--;\r
+\r
+\r
+// Process the LFTA components.\r
+ for(l=0;l<n_lfta;++l){\r
+ if(lfta_names.count(split_queries[l]->query_name) == 0){\r
+// Grab the lfta for global optimization.\r
+ vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();\r
+ string liface = tvec[0]->get_interface();\r
+ string lmach = tvec[0]->get_machine();\r
+ if (lmach == "")\r
+ lmach = hostname;\r
+ interface_names.push_back(liface);\r
+ machine_names.push_back(lmach);\r
+//printf("Machine is %s\n",lmach.c_str());\r
+\r
+// Set the ht size from the recommendation, if there is one in the rec file\r
+ if(lfta_htsize.count(split_queries[l]->query_name)>0){\r
+ split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));\r
+ }\r
+\r
+\r
+ lfta_names[split_queries[l]->query_name] = lfta_list.size();\r
+ split_queries[l]->set_gid(lfta_list.size()); // set lfta global id\r
+ lfta_list.push_back(split_queries[l]);\r
+ lfta_mach_lists[lmach].push_back(split_queries[l]);\r
+\r
+// The following is a hack,\r
+// as I should be generating LFTA code through\r
+// the stream_query object.\r
+ split_queries[l]->query_plan[0]->bind_to_schema(Schema);\r
+// split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;\r
+\r
+/*\r
+// Create query description to embed in lfta.c\r
+ string lfta_schema_str = split_queries[l]->make_schema();\r
+ string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);\r
+\r
+// get NIC capabilities.\r
+ int erri;\r
+ nic_property *nicprop = NULL;\r
+ vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
+ if(iface_codegen_type.size()){\r
+ nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);\r
+ if(!nicprop){\r
+ fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());\r
+ exit(1);\r
+ }\r
+ }\r
+\r
+ lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);\r
+*/\r
+\r
+ snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));\r
+ query_names.push_back(split_queries[l]->query_name);\r
+ mach_query_names[lmach].push_back(query_names.size()-1);\r
+// NOTE: I will assume a 1-1 correspondance between\r
+// mach_query_names[lmach] and lfta_mach_lists[lmach]\r
+// where mach_query_names[lmach][i] contains the index into\r
+// query_names, which names the lfta, and\r
+// mach_query_names[lmach][i] is the stream_query * of the\r
+// corresponding lfta.\r
+// Later, lfta_iface_qnames are the query names matching lfta_iface_lists\r
+\r
+\r
+\r
+ // check if lfta is reusable\r
+ // FTA is reusable if reusable option is set explicitly or it doesn't have any parameters\r
+\r
+ bool lfta_reusable = false;\r
+ if (split_queries[l]->query_plan[0]->get_val_of_def("reusable") == "yes" ||\r
+ split_queries[l]->query_plan[0]->get_param_tbl()->size() == 0) {\r
+ lfta_reusable = true;\r
+ }\r
+ lfta_reuse_options.push_back(lfta_reusable);\r
+\r
+ // LFTA will inherit the liveness timeout specification from the containing query\r
+ // it is too conservative as lfta are expected to spend less time per tuple\r
+ // then full query\r
+\r
+ // extract liveness timeout from query definition\r
+ int liveness_timeout = atoi(split_queries[l]->query_plan[0]->get_val_of_def("liveness_timeout").c_str());\r
+ if (!liveness_timeout) {\r
+// fprintf(stderr, "WARNING: No liveness timeout specified for lfta %s, using default value of %d\n",\r
+// split_queries[l]->query_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);\r
+ liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;\r
+ }\r
+ lfta_liveness_timeouts.push_back(liveness_timeout);\r
+\r
+// Add it to the schema\r
+ table_def *td = split_queries[l]->get_output_tabledef();\r
+ Schema->append_table(td);\r
+//printf("added lfta %d (%s)\n",l,split_queries[l]->query_name.c_str());\r
+\r
+ }\r
+ }\r
+\r
+// If the output is lfta-only, dump out the query name.\r
+ if(split_queries.size() == 1 && !hfta_returned){\r
+ if(output_query_names ){\r
+ fprintf(query_name_output,"%s L\n",split_queries[0]->query_name.c_str());\r
+ }\r
+/*\r
+else{\r
+ fprintf(stderr,"query name is %s\n",split_queries[0]->query_name.c_str());\r
+ }\r
+*/\r
+\r
+/*\r
+// output schema summary\r
+ if(output_schema_summary){\r
+ dump_summary(split_queries[0]);\r
+ }\r
+*/\r
+ }\r
+\r
+\r
+ if(hfta_returned){ // query also has an HFTA component\r
+ int hfta_nbr = split_queries.size()-1;\r
+\r
+ hfta_list.push_back(split_queries[hfta_nbr]);\r
+\r
+// report on generated query names\r
+ if(output_query_names){\r
+ string hfta_name =split_queries[hfta_nbr]->query_name;\r
+ fprintf(query_name_output,"%s H\n",hfta_name.c_str());\r
+ for(l=0;l<hfta_nbr;++l){\r
+ string lfta_name =split_queries[l]->query_name;\r
+ fprintf(query_name_output,"%s L\n",lfta_name.c_str());\r
+ }\r
+ }\r
+// else{\r
+// fprintf(stderr,"query names are ");\r
+// for(l=0;l<hfta_nbr;++l){\r
+// if(l>0) fprintf(stderr,",");\r
+// string fta_name =split_queries[l]->query_name;\r
+// fprintf(stderr," %s",fta_name.c_str());\r
+// }\r
+// fprintf(stderr,"\n");\r
+// }\r
+ }\r
+\r
+ }else{\r
+ fprintf(stderr,"ERROR, query in file %s has a bug.\n",qnodes[i]->file.c_str());\r
+ fprintf(stderr,"%s\n",rootq->get_error_str().c_str());\r
+ exit(1);\r
+ }\r
+ }\r
+}\r
+\r
+\r
+//-----------------------------------------------------------------\r
+// Compute and propagate the SE in PROTOCOL fields compute a field.\r
+//-----------------------------------------------------------------\r
+\r
+for(i=0;i<lfta_list.size();i++){\r
+ lfta_list[i]->generate_protocol_se(sq_map, Schema);\r
+ sq_map[lfta_list[i]->get_sq_name()] = lfta_list[i];\r
+}\r
+for(i=0;i<hfta_list.size();i++){\r
+ hfta_list[i]->generate_protocol_se(sq_map, Schema);\r
+ sq_map[hfta_list[i]->get_sq_name()] = hfta_list[i];\r
+}\r
+\r
+\r
+\r
+//------------------------------------------------------------------------\r
+// Perform individual FTA optimizations\r
+//-----------------------------------------------------------------------\r
+\r
+if (partitioned_mode) {\r
+\r
+ // open partition definition file\r
+ string part_fname = config_dir_path + "partition.txt";\r
+\r
+ FILE* partfd = fopen(part_fname.c_str(), "r");\r
+ if (!partfd) {\r
+ fprintf(stderr,"ERROR, unable to open partition definition file %s\n",part_fname.c_str());\r
+ exit(1);\r
+ }\r
+ PartnParser_setfileinput(partfd);\r
+ if (PartnParserparse()) {\r
+ fprintf(stderr,"ERROR, unable to parse partition definition file %s\n",part_fname.c_str());\r
+ exit(1);\r
+ }\r
+ fclose(partfd);\r
+}\r
+\r
+\r
+print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);\r
+\r
+int num_hfta = hfta_list.size();\r
+for(i=0; i < hfta_list.size(); ++i){\r
+ hfta_list[i]->optimize(hfta_list, lfta_names, interface_names, machine_names, Ext_fcns, Schema, ifaces_db, partn_parse_result);\r
+}\r
+\r
+// Add all new hftas to schema\r
+for(i=num_hfta; i < hfta_list.size(); ++i){\r
+ table_def *td = hfta_list[i]->get_output_tabledef();\r
+ Schema->append_table(td);\r
+}\r
+\r
+print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);\r
+\r
+\r
+\r
+//------------------------------------------------------------------------\r
+// Do global (cross-fta) optimization\r
+//-----------------------------------------------------------------------\r
+\r
+\r
+\r
+\r
+\r
+\r
+set<string> extra_external_libs;\r
+\r
+for(i=0;i<hfta_list.size();++i){ // query also has an HFTA component\r
+\r
+ if(! debug_only){\r
+// build hfta file name, create output\r
+ if(numeric_hfta_flname){\r
+ sprintf(tmpstr,"hfta_%d",hfta_count);\r
+ hfta_names.push_back(tmpstr);\r
+ sprintf(tmpstr,"hfta_%d.cc",hfta_count);\r
+ }else{\r
+ sprintf(tmpstr,"hfta_%s",hfta_list[i]->query_name.c_str());\r
+ hfta_names.push_back(tmpstr);\r
+ sprintf(tmpstr,"hfta_%s.cc",hfta_list[i]->query_name.c_str());\r
+ }\r
+ FILE *hfta_fl = fopen(tmpstr,"w");\r
+ if(hfta_fl == NULL){\r
+ fprintf(stderr,"ERROR can't open fta output file %s\n",tmpstr);\r
+ exit(1);\r
+ }\r
+ fprintf(hfta_fl,"%s\n\n",hfta_list[i]->generate_hfta(Schema, Ext_fcns, opviews, distributed_mode).c_str());\r
+\r
+// If there is a field verifier, warn about\r
+// lack of compatibility\r
+// NOTE : this code assumes that visible non-lfta queries\r
+// are those at the root of a stream query.\r
+ string hfta_comment;\r
+ string hfta_title;\r
+ string hfta_namespace;\r
+ if(hfta_list[i]->defines.count("comment")>0)\r
+ hfta_comment = hfta_list[i]->defines["comment"];\r
+ if(hfta_list[i]->defines.count("Comment")>0)\r
+ hfta_comment = hfta_list[i]->defines["Comment"];\r
+ if(hfta_list[i]->defines.count("COMMENT")>0)\r
+ hfta_comment = hfta_list[i]->defines["COMMENT"];\r
+ if(hfta_list[i]->defines.count("title")>0)\r
+ hfta_title = hfta_list[i]->defines["title"];\r
+ if(hfta_list[i]->defines.count("Title")>0)\r
+ hfta_title = hfta_list[i]->defines["Title"];\r
+ if(hfta_list[i]->defines.count("TITLE")>0)\r
+ hfta_title = hfta_list[i]->defines["TITLE"];\r
+ if(hfta_list[i]->defines.count("namespace")>0)\r
+ hfta_namespace = hfta_list[i]->defines["namespace"];\r
+ if(hfta_list[i]->defines.count("Namespace")>0)\r
+ hfta_namespace = hfta_list[i]->defines["Namespace"];\r
+ if(hfta_list[i]->defines.count("Namespace")>0)\r
+ hfta_namespace = hfta_list[i]->defines["Namespace"];\r
+\r
+ if(field_verifier != NULL){\r
+ string warning_str;\r
+ if(hfta_comment == "")\r
+ warning_str += "\tcomment not found.\n";\r
+ if(hfta_title == "")\r
+ warning_str += "\ttitle not found.\n";\r
+ if(hfta_namespace == "")\r
+ warning_str += "\tnamespace not found.\n";\r
+\r
+ vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();\r
+ int fi;\r
+ for(fi=0;fi<flds.size();fi++){\r
+ field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);\r
+ }\r
+ if(warning_str != "")\r
+ fprintf(stderr,"Warning, in HFTA stream %s:\n%s",\r
+ hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());\r
+ }\r
+\r
+ fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());\r
+ if(hfta_comment != "")\r
+ fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());\r
+ if(hfta_title != "")\r
+ fprintf(qtree_output,"\t\t<Title value='%s' />\n",hfta_title.c_str());\r
+ if(hfta_namespace != "")\r
+ fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",hfta_namespace.c_str());\r
+ fprintf(qtree_output,"\t\t<FileName value='%s' />\n",tmpstr);\r
+ fprintf(qtree_output,"\t\t<Rate value='100' />\n");\r
+\r
+// write info about fields to qtree.xml\r
+ vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();\r
+ int fi;\r
+ for(fi=0;fi<flds.size();fi++){\r
+ fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());\r
+ if(flds[fi]->get_modifier_list()->size()){\r
+ fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());\r
+ }\r
+ fprintf(qtree_output," />\n");\r
+ }\r
+\r
+ // extract liveness timeout from query definition\r
+ int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());\r
+ if (!liveness_timeout) {\r
+// fprintf(stderr, "WARNING: No liveness timeout specified for hfta %s, using default value of %d\n",\r
+// hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);\r
+ liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;\r
+ }\r
+ fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", liveness_timeout);\r
+\r
+ vector<tablevar_t *> tmp_tv = hfta_list[i]->get_input_tables();\r
+ int itv;\r
+ for(itv=0;itv<tmp_tv.size();++itv){\r
+ fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",tmp_tv[itv]->get_schema_name().c_str());\r
+ }\r
+ string ifrs = hfta_list[i]->collect_refd_ifaces();\r
+ if(ifrs != ""){\r
+ fprintf(qtree_output,"\t\t<Ifaces_referenced value='%s'/>\n",ifrs.c_str());\r
+ }\r
+ fprintf(qtree_output,"\t</HFTA>\n");\r
+\r
+ fclose(hfta_fl);\r
+ }else{\r
+// debug only -- do code generation to catch generation-time errors.\r
+ hfta_list[i]->generate_hfta(Schema, Ext_fcns,opviews, distributed_mode);\r
+ }\r
+\r
+ hfta_count++; // for hfta file names with numeric suffixes\r
+\r
+ hfta_list[i]->get_external_libs(extra_external_libs);\r
+\r
+ }\r
+\r
+string ext_lib_string;\r
+set<string>::iterator ssi_el;\r
+for(ssi_el=extra_external_libs.begin();ssi_el!=extra_external_libs.end();++ssi_el)\r
+ ext_lib_string += (*ssi_el)+" ";\r
+\r
+\r
+\r
+// Report on the set of operator views\r
+ for(i=0;i<opviews.size();++i){\r
+ opview_entry *opve = opviews.get_entry(i);\r
+ fprintf(qtree_output,"\t<UDOP name='%s' >\n",opve->view_name.c_str());\r
+ fprintf(qtree_output,"\t\t<FileName value='%s' />\n",opve->exec_fl.c_str());\r
+ fprintf(qtree_output,"\t\t<Parent value='%s' />\n",opve->root_name.c_str());\r
+ fprintf(qtree_output,"\t\t<Alias value='%s' />\n",opve->udop_alias.c_str());\r
+ fprintf(qtree_output,"\t\t<Rate value='100' />\n");\r
+\r
+ if (!opve->liveness_timeout) {\r
+// fprintf(stderr, "WARNING: No liveness timeout specified for view %s, using default value of %d\n",\r
+// opve->view_name.c_str(), DEFAULT_UDOP_LIVENESS_TIMEOUT);\r
+ opve->liveness_timeout = DEFAULT_UDOP_LIVENESS_TIMEOUT;\r
+ }\r
+ fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", opve->liveness_timeout);\r
+ int j;\r
+ for(j=0;j<opve->subq_names.size();j++)\r
+ fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",opve->subq_names[j].c_str());\r
+ fprintf(qtree_output,"\t</UDOP>\n");\r
+ }\r
+\r
+\r
+//-----------------------------------------------------------------\r
+\r
+// Create interface-specific meta code files.\r
+// first, open and parse the interface resources file.\r
+ ifaces_db = new ifq_t();\r
+ ierr = "";\r
+ if(ifaces_db->load_ifaces(ifx_fname,use_live_hosts_file,distributed_mode,ierr)){\r
+ fprintf(stderr,"ERROR, can't load interface resource file %s :\n%s",\r
+ ifx_fname.c_str(), ierr.c_str());\r
+ exit(1);\r
+ }\r
+\r
+ map<string, vector<stream_query *> >::iterator svsi;\r
+ for(svsi=lfta_mach_lists.begin(); svsi!=lfta_mach_lists.end(); ++svsi){\r
+ string lmach = (*svsi).first;\r
+\r
+ // For this machine, create a set of lftas per interface.\r
+ vector<stream_query *> mach_lftas = (*svsi).second;\r
+ map<string, vector<stream_query *> > lfta_iface_lists;\r
+ int li;\r
+ for(li=0;li<mach_lftas.size();++li){\r
+ vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();\r
+ string lfta_iface = tvec[0]->get_interface();\r
+ lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);\r
+ }\r
+\r
+ map<string, vector<stream_query *> >::iterator lsvsi;\r
+ for(lsvsi=lfta_iface_lists.begin(); lsvsi!=lfta_iface_lists.end(); ++lsvsi){\r
+ int erri;\r
+ string liface = (*lsvsi).first;\r
+ vector<stream_query *> iface_lftas = (*lsvsi).second;\r
+ vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
+ if(iface_codegen_type.size()){\r
+ nic_property *nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);\r
+ if(!nicprop){\r
+ fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());\r
+ exit(1);\r
+ }\r
+ string mcs = generate_nic_code(iface_lftas, nicprop);\r
+ string mcf_flnm;\r
+ if(lmach != "")\r
+ mcf_flnm = lmach + "_"+liface+".mcf";\r
+ else\r
+ mcf_flnm = hostname + "_"+liface+".mcf";\r
+ FILE *mcf_fl ;\r
+ if((mcf_fl = fopen(mcf_flnm.c_str(),"w")) == NULL){\r
+ fprintf(stderr,"Can't open output file %s\n%s\n",mcf_flnm.c_str(),strerror(errno));\r
+ exit(1);\r
+ }\r
+ fprintf(mcf_fl,"%s",mcs.c_str());\r
+ fclose(mcf_fl);\r
+//printf("mcs of machine %s, iface %s of type %s is \n%s\n",\r
+//lmach.c_str(), liface.c_str(),iface_codegen_type[0].c_str(), mcs.c_str());\r
+ }\r
+ }\r
+\r
+\r
+ }\r
+\r
+\r
+\r
+//-----------------------------------------------------------------\r
+\r
+\r
+// Find common filter predicates in the LFTAs.\r
+// in addition generate structs to store the temporal attributes unpacked by prefilter\r
+ \r
+ map<string, vector<stream_query *> >::iterator ssqi;\r
+ for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){\r
+\r
+ string lmach = (*ssqi).first;\r
+ bool packed_return = false;\r
+ int li, erri;\r
+\r
+\r
+// The LFTAs of this machine.\r
+ vector<stream_query *> mach_lftas = (*ssqi).second;\r
+// break up on a per-interface basis.\r
+ map<string, vector<stream_query *> > lfta_iface_lists;\r
+ map<string, vector<int> > lfta_iface_qname_ix; // need the query name\r
+ // for fta_init\r
+ for(li=0;li<mach_lftas.size();++li){\r
+ vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();\r
+ string lfta_iface = tvec[0]->get_interface();\r
+ lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);\r
+ lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);\r
+ }\r
+\r
+\r
+// Are the return values "packed"?\r
+// This should be done on a per-interface basis.\r
+// But this is defunct code for gs-lite\r
+ for(li=0;li<mach_lftas.size();++li){\r
+ vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();\r
+ string liface = tvec[0]->get_interface();\r
+ vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
+ if(iface_codegen_type.size()){\r
+ if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){\r
+ packed_return = true;\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+// Separate lftas by interface, collect results on a per-interface basis.\r
+\r
+ vector<cnf_set *> no_preds; // fallback if there is no prefilter\r
+ map<string, vector<cnf_set *> > prefilter_preds;\r
+ set<unsigned int> pred_ids; // this can be global for all interfaces\r
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
+ string liface = (*mvsi).first;\r
+ vector<cnf_set *> empty_list;\r
+ prefilter_preds[liface] = empty_list;\r
+ if(! packed_return){\r
+ get_common_lfta_filter(lfta_iface_lists[liface], Schema,Ext_fcns, prefilter_preds[liface], pred_ids);\r
+ }\r
+\r
+// get NIC capabilities. (Is this needed?)\r
+ nic_property *nicprop = NULL;\r
+ vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);\r
+ if(iface_codegen_type.size()){\r
+ nicprop = npdb->get_nic_property(iface_codegen_type[0],erri);\r
+ if(!nicprop){\r
+ fprintf(stderr,"ERROR cannot load nic properties %s for interface %s of machine %s for lfta codegen\n",iface_codegen_type[0].c_str(), liface.c_str(), lmach.c_str());\r
+ exit(1);\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+// Now that we know the prefilter preds, generate the lfta code.\r
+// Do this for all lftas in this machine.\r
+ for(li=0;li<mach_lftas.size();++li){\r
+ set<unsigned int> subsumed_preds;\r
+ set<unsigned int>::iterator sii;\r
+#ifdef PREFILTER_OK\r
+ for(sii=pred_ids.begin();sii!=pred_ids.end();++sii){\r
+ int pid = (*sii);\r
+ if((pid>>16) == li){\r
+ subsumed_preds.insert(pid & 0xffff);\r
+ }\r
+ }\r
+#endif\r
+ string lfta_schema_str = mach_lftas[li]->make_schema();\r
+ string lfta_schema_embed = make_C_embedded_string(lfta_schema_str);\r
+ nic_property *nicprop = NULL; // no NIC properties?\r
+ lfta_val[lmach] += generate_lfta_block(mach_lftas[li]->query_plan[0], Schema, mach_lftas[li]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop,subsumed_preds);\r
+ }\r
+\r
+\r
+// generate structs to store the temporal attributes\r
+// unpacked by prefilter\r
+ col_id_set temp_cids;\r
+ get_prefilter_temporal_cids(lfta_mach_lists[lmach], temp_cids);\r
+ lfta_prefilter_val[lmach] = generate_lfta_prefilter_struct(temp_cids, Schema);\r
+\r
+// Compute the lfta bit signatures and the lfta colrefs\r
+// do this on a per-interface basis\r
+#ifdef PREFILTER_OK\r
+ lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";\r
+#endif\r
+ map<string, vector<long long int> > lfta_sigs; // used again later\r
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
+ string liface = (*mvsi).first;\r
+ vector<long long int> empty_list;\r
+ lfta_sigs[liface] = empty_list;\r
+\r
+ vector<col_id_set> lfta_cols;\r
+ vector<int> lfta_snap_length;\r
+ for(li=0;li<lfta_iface_lists[liface].size();++li){\r
+ unsigned long long int mask=0, bpos=1;\r
+ int f_pos;\r
+ for(f_pos=0;f_pos<prefilter_preds[liface].size();++f_pos){\r
+ if(prefilter_preds[liface][f_pos]->lfta_id.count(li))\r
+ mask |= bpos;\r
+ bpos = bpos << 1;\r
+ }\r
+ lfta_sigs[liface].push_back(mask);\r
+ lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));\r
+ lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));\r
+ }\r
+\r
+//for(li=0;li<mach_lftas.size();++li){\r
+//printf("lfta %d, mask is %llu\n",li,lfta_sigs[li]);\r
+//col_id_set::iterator tcisi;\r
+//for(tcisi=lfta_cols[li].begin(); tcisi!=lfta_cols[li].end();++tcisi){\r
+//printf("\t%s %d (%d)\n",(*tcisi).field.c_str(),(*tcisi).schema_ref, (*tcisi).tblvar_ref);\r
+//}\r
+//}\r
+\r
+\r
+// generate the prefilter\r
+// Do this on a per-interface basis, except for the #define\r
+#ifdef PREFILTER_OK\r
+// lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";\r
+ lfta_val[lmach] += generate_lfta_prefilter(prefilter_preds[liface], temp_cids, Schema, Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);\r
+#else\r
+ lfta_val[lmach] += generate_lfta_prefilter(no_preds, temp_cids, Schema,Ext_fcns, lfta_cols, lfta_sigs[liface], lfta_snap_length, liface);\r
+\r
+#endif\r
+ }\r
+\r
+// Generate interface parameter lookup function\r
+ lfta_val[lmach] += "// lookup interface properties by name\n";\r
+ lfta_val[lmach] += "// mulitple values of given property returned in the same string separated by commas\n";\r
+ lfta_val[lmach] += "// returns NULL if given property does not exist\n";\r
+ lfta_val[lmach] += "gs_sp_t get_iface_properties (const gs_sp_t iface_name, const gs_sp_t property_name) {\n";\r
+\r
+// collect a list of interface names used by queries running on this host\r
+ set<std::string> iface_names;\r
+ for(i=0;i<mach_query_names[lmach].size();i++){\r
+ int mi = mach_query_names[lmach][i];\r
+ stream_query *lfta_sq = lfta_mach_lists[lmach][i];\r
+\r
+ if(interface_names[mi]=="")\r
+ iface_names.insert("DEFAULTDEV");\r
+ else\r
+ iface_names.insert(interface_names[mi]);\r
+ }\r
+\r
+// generate interface property lookup code for every interface\r
+ set<std::string>::iterator sir;\r
+ for (sir = iface_names.begin(); sir != iface_names.end(); sir++) {\r
+ if (sir == iface_names.begin())\r
+ lfta_val[lmach] += "\tif (!strcmp(iface_name, \"" + *sir + "\")) {\n";\r
+ else\r
+ lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";\r
+\r
+ // iterate through interface properties\r
+ vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);\r
+ if (erri) {\r
+ fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());\r
+ exit(1);\r
+ }\r
+ if (iface_properties.empty())\r
+ lfta_val[lmach] += "\t\treturn NULL;\n";\r
+ else {\r
+ for (int i = 0; i < iface_properties.size(); ++i) {\r
+ if (i == 0)\r
+ lfta_val[lmach] += "\t\tif (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";\r
+ else\r
+ lfta_val[lmach] += "\t\t} else if (!strcmp(property_name, \"" + iface_properties[i] + "\")) {\n";\r
+\r
+ // combine all values for the interface property using comma separator\r
+ vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);\r
+ for (int j = 0; j < vals.size(); ++j) {\r
+ lfta_val[lmach] += "\t\t\treturn \"" + vals[j];\r
+ if (j != vals.size()-1)\r
+ lfta_val[lmach] += ",";\r
+ }\r
+ lfta_val[lmach] += "\";\n";\r
+ }\r
+ lfta_val[lmach] += "\t\t} else\n";\r
+ lfta_val[lmach] += "\t\t\treturn NULL;\n";\r
+ }\r
+ }\r
+ lfta_val[lmach] += "\t} else\n";\r
+ lfta_val[lmach] += "\t\treturn NULL;\n";\r
+ lfta_val[lmach] += "}\n\n";\r
+\r
+\r
+// Generate a full list of FTAs for clearinghouse reference\r
+ lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";\r
+ lfta_val[lmach] += "const gs_sp_t fta_names[] = {";\r
+\r
+ for (i = 0; i < query_names.size(); ++i) {\r
+ if (i)\r
+ lfta_val[lmach] += ", ";\r
+ lfta_val[lmach] += "\"" + query_names[i] + "\"";\r
+ }\r
+ for (i = 0; i < hfta_list.size(); ++i) {\r
+ lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";\r
+ }\r
+ lfta_val[lmach] += ", NULL};\n\n";\r
+\r
+\r
+// Add the initialization function to lfta.c\r
+// Change to accept the interface name, and \r
+// set the prefilter function accordingly.\r
+// see the example in demo/err2\r
+ lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";\r
+\r
+// for(i=0;i<mach_query_names[lmach].size();i++)\r
+// int mi = mach_query_names[lmach][i];\r
+// stream_query *lfta_sq = lfta_mach_lists[lmach][i];\r
+\r
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
+ string liface = (*mvsi).first;\r
+ vector<stream_query *> lfta_list = (*mvsi).second;\r
+ for(i=0;i<lfta_list.size();i++){\r
+ stream_query *lfta_sq = lfta_list[i];\r
+ int mi = lfta_iface_qname_ix[liface][i];\r
+ \r
+ fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);\r
+\r
+ string this_iface = "DEFAULTDEV";\r
+ if(interface_names[mi]!="")\r
+ this_iface = '"'+interface_names[mi]+'"';\r
+ lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";\r
+ lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";\r
+// if(interface_names[mi]=="")\r
+// lfta_val[lmach]+="DEFAULTDEV";\r
+// else\r
+// lfta_val[lmach]+='"'+interface_names[mi]+'"';\r
+ lfta_val[lmach] += this_iface;\r
+\r
+\r
+ lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])\r
+ +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])\r
+ +"\n#endif\n";\r
+ sprintf(tmpstr,",%d",snap_lengths[mi]);\r
+ lfta_val[lmach] += tmpstr;\r
+\r
+// unsigned long long int mask=0, bpos=1;\r
+// int f_pos;\r
+// for(f_pos=0;f_pos<prefilter_preds.size();++f_pos){\r
+// if(prefilter_preds[f_pos]->lfta_id.count(i))\r
+// mask |= bpos;\r
+// bpos = bpos << 1;\r
+// }\r
+\r
+#ifdef PREFILTER_OK\r
+// sprintf(tmpstr,",%lluull",mask);\r
+ sprintf(tmpstr,",%lluull",lfta_sigs[liface][i]);\r
+ lfta_val[lmach]+=tmpstr;\r
+#else\r
+ lfta_val[lmach] += ",0ull";\r
+#endif\r
+\r
+ lfta_val[lmach] += ");\n";\r
+\r
+\r
+\r
+// End of lfta prefilter stuff\r
+// --------------------------------------------------\r
+\r
+// If there is a field verifier, warn about\r
+// lack of compatability\r
+ string lfta_comment;\r
+ string lfta_title;\r
+ string lfta_namespace;\r
+ map<string,string> ldefs = lfta_sq->query_plan[0]->get_definitions();\r
+ if(ldefs.count("comment")>0)\r
+ lfta_comment = lfta_sq->defines["comment"];\r
+ if(ldefs.count("Comment")>0)\r
+ lfta_comment = lfta_sq->defines["Comment"];\r
+ if(ldefs.count("COMMENT")>0)\r
+ lfta_comment = lfta_sq->defines["COMMENT"];\r
+ if(ldefs.count("title")>0)\r
+ lfta_title = lfta_sq->defines["title"];\r
+ if(ldefs.count("Title")>0)\r
+ lfta_title = lfta_sq->defines["Title"];\r
+ if(ldefs.count("TITLE")>0)\r
+ lfta_title = lfta_sq->defines["TITLE"];\r
+ if(ldefs.count("NAMESPACE")>0)\r
+ lfta_namespace = lfta_sq->defines["NAMESPACE"];\r
+ if(ldefs.count("Namespace")>0)\r
+ lfta_namespace = lfta_sq->defines["Namespace"];\r
+ if(ldefs.count("namespace")>0)\r
+ lfta_namespace = lfta_sq->defines["namespace"];\r
+\r
+ string lfta_ht_size;\r
+ if(lfta_sq->query_plan[0]->node_type()== "sgah_qpn" || lfta_sq->query_plan[0]->node_type()== "rsgah_qpn")\r
+ lfta_ht_size = int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);\r
+ if(ldefs.count("aggregate_slots")>0){\r
+ lfta_ht_size = ldefs["aggregate_slots"];\r
+ }\r
+\r
+// NOTE : I'm assuming that visible lftas do not start with _fta.\r
+// -- will fail for non-visible simple selection queries.\r
+ if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){\r
+ string warning_str;\r
+ if(lfta_comment == "")\r
+ warning_str += "\tcomment not found.\n";\r
+ if(lfta_title == "")\r
+ warning_str += "\ttitle not found.\n";\r
+ if(lfta_namespace == "")\r
+ warning_str += "\tnamespace not found.\n";\r
+\r
+ vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();\r
+ int fi;\r
+ for(fi=0;fi<flds.size();fi++){\r
+ field_verifier->verify_field(flds[fi]->get_name(),flds[fi]->get_type(),warning_str);\r
+ }\r
+ if(warning_str != "")\r
+ fprintf(stderr,"Warning, in LFTA stream %s:\n%s",\r
+ query_names[mi].c_str(),warning_str.c_str());\r
+ }\r
+\r
+\r
+// Create qtree output\r
+ fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());\r
+ if(lfta_comment != "")\r
+ fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());\r
+ if(lfta_title != "")\r
+ fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());\r
+ if(lfta_namespace != "")\r
+ fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());\r
+ if(lfta_ht_size != "")\r
+ fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());\r
+ if(lmach != "")\r
+ fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());\r
+ else\r
+ fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());\r
+ fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());\r
+ fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());\r
+ fprintf(qtree_output,"\t\t<Rate value='100' />\n");\r
+ fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);\r
+// write info about fields to qtree.xml\r
+ vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();\r
+ int fi;\r
+ for(fi=0;fi<flds.size();fi++){\r
+ fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());\r
+ if(flds[fi]->get_modifier_list()->size()){\r
+ fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());\r
+ }\r
+ fprintf(qtree_output," />\n");\r
+ }\r
+ fprintf(qtree_output,"\t</LFTA>\n");\r
+\r
+\r
+ }\r
+ }\r
+\r
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){\r
+ string liface = (*mvsi).first;\r
+ lfta_val[lmach] += \r
+" if (!strcmp(device, \""+liface+"\")) \n"\r
+" lfta_prefilter = &lfta_prefilter_"+liface+"; \n"\r
+;\r
+ }\r
+ lfta_val[lmach] += \r
+" if(lfta_prefilter == NULL){\n"\r
+" fprintf(stderr, \"Error, interface %s not found in fta_init.\\n\",device);\n"\r
+" exit(1);\n"\r
+" }\n"\r
+;\r
+\r
+\r
+\r
+ lfta_val[lmach] += "}\n\n";\r
+\r
+ if(!(debug_only || hfta_only) ){\r
+ string lfta_flnm;\r
+ if(lmach != "")\r
+ lfta_flnm = lmach + "_lfta.c";\r
+ else\r
+ lfta_flnm = hostname + "_lfta.c";\r
+ if((lfta_out = fopen(lfta_flnm.c_str(),"w")) == NULL){\r
+ fprintf(stderr,"Can't open output file %s\n%s\n","lfta.c",strerror(errno));\r
+ exit(1);\r
+ }\r
+ fprintf(lfta_out,"%s",lfta_header.c_str());\r
+ fprintf(lfta_out,"%s",lfta_prefilter_val[lmach].c_str());\r
+ fprintf(lfta_out,"%s",lfta_val[lmach].c_str());\r
+ fclose(lfta_out);\r
+ }\r
+ }\r
+\r
+// Say what are the operators which must execute\r
+ if(opviews.size()>0)\r
+ fprintf(stderr,"The queries use the following external operators:\n");\r
+ for(i=0;i<opviews.size();++i){\r
+ opview_entry *opv = opviews.get_entry(i);\r
+ fprintf(stderr,"\t%s\n",opv->view_name.c_str());\r
+ }\r
+\r
+ if(create_makefile)\r
+ generate_makefile(input_file_names, nfiles, hfta_names, opviews,\r
+ machine_names, schema_file_name,\r
+ interface_names,\r
+ ifaces_db, config_dir_path,use_pads,ext_lib_string, rts_hload);\r
+\r
+\r
+ fprintf(qtree_output,"</QueryNodes>\n");\r
+\r
+ return(0);\r
+}\r
+\r
+////////////////////////////////////////////////////////////\r
+\r
+void generate_makefile(vector<string> &input_file_names, int nfiles,\r
+ vector<string> &hfta_names, opview_set &opviews,\r
+ vector<string> &machine_names,\r
+ string schema_file_name,\r
+ vector<string> &interface_names,\r
+ ifq_t *ifdb, string &config_dir_path,\r
+ bool use_pads,\r
+ string extra_libs,\r
+ map<string, vector<int> > &rts_hload\r
+ ){\r
+ int i,j;\r
+\r
+ if(config_dir_path != ""){\r
+ config_dir_path = "-C "+config_dir_path;\r
+ }\r
+\r
+ struct stat sb;\r
+ bool libz_exists = stat((root_path+"/lib/libz.a").c_str(),&sb) >= 0;\r
+ bool libast_exists = stat((root_path+"/lib/libast.a").c_str(),&sb) >= 0;\r
+\r
+// if(libz_exists && !libast_exists){\r
+// fprintf(stderr,"Configuration ERROR: ../../lib/libz.a exists but ../../lib/libast.a dows not.\n");\r
+// exit(1);\r
+// }\r
+\r
+// Get set of operator executable files to run\r
+ set<string> op_fls;\r
+ set<string>::iterator ssi;\r
+ for(i=0;i<opviews.size();++i){\r
+ opview_entry *opv = opviews.get_entry(i);\r
+ if(opv->exec_fl != "") op_fls.insert(opv->exec_fl);\r
+ }\r
+\r
+ FILE *outfl = fopen("Makefile", "w");\r
+ if(outfl==NULL){\r
+ fprintf(stderr,"Can't open Makefile for write, exiting.\n");\r
+ exit(0);\r
+ }\r
+\r
+ fputs(\r
+("CPP= g++ -O3 -g -I "+root_path+"/include -I "+root_path+"/include/hfta\n"\r
+"CC= gcc -O3 -g -I . -I "+root_path+"/include -I "+root_path+"/include/lfta"\r
+).c_str(), outfl\r
+);\r
+ if(generate_stats)\r
+ fprintf(outfl," -DLFTA_STATS");\r
+\r
+// Gather the set of interfaces\r
+// Also, gather "base interface names" for use in computing\r
+// the hash splitting to virtual interfaces.\r
+// TODO : must update to hanndle machines\r
+ set<string> ifaces;\r
+ set<string> base_vifaces; // base interfaces of virtual interfaces\r
+ map<string, string> ifmachines;\r
+ map<string, string> ifattrs;\r
+ for(i=0;i<interface_names.size();++i){\r
+ ifaces.insert(interface_names[i]);\r
+ ifmachines[interface_names[i]] = machine_names[i];\r
+\r
+ size_t Xpos = interface_names[i].find_last_of("X");\r
+ if(Xpos!=string::npos){\r
+ string iface = interface_names[i].substr(0,Xpos);\r
+ base_vifaces.insert(iface);\r
+ }\r
+ // get interface attributes and add them to the list\r
+ }\r
+\r
+// Do we need to include protobuf libraries?\r
+ bool use_proto = false;\r
+ int erri;\r
+ string err_str;\r
+ for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){\r
+ string ifnm = (*ssi);\r
+ vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);\r
+ for(int ift_i=0;ift_i<ift.size();ift_i++){\r
+ if(ift[ift_i]=="PROTO"){\r
+ use_proto = true;\r
+ }\r
+ }\r
+ }\r
+\r
+ fprintf(outfl,\r
+"\n"\r
+"\n"\r
+"all: rts");\r
+ for(i=0;i<hfta_names.size();++i)\r
+ fprintf(outfl," %s",hfta_names[i].c_str());\r
+ fputs(\r
+("\n"\r
+"\n"\r
+"rts: lfta.o "+root_path+"/lib/libgscphost.a "+root_path+"/lib/libgscplftaaux.a "+root_path+"/lib/libgscprts.a "+root_path+"/lib/libclearinghouse.a\n"\r
+"\tg++ -O3 -g -o rts lfta.o ").c_str(), outfl);\r
+ if(use_pads)\r
+ fprintf(outfl,"-L. ");\r
+ fputs(\r
+("-L"+root_path+"/lib -lgscplftaaux ").c_str(), outfl);\r
+ if(use_pads)\r
+ fprintf(outfl,"-lgscppads -lpads ");\r
+ fprintf(outfl,\r
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");\r
+ if(use_pads)\r
+ fprintf(outfl, " -lpz -lz -lbz ");\r
+ if(libz_exists && libast_exists)\r
+ fprintf(outfl," -last ");\r
+ if(use_pads)\r
+ fprintf(outfl, " -ldll -ldl ");\r
+ if(use_proto)\r
+ fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");\r
+ fprintf(outfl," -lgscpaux");\r
+#ifdef GCOV\r
+ fprintf(outfl," -fprofile-arcs");\r
+#endif\r
+ fprintf(outfl,\r
+"\n"\r
+"\n"\r
+"lfta.o: %s_lfta.c\n"\r
+"\t$(CC) -o lfta.o -c %s_lfta.c\n"\r
+"\n"\r
+"%s_lfta.c: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());\r
+ for(i=0;i<nfiles;++i)\r
+ fprintf(outfl," %s",input_file_names[i].c_str());\r
+ if(hostname == ""){\r
+ fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());\r
+ }else{\r
+ fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());\r
+ }\r
+ for(i=0;i<nfiles;++i)\r
+ fprintf(outfl," %s",input_file_names[i].c_str());\r
+ fprintf(outfl,"\n");\r
+\r
+ for(i=0;i<hfta_names.size();++i)\r
+ fprintf(outfl,\r
+("%s: %s.o\n"\r
+"\t$(CPP) -o %s %s.o -L"+root_path+"/lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscphftaaux -lgscphostaux -lm -lgscpaux -lclearinghouse -lresolv -lpthread -lgscpaux -lgscphftaaux -lgscpaux %s\n"\r
+"\n"\r
+"%s.o: %s.cc\n"\r
+"\t$(CPP) -o %s.o -c %s.cc\n"\r
+"\n"\r
+"\n").c_str(),\r
+ hfta_names[i].c_str(), hfta_names[i].c_str(),\r
+ hfta_names[i].c_str(), hfta_names[i].c_str(), extra_libs.c_str(),\r
+ hfta_names[i].c_str(), hfta_names[i].c_str(),\r
+ hfta_names[i].c_str(), hfta_names[i].c_str()\r
+ );\r
+\r
+ fprintf(outfl,\r
+("\n"\r
+"packet_schema.txt:\n"\r
+"\tln -s "+root_path+"/cfg/packet_schema.txt .\n"\r
+"\n"\r
+"external_fcns.def:\n"\r
+"\tln -s "+root_path+"/cfg/external_fcns.def .\n"\r
+"\n"\r
+"clean:\n"\r
+"\trm -rf core rts *.o %s_lfta.c external_fcns.def packet_schema.txt").c_str(),hostname.c_str());\r
+ for(i=0;i<hfta_names.size();++i)\r
+ fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());\r
+ fprintf(outfl,"\n");\r
+\r
+ fclose(outfl);\r
+\r
+\r
+\r
+// Gather the set of interfaces\r
+// TODO : must update to hanndle machines\r
+// TODO : lookup interface attributes and add them as a parameter to rts process\r
+ outfl = fopen("runit", "w");\r
+ if(outfl==NULL){\r
+ fprintf(stderr,"Can't open runit for write, exiting.\n");\r
+ exit(0);\r
+ }\r
+\r
+\r
+ fputs(\r
+("#!/bin/sh\n"\r
+"./stopit\n"\r
++root_path+"/bin/gshub.py> /dev/null 2>&1 &\n"\r
+"sleep 5\n"\r
+"if [ ! -f gshub.log ]\n"\r
+"then\n"\r
+"\techo \"Failed to start bin/gshub.py\"\n"\r
+"\texit -1\n"\r
+"fi\n"\r
+"ADDR=`cat gshub.log`\n"\r
+"ps opgid= $! >> gs.pids\n"\r
+"./rts $ADDR default ").c_str(), outfl);\r
+// int erri;\r
+// string err_str;\r
+ for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){\r
+ string ifnm = (*ssi);\r
+ fprintf(outfl, "%s ",ifnm.c_str());\r
+ vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);\r
+ for(j=0;j<ifv.size();++j)\r
+ fprintf(outfl, "%s ",ifv[j].c_str());\r
+ }\r
+ fprintf(outfl, " &\n");\r
+ fprintf(outfl, "echo $! >> gs.pids\n");\r
+ for(i=0;i<hfta_names.size();++i)\r
+ fprintf(outfl,"./%s $ADDR default &\n",hfta_names[i].c_str());\r
+\r
+ for(j=0;j<opviews.opview_list.size();++j){\r
+ fprintf(outfl,"%s/views/%s %s &\n",root_path.c_str(),opviews.opview_list[j]->exec_fl.c_str(), opviews.opview_list[j]->mangler.c_str());\r
+ }\r
+\r
+ fclose(outfl);\r
+ system("chmod +x runit");\r
+\r
+ outfl = fopen("stopit", "w");\r
+ if(outfl==NULL){\r
+ fprintf(stderr,"Can't open stopit for write, exiting.\n");\r
+ exit(0);\r
+ }\r
+\r
+ fprintf(outfl,"#!/bin/sh\n"\r
+"rm -f gshub.log\n"\r
+"if [ ! -f gs.pids ]\n"\r
+"then\n"\r
+"exit\n"\r
+"fi\n"\r
+"for pgid in `cat gs.pids`\n"\r
+"do\n"\r
+"kill -TERM -$pgid\n"\r
+"done\n"\r
+"sleep 1\n"\r
+"for pgid in `cat gs.pids`\n"\r
+"do\n"\r
+"kill -9 -$pgid\n"\r
+"done\n"\r
+"rm gs.pids\n");\r
+\r
+ fclose(outfl);\r
+ system("chmod +x stopit");\r
+\r
+//-----------------------------------------------\r
+\r
+/* For now disable support for virtual interfaces\r
+ outfl = fopen("set_vinterface_hash.bat", "w");\r
+ if(outfl==NULL){\r
+ fprintf(stderr,"Can't open set_vinterface_hash.bat for write, exiting.\n");\r
+ exit(0);\r
+ }\r
+\r
+// The format should be determined by an entry in the ifres.xml file,\r
+// but for now hardcode the only example I have.\r
+ for(ssi=base_vifaces.begin();ssi!=base_vifaces.end();++ssi){\r
+ if(rts_hload.count((*ssi))){\r
+ string iface_name = (*ssi);\r
+ string iface_number = "";\r
+ for(j=iface_name.size()-1; j>0 && iface_number == ""; j--){\r
+ if(isdigit(iface_name[j])){\r
+ iface_number = iface_name[j];\r
+ if(j>0 && isdigit(iface_name[j-1]))\r
+ iface_number = iface_name[j-1] + iface_number;\r
+ }\r
+ }\r
+\r
+ fprintf(outfl,"dagconfig -d%s -S hat_range=",iface_number.c_str());\r
+ vector<int> halloc = rts_hload[iface_name];\r
+ int prev_limit = 0;\r
+ for(j=0;j<halloc.size();++j){\r
+ if(j>0)\r
+ fprintf(outfl,":");\r
+ fprintf(outfl,"%d-%d",prev_limit,halloc[j]);\r
+ prev_limit = halloc[j];\r
+ }\r
+ fprintf(outfl,"\n");\r
+ }\r
+ }\r
+ fclose(outfl);\r
+ system("chmod +x set_vinterface_hash.bat");\r
+*/\r
+}\r
+\r
+// Code for implementing a local schema\r
+/*\r
+ table_list qpSchema;\r
+\r
+// Load the schemas of any LFTAs.\r
+ int l;\r
+ for(l=0;l<hfta_nbr;++l){\r
+ stream_query *sq0 = split_queries[l];\r
+ table_def *td = sq0->get_output_tabledef();\r
+ qpSchema.append_table(td);\r
+ }\r
+// load the schemas of any other ref'd tables.\r
+// (e.g., hftas)\r
+ vector<tablevar_t *> input_tbl_names = split_queries[hfta_nbr]->get_input_tables();\r
+ int ti;\r
+ for(ti=0;ti<input_tbl_names.size();++ti){\r
+ int tbl_ref = qpSchema.get_table_ref(input_tbl_names[ti]->get_schema_name());\r
+ if(tbl_ref < 0){\r
+ tbl_ref = Schema->get_table_ref(input_tbl_names[ti]->get_schema_name());\r
+ if(tbl_ref < 0){\r
+ fprintf(stderr,"ERROR file %s references table %s, which is not in the schema.\n",input_file_names[i].c_str(), (input_tbl_names[ti]->get_schema_name()).c_str());\r
+ exit(1);\r
+ }\r
+ qpSchema.append_table(Schema->get_table(tbl_ref));\r
+ }\r
+ }\r
+*/\r
+\r
+// Functions related to parsing.\r
+\r
+/*\r
+static int split_string(char *instr,char sep, char **words,int max_words){\r
+ char *loc;\r
+ char *str;\r
+ int nwords = 0;\r
+\r
+ str = instr;\r
+ words[nwords++] = str;\r
+ while( (loc = strchr(str,sep)) != NULL){\r
+ *loc = '\0';\r
+ str = loc+1;\r
+ if(nwords >= max_words){\r
+ fprintf(stderr,"Error in split_string, too many words discovered (max is %d)\n",max_words);\r
+ nwords = max_words-1;\r
+ }\r
+ words[nwords++] = str;\r
+ }\r
+\r
+ return(nwords);\r
+}\r
+\r
+*/\r
+\r