#include"xml_t.h"
#include"field_list.h"
+#include "gsconfig.h"
+
extern int xmlParserparse(void);
extern FILE *xmlParserin;
extern int xmlParserdebug;
set<int>::iterator si;
- vector<string> query_names; // for lfta.c registration
+ vector<string> registration_query_names; // for lfta.c registration
map<string, vector<int> > mach_query_names; // list queries of machine
vector<int> snap_lengths; // for lfta.c registration
+ vector<int> snap_position; // for lfta.c registration
vector<string> interface_names; // for lfta.c registration
vector<string> machine_names; // machine of interface
vector<bool> lfta_reuse_options; // for lfta.c registration
while(fgets(tmpstr,TMPSTRLEN,osp_in)){
o_lineno++;
int nflds = split_string(tmpstr,',',flds,MAXFLDS);
- if(nflds == 7){
+ if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
+ if(nflds == 7){
// make operator type lowercase
- char *tmpc;
- for(tmpc=flds[1];*tmpc!='\0';++tmpc)
- *tmpc = tolower(*tmpc);
-
- ospec_str *tmp_ospec = new ospec_str();
- tmp_ospec->query = flds[0];
- tmp_ospec->operator_type = flds[1];
- tmp_ospec->operator_param = flds[2];
- tmp_ospec->output_directory = flds[3];
- tmp_ospec->bucketwidth = atoi(flds[4]);
- tmp_ospec->partitioning_flds = flds[5];
- tmp_ospec->n_partitions = atoi(flds[6]);
- qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
- output_specs.push_back(tmp_ospec);
- }else{
- fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ char *tmpc;
+ for(tmpc=flds[1];*tmpc!='\0';++tmpc)
+ *tmpc = tolower(*tmpc);
+
+ ospec_str *tmp_ospec = new ospec_str();
+ tmp_ospec->query = flds[0];
+ tmp_ospec->operator_type = flds[1];
+ tmp_ospec->operator_param = flds[2];
+ tmp_ospec->output_directory = flds[3];
+ tmp_ospec->bucketwidth = atoi(flds[4]);
+ tmp_ospec->partitioning_flds = flds[5];
+ tmp_ospec->n_partitions = atoi(flds[6]);
+ qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
+ output_specs.push_back(tmp_ospec);
+ }else{
+ fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ }
}
}
fclose(osp_in);
map<string, string> lfta_prefilter_val;
string lfta_header =
-"#include <limits.h>\n\n"
+"#include <limits.h>\n"
"#include \"rts.h\"\n"
"#include \"fta.h\"\n"
"#include \"lapp.h\"\n"
-"#include \"rts_udaf.h\"\n\n"
+"#include \"rts_udaf.h\"\n"
+"#include<stdio.h>\n"
+"#include <float.h>\n"
+"#include \"rdtsc.h\"\n"
+"#include \"watchlist.h\"\n\n"
+
;
// Get any locally defined parsing headers
glob_t glob_result;
memset(&glob_result, 0, sizeof(glob_result));
- // do the glob operation
+ // do the glob operation TODO should be from GSROOT
int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
if(return_value == 0){
+ lfta_header += "\n";
for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
char *flds[1000];
int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
- lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
+ lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
}
+ lfta_header += "\n";
}else{
fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
}
}
if(dangling_ospecs!=""){
fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
+ exit(1);
}
string dangling_par = "";
if(lfta_names.count(split_queries[l]->query_name) == 0){
// Grab the lfta for global optimization.
vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
- string liface = tvec[0]->get_interface(); // iface queries have been resolved
- string lmach = tvec[0]->get_machine();
- string schema_name = tvec[0]->get_schema_name();
- int schema_ref = tvec[0]->get_schema_ref();
- if (lmach == "")
- lmach = hostname;
- interface_names.push_back(liface);
- machine_names.push_back(lmach);
+ string liface = "_local_";
+// string lmach = "";
+ string lmach = hostname;
+ if(tvec.size()>0){
+ liface = tvec[0]->get_interface(); // iface queries have been resolved
+ if(tvec[0]->get_machine() != ""){
+ lmach = tvec[0]->get_machine();
+ }else{
+ fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
+ }
+ } // else{
+ interface_names.push_back(liface);
+ machine_names.push_back(lmach);
+// }
+
+ vector<predicate_t *> schemaid_preds;
+ for(int irv=0;irv<tvec.size();++irv){
+
+ string schema_name = tvec[irv]->get_schema_name();
+ string rvar_name = tvec[irv]->get_var_name();
+ int schema_ref = tvec[irv]->get_schema_ref();
+ if (lmach == "")
+ lmach = hostname;
+// interface_names.push_back(liface);
+// machine_names.push_back(lmach);
+
//printf("Machine is %s\n",lmach.c_str());
// Check if a schemaId constraint needs to be inserted.
- if(schema_ref<0){ // can result from some kinds of splits
- schema_ref = Schema->get_table_ref(schema_name);
- }
- int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
- int errnum = 0;
- string if_error;
- iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
- if(iface==NULL){
- fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
- exit(1);
- }
- if(iface->has_multiple_schemas()){
- if(schema_id<0){ // invalid schema_id
- fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
- exit(1);
+ if(schema_ref<0){ // can result from some kinds of splits
+ schema_ref = Schema->get_table_ref(schema_name);
}
- vector<string> iface_schemas = iface->get_property("Schemas");
- if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
- fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
+ int errnum = 0;
+ string if_error;
+ iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
+ if(iface==NULL){
+ fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
exit(1);
- }
+ }
+
+
+ if(tvec[irv]->get_interface() != "_local_"){
+ if(iface->has_multiple_schemas()){
+ if(schema_id<0){ // invalid schema_id
+ fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
+ exit(1);
+ }
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
// Ensure that in liface, schema_id is used for only one schema
- if(schema_of_schemaid.count(liface)==0){
- map<int, string> empty_map;
- schema_of_schemaid[liface] = empty_map;
- }
- if(schema_of_schemaid[liface].count(schema_id)==0){
- schema_of_schemaid[liface][schema_id] = schema_name;
- }else{
- if(schema_of_schemaid[liface][schema_id] != schema_name){
- fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ if(schema_of_schemaid.count(liface)==0){
+ map<int, string> empty_map;
+ schema_of_schemaid[liface] = empty_map;
+ }
+ if(schema_of_schemaid[liface].count(schema_id)==0){
+ schema_of_schemaid[liface][schema_id] = schema_name;
+ }else{
+ if(schema_of_schemaid[liface][schema_id] != schema_name){
+ fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ exit(1);
+ }
+ }
+ }else{ // single-schema interface
+ schema_id = -1; // don't generate schema_id predicate
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
exit(1);
}
+ if(iface_schemas.size()>1){
+ fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
+ exit(1);
+ }
+ }
+ }else{
+ schema_id = -1;
}
- }else{ // single-schema interface
- schema_id = -1; // don't generate schema_id predicate
- vector<string> iface_schemas = iface->get_property("Schemas");
- if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
- fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
- exit(1);
- }
- if(iface_schemas.size()>1){
- fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
- exit(1);
- }
- }
// If we need to check the schema_id, insert a predicate into the lfta.
// TODO not just schema_id, the full all_schema_ids set.
- if(schema_id>=0){
- colref_t *schid_cr = new colref_t("schemaId");
- schid_cr->schema_ref = schema_ref;
- schid_cr->tablevar_ref = 0;
- scalarexp_t *schid_se = new scalarexp_t(schid_cr);
- data_type *schid_dt = new data_type("uint");
- schid_se->dt = schid_dt;
-
- string schid_str = int_to_string(schema_id);
- literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
- scalarexp_t *lit_se = new scalarexp_t(schid_lit);
- lit_se->dt = schid_dt;
-
- predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
- vector<cnf_elem *> clist;
- make_cnf_from_pr(schid_pr, clist);
- analyze_cnf(clist[0]);
- clist[0]->cost = 1; // cheap one comparison
+ if(schema_id>=0){
+ colref_t *schid_cr = new colref_t("schemaId");
+ schid_cr->schema_ref = schema_ref;
+ schid_cr->table_name = rvar_name;
+ schid_cr->tablevar_ref = 0;
+ schid_cr->default_table = false;
+ scalarexp_t *schid_se = new scalarexp_t(schid_cr);
+ data_type *schid_dt = new data_type("uint");
+ schid_se->dt = schid_dt;
+
+ string schid_str = int_to_string(schema_id);
+ literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
+ scalarexp_t *lit_se = new scalarexp_t(schid_lit);
+ lit_se->dt = schid_dt;
+
+ predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(schid_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
// cnf built, now insert it.
- split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+ split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+
+// Specialized processing
+// filter join, get two schemaid preds
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ }else{
+ fj->pred_t1.push_back(clist[0]);
+ }
+ schemaid_preds.push_back(schid_pr);
+ }
+// watchlist join, get the first schemaid pred
+ if(node_type == "watch_join"){
+ watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ schemaid_preds.push_back(schid_pr);
+ }
+ }
+ }
+ }
+// Specialized processing, currently filter join.
+ if(schemaid_preds.size()>1){
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(filter_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+ fj->shared_pred.push_back(clist[0]);
+ }
}
// THe following is a hack,
// as I should be generating LFTA code through
// the stream_query object.
+
split_queries[l]->query_plan[0]->bind_to_schema(Schema);
+
// split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
/*
lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
*/
- snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
- query_names.push_back(split_queries[l]->query_name);
- mach_query_names[lmach].push_back(query_names.size()-1);
+ snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
+ snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
+
+// STOPPED HERE need to figure out how to generate the code that Vlad needs
+// from snap_postion
+
+// TODO NOTE : I'd like it to be the case that registration_query_names
+// are the queries to be registered for subsciption.
+// but there is some complex bookkeeping here.
+ registration_query_names.push_back(split_queries[l]->query_name);
+ mach_query_names[lmach].push_back(registration_query_names.size()-1);
// NOTE: I will assume a 1-1 correspondance between
// mach_query_names[lmach] and lfta_mach_lists[lmach]
// where mach_query_names[lmach][i] contains the index into
}
-print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
+print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
int num_hfta = hfta_list.size();
for(i=0; i < hfta_list.size(); ++i){
Schema->append_table(td);
}
-print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
+print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
// if(hfta_namespace == "")
// warning_str += "\tnamespace not found.\n";
-// STOPPED HERE
// There is a get_tbl_keys method implemented for qp_nodes,
// integrate it into steam_query, then call it to find keys,
// and annotate feidls with their key-ness.
missing_keys.push_back(hfta_keys[ki]);
}
if(missing_keys.size()>0){
- fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the outputg:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
+ fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
for(int hi=0; hi<missing_keys.size(); ++hi){
fprintf(stderr," %s", missing_keys[hi].c_str());
}
int li;
for(li=0;li<mach_lftas.size();++li){
vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string lfta_iface = tvec[0]->get_interface();
+ string lfta_iface = "_local_";
+ if(tvec.size()>0){
+ string lfta_iface = tvec[0]->get_interface();
+ }
lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
}
// Find common filter predicates in the LFTAs.
-// in addition generate structs to store the temporal attributes unpacked by prefilter
+// in addition generate structs to store the
+// temporal attributes unpacked by prefilter
+// compute & provide interface for per-interface
+// record extraction properties
map<string, vector<stream_query *> >::iterator ssqi;
for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
// for fta_init
for(li=0;li<mach_lftas.size();++li){
vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string lfta_iface = tvec[0]->get_interface();
+ string lfta_iface = "_local_";
+ if(tvec.size()>0){
+ lfta_iface = tvec[0]->get_interface();
+ }
lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
}
// But this is defunct code for gs-lite
for(li=0;li<mach_lftas.size();++li){
vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string liface = tvec[0]->get_interface();
+ string liface = "_local_";
+ if(tvec.size()>0){
+ liface = tvec[0]->get_interface();
+ }
vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
if(iface_codegen_type.size()){
if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
- packed_return = true;
+ packed_return = true;
}
}
}
lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
#endif
map<string, vector<long long int> > lfta_sigs; // used again later
+ map<string, int> lfta_snap_pos; // optimize csv parsing
+ // compute now, use in get_iface_properties
for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
string liface = (*mvsi).first;
vector<long long int> empty_list;
lfta_sigs[liface] = empty_list;
+ lfta_snap_pos[liface] = -1;
vector<col_id_set> lfta_cols;
vector<int> lfta_snap_length;
}
lfta_sigs[liface].push_back(mask);
lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
- lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
+ lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
+ int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
+ if(this_snap_pos > lfta_snap_pos[liface])
+ lfta_snap_pos[liface] = this_snap_pos;
}
//for(li=0;li<mach_lftas.size();++li){
lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
// iterate through interface properties
- vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
+ vector<string> iface_properties;
+ if(*sir!="_local_"){ // dummy watchlist interface, don't process.
+ iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
+ }
if (erri) {
fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
exit(1);
}
lfta_val[lmach] += "\";\n";
}
+ lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
+ lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
lfta_val[lmach] += "\t\t} else\n";
lfta_val[lmach] += "\t\t\treturn NULL;\n";
}
lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
- for (i = 0; i < query_names.size(); ++i) {
- if (i)
- lfta_val[lmach] += ", ";
- lfta_val[lmach] += "\"" + query_names[i] + "\"";
+ bool first = true;
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
+ string liface = (*mvsi).first;
+ if(liface != "_local_"){ // these don't register themselves
+ vector<stream_query *> lfta_list = (*mvsi).second;
+ for(i=0;i<lfta_list.size();i++){
+ int mi = lfta_iface_qname_ix[liface][i];
+ if(first) first = false;
+ else lfta_val[lmach] += ", ";
+ lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
+ }
+ }
}
+// for (i = 0; i < registration_query_names.size(); ++i) {
+// if (i)
+// lfta_val[lmach] += ", ";
+// lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
+// }
+
for (i = 0; i < hfta_list.size(); ++i) {
lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
}
// set the prefilter function accordingly.
// see the example in demo/err2
lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
+ lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
// for(i=0;i<mach_query_names[lmach].size();i++)
// int mi = mach_query_names[lmach][i];
for(i=0;i<lfta_list.size();i++){
stream_query *lfta_sq = lfta_list[i];
int mi = lfta_iface_qname_ix[liface][i];
+
+ if(liface == "_local_"){
+// Don't register an init function, do the init code inline
+ lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
+ lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
+ continue;
+ }
fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
if(interface_names[mi]!="")
this_iface = '"'+interface_names[mi]+'"';
lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
- lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
+ lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
// if(interface_names[mi]=="")
// lfta_val[lmach]+="DEFAULTDEV";
// else
lfta_val[lmach] += this_iface;
- lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
- +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
+ lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
+ +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
+"\n#endif\n";
sprintf(tmpstr,",%d",snap_lengths[mi]);
lfta_val[lmach] += tmpstr;
// NOTE : I'm assuming that visible lftas do not start with _fta.
// -- will fail for non-visible simple selection queries.
- if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
+ if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
string warning_str;
if(lfta_comment == "")
warning_str += "\tcomment not found.\n";
}
if(warning_str != "")
fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
- query_names[mi].c_str(),warning_str.c_str());
+ registration_query_names[mi].c_str(),warning_str.c_str());
}
// Create qtree output
- fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
- if(lfta_comment != "")
- fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
- if(lfta_title != "")
- fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
- if(lfta_namespace != "")
- fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
- if(lfta_ht_size != "")
- fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
+ fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
+ if(lfta_comment != "")
+ fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
+ if(lfta_title != "")
+ fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
+ if(lfta_namespace != "")
+ fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
+ if(lfta_ht_size != "")
+ fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
if(lmach != "")
fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
else
fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
- fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
+ std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
+ for(int t=0;t<itbls.size();++t){
+ fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
+ }
+// fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
fprintf(qtree_output,"\t\t<Rate value='100' />\n");
fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
// write info about fields to qtree.xml
// for an interface type
bool use_proto = false;
+ bool use_bsa = false;
+ bool use_kafka = false;
+ bool use_ssl = false;
int erri;
string err_str;
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
for(int ift_i=0;ift_i<ift.size();ift_i++){
if(ift[ift_i]=="PROTO"){
+#ifdef PROTO_ENABLED
use_proto = true;
+#else
+ fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef BSA_ENABLED
+ use_bsa = true;
+#else
+ fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
}
}
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef KAFKA_ENABLED
+ use_kafka = true;
+#else
+ fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef SSL_ENABLED
+ use_ssl = true;
+#else
+ fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
}
fprintf(outfl,
if(use_pads)
fprintf(outfl,"-lgscppads -lpads ");
fprintf(outfl,
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
if(use_pads)
fprintf(outfl, " -lpz -lz -lbz ");
if(libz_exists && libast_exists)
fprintf(outfl," -last ");
if(use_pads)
fprintf(outfl, " -ldll -ldl ");
- if(use_proto)
- fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
+
+#ifdef PROTO_ENABLED
+ fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
+#endif
+#ifdef BSA_ENABLED
+ fprintf(outfl, " -lbsa_stream ");
+#endif
+#ifdef KAFKA_ENABLED
+ fprintf(outfl, " -lrdkafka ");
+#endif
+#ifdef SSL_ENABLED
+ fprintf(outfl, " -lssl -lcrypto ");
+#endif
fprintf(outfl," -lgscpaux");
#ifdef GCOV
fprintf(outfl," -fprofile-arcs");
// string err_str;
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
string ifnm = (*ssi);
+ // suppress internal _local_ interface
+ if (ifnm == "_local_")
+ continue;
fprintf(outfl, "%s ",ifnm.c_str());
vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
for(j=0;j<ifv.size();++j)