#include"xml_t.h"
#include"field_list.h"
+#include "gsconfig.h"
+
extern int xmlParserparse(void);
extern FILE *xmlParserin;
extern int xmlParserdebug;
// Dump schema summary.
//
// Writes one record per non-NULL node of str->query_plan to
// schema_summary_output (declared outside this block).  Each record is:
//   -----
//   <node name>
//   <field names, '|'-separated>
//   <field types, '|'-separated>
//   <value of the node's "comment" definition, or an empty line>
//   <input tables, '|'-separated, each printed as [machine.][interface.]schema>
//
// str : the stream query whose plan nodes are summarized.
// NOTE(review): the result of qp->get_fields() is dereferenced without a
// NULL check -- presumably every non-NULL plan node has a schema; confirm.
void dump_summary(stream_query *str){
+ for(int q=0;q<str->query_plan.size();++q){
+ qp_node *qp = str->query_plan[q];
+ if(qp==NULL)
+ continue; // there can be blanks
+
+ fprintf(schema_summary_output,"-----\n");
+ fprintf(schema_summary_output,"%s\n",qp->node_name.c_str());
+
+ table_def *sch = qp->get_fields();
+
// One line of field names, then one line of the matching field types.
+ vector<field_entry *> flds = sch->get_fields();
+ int f;
+ for(f=0;f<flds.size();++f){
+ if(f>0) fprintf(schema_summary_output,"|");
+ fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+ for(f=0;f<flds.size();++f){
+ if(f>0) fprintf(schema_summary_output,"|");
+ fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+
// The comment line is always emitted; it is empty when the node has no
// "comment" definition.
+ map<std::string, std::string> defines = qp->get_definitions();
+ string comment = "";
+ if(defines.count("comment")>0){
+ comment = defines["comment"];
+ }
+ fprintf(schema_summary_output,"%s\n",comment.c_str());
+
// Input tables.  When the machine is set but the interface is empty, a lone
// "." is printed in place of the interface, giving "machine..schema".
+ vector<tablevar_t *> input_tables = qp->get_input_tbls();
+ for(int t=0; t<input_tables.size(); ++t){
+ if(t>0) fprintf(schema_summary_output,"|");
+ string machine = input_tables[t]->get_machine();
+ string iface = input_tables[t]->get_interface();
+ string schema = input_tables[t]->get_schema_name();
+ if(machine!=""){
+ fprintf(schema_summary_output,"%s.",machine.c_str());
+ }
+ if(iface!=""){
+ fprintf(schema_summary_output,"%s.",iface.c_str());
+ }else{
+ if(machine!="") fprintf(schema_summary_output,".");
+ }
+ fprintf(schema_summary_output,"%s",schema.c_str());
+ }
+
+ fprintf(schema_summary_output,"\n");
+ }
+
+
+
// Earlier version that summarized the stream_query as a whole rather than
// each plan node; left commented out.
+/*
+ fprintf(schema_summary_output,"-----\n");
fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
table_def *sch = str->get_output_tabledef();
fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
}
fprintf(schema_summary_output,"\n");
+
+ string comment = "";
+ if(str->defines.count("comment")>0){
+ comment = str->defines["comment"];
+ }
+ fprintf(schema_summary_output,"%s\n",comment.c_str());
+
+ vector<tablevar_t *> input_tables = str->get_input_tables();
+ for(int t=0; t<input_tables.size(); ++t){
+ if(t>0) fprintf(schema_summary_output,"|");
+ string machine = input_tables[t]->get_machine();
+ string iface = input_tables[t]->get_interface();
+ string schema = input_tables[t]->get_schema_name();
+ if(machine!=""){
+ fprintf(schema_summary_output,"%s.",machine.c_str());
+ }
+ if(iface!=""){
+ fprintf(schema_summary_output,"%s.",iface.c_str());
+ }else{
+ if(machine!="") fprintf(schema_summary_output,".");
+ }
+ fprintf(schema_summary_output,"%s",schema.c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+*/
+
}
// Globals
set<int>::iterator si;
- vector<string> query_names; // for lfta.c registration
+ vector<string> registration_query_names; // for lfta.c registration
map<string, vector<int> > mach_query_names; // list queries of machine
vector<int> snap_lengths; // for lfta.c registration
+ vector<int> snap_position; // for lfta.c registration
vector<string> interface_names; // for lfta.c registration
vector<string> machine_names; // machine of interface
vector<bool> lfta_reuse_options; // for lfta.c registration
while(fgets(tmpstr,TMPSTRLEN,osp_in)){
o_lineno++;
int nflds = split_string(tmpstr,',',flds,MAXFLDS);
- if(nflds == 7){
+ if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
+ if(nflds == 7){
// make operator type lowercase
- char *tmpc;
- for(tmpc=flds[1];*tmpc!='\0';++tmpc)
- *tmpc = tolower(*tmpc);
-
- ospec_str *tmp_ospec = new ospec_str();
- tmp_ospec->query = flds[0];
- tmp_ospec->operator_type = flds[1];
- tmp_ospec->operator_param = flds[2];
- tmp_ospec->output_directory = flds[3];
- tmp_ospec->bucketwidth = atoi(flds[4]);
- tmp_ospec->partitioning_flds = flds[5];
- tmp_ospec->n_partitions = atoi(flds[6]);
- qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
- output_specs.push_back(tmp_ospec);
- }else{
- fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ char *tmpc;
+ for(tmpc=flds[1];*tmpc!='\0';++tmpc)
+ *tmpc = tolower(*tmpc);
+
+ ospec_str *tmp_ospec = new ospec_str();
+ tmp_ospec->query = flds[0];
+ tmp_ospec->operator_type = flds[1];
+ tmp_ospec->operator_param = flds[2];
+ tmp_ospec->output_directory = flds[3];
+ tmp_ospec->bucketwidth = atoi(flds[4]);
+ tmp_ospec->partitioning_flds = flds[5];
+ tmp_ospec->n_partitions = atoi(flds[6]);
+ qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
+ output_specs.push_back(tmp_ospec);
+ }else{
+ fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ }
}
}
fclose(osp_in);
}
Schema = fta_parse_result->tables;
+// Ensure that all schema_ids, if set, are distinct.
+// Obsolete? There is code elsewhere to ensure that schema IDs are
+// distinct on a per-interface basis.
+/*
+ set<int> found_ids;
+ set<int> dup_ids;
+ for(int t=0;t<Schema->size();++t){
+ int sch_id = Schema->get_table(t)->get_schema_id();
+ if(sch_id> 0){
+ if(found_ids.find(sch_id) != found_ids.end()){
+ dup_ids.insert(sch_id);
+ }else{
+ found_ids.insert(sch_id);
+ }
+ }
+ if(dup_ids.size()>0){
+ fprintf(stderr, "Error, the schema has duplicate schema_ids:");
+ for(auto dit=dup_ids.begin();dit!=dup_ids.end(); ++dit)
+ fprintf(stderr," %d",(*dit));
+ fprintf(stderr,"\n");
+ exit(1);
+ }
+ }
+*/
+
+
// Process schema field inheritance
int retval;
retval = Schema->unroll_tables(err_str);
if(retval){
- fprintf(stderr,"Error processing schema filed inheritance:\n %s\n", err_str.c_str() );
+ fprintf(stderr,"Error processing schema field inheritance:\n %s\n", err_str.c_str() );
exit(1);
}
}else{
map<string, string> lfta_prefilter_val;
string lfta_header =
-"#include <limits.h>\n\n"
+"#include <limits.h>\n"
"#include \"rts.h\"\n"
"#include \"fta.h\"\n"
"#include \"lapp.h\"\n"
-"#include \"rts_udaf.h\"\n\n"
+"#include \"rts_udaf.h\"\n"
+"#include<stdio.h>\n"
+"#include <float.h>\n"
+"#include \"rdtsc.h\"\n"
+"#include \"watchlist.h\"\n\n"
+
;
// Get any locally defined parsing headers
glob_t glob_result;
memset(&glob_result, 0, sizeof(glob_result));
- // do the glob operation
+ // do the glob operation TODO should be from GSROOT
int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
if(return_value == 0){
+ lfta_header += "\n";
for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
char *flds[1000];
int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
- lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
+ lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
}
+ lfta_header += "\n";
}else{
fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
}
map<string, int> hfta_name_map;
// vector< vector<int> > process_sets;
// vector< set<int> > stream_node_sets;
- reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
- // i.e. process leaves 1st.
+ reverse(process_order.begin(), process_order.end()); // get listing in reverse .
+ // order: process leaves 1st.
for(i=0;i<process_order.size();++i){
if(qnodes[process_order[i]]->is_externally_visible == true){
//printf("Visible.\n");
}
if(dangling_ospecs!=""){
fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
+ exit(1);
}
string dangling_par = "";
// TODO: separate building operators from splitting lftas,
// that will make optimizations such as predicate pushing easier.
vector<stream_query *> lfta_list;
-
stream_query *rootq;
-
int qi,qj;
+ map<string, map<int, string> > schema_of_schemaid; // ensure that schema IDs are unique to an <interface, schema>
+
for(qi=hfta_topsort.size()-1;qi>=0;--qi){
int hfta_id = hfta_topsort[qi];
// Compute the number of LFTAs.
int n_lfta = split_queries.size();
if(hfta_returned) n_lfta--;
-
+// Check if a schemaId constraint needs to be inserted.
// Process the LFTA components.
for(l=0;l<n_lfta;++l){
if(lfta_names.count(split_queries[l]->query_name) == 0){
// Grab the lfta for global optimization.
vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
- string liface = tvec[0]->get_interface();
- string lmach = tvec[0]->get_machine();
- if (lmach == "")
- lmach = hostname;
- interface_names.push_back(liface);
- machine_names.push_back(lmach);
+ string liface = "_local_";
+// string lmach = "";
+ string lmach = hostname;
+ if(tvec.size()>0){
+ liface = tvec[0]->get_interface(); // iface queries have been resolved
+ if(tvec[0]->get_machine() != ""){
+ lmach = tvec[0]->get_machine();
+ }else{
+ fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
+ }
+ } // else{
+ interface_names.push_back(liface);
+ machine_names.push_back(lmach);
+// }
+
+ vector<predicate_t *> schemaid_preds;
+ for(int irv=0;irv<tvec.size();++irv){
+
+ string schema_name = tvec[irv]->get_schema_name();
+ string rvar_name = tvec[irv]->get_var_name();
+ int schema_ref = tvec[irv]->get_schema_ref();
+ if (lmach == "")
+ lmach = hostname;
+// interface_names.push_back(liface);
+// machine_names.push_back(lmach);
+
//printf("Machine is %s\n",lmach.c_str());
+// Check if a schemaId constraint needs to be inserted.
+ if(schema_ref<0){ // can result from some kinds of splits
+ schema_ref = Schema->get_table_ref(schema_name);
+ }
+ int schema_id = Schema->get_schema_id(schema_ref); // id associated with PROTOCOL
+ int errnum = 0;
+ string if_error;
+ iface_t *iface = ifaces_db-> get_interface(lmach, liface, errnum, if_error);
+ if(iface==NULL){
+ fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
+ exit(1);
+ }
+
+
+ if(tvec[irv]->get_interface() != "_local_"){
+ if(iface->has_multiple_schemas()){
+ if(schema_id<0){ // invalid schema_id
+ fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
+ exit(1);
+ }
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
+// Ensure that in liface, schema_id is used for only one schema
+ if(schema_of_schemaid.count(liface)==0){
+ map<int, string> empty_map;
+ schema_of_schemaid[liface] = empty_map;
+ }
+ if(schema_of_schemaid[liface].count(schema_id)==0){
+ schema_of_schemaid[liface][schema_id] = schema_name;
+ }else{
+ if(schema_of_schemaid[liface][schema_id] != schema_name){
+ fprintf(stderr, "ERROR, For interface %s, schema id %d is used in schemas %s and %s\n", liface.c_str(), schema_id, schema_name.c_str(), schema_of_schemaid[liface][schema_id].c_str());
+ exit(1);
+ }
+ }
+ }else{ // single-schema interface
+ schema_id = -1; // don't generate schema_id predicate
+ vector<string> iface_schemas = iface->get_property("Schemas");
+ if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
+ fprintf(stderr,"Error, query %s references schema %s from interface %s, but this schema is not in this interface's Schemas list\n", split_queries[0]->query_name.c_str(), schema_name.c_str(), liface.c_str());
+ exit(1);
+ }
+ if(iface_schemas.size()>1){
+ fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
+ exit(1);
+ }
+ }
+ }else{
+ schema_id = -1;
+ }
+
+// If we need to check the schema_id, insert a predicate into the lfta.
+// TODO not just schema_id, the full all_schema_ids set.
+ if(schema_id>=0){
+ colref_t *schid_cr = new colref_t("schemaId");
+ schid_cr->schema_ref = schema_ref;
+ schid_cr->table_name = rvar_name;
+ schid_cr->tablevar_ref = 0;
+ schid_cr->default_table = false;
+ scalarexp_t *schid_se = new scalarexp_t(schid_cr);
+ data_type *schid_dt = new data_type("uint");
+ schid_se->dt = schid_dt;
+
+ string schid_str = int_to_string(schema_id);
+ literal_t *schid_lit = new literal_t(schid_str.c_str(), LITERAL_INT);
+ scalarexp_t *lit_se = new scalarexp_t(schid_lit);
+ lit_se->dt = schid_dt;
+
+ predicate_t *schid_pr = new predicate_t(schid_se, "=", lit_se);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(schid_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+// cnf built, now insert it.
+ split_queries[l]->query_plan[0]->append_to_where(clist[0]);
+
+// Specialized processing
+// filter join, get two schemaid preds
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ }else{
+ fj->pred_t1.push_back(clist[0]);
+ }
+ schemaid_preds.push_back(schid_pr);
+ }
+// watchlist join, get the first schemaid pred
+ if(node_type == "watch_join"){
+ watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ schemaid_preds.push_back(schid_pr);
+ }
+ }
+ }
+ }
+// Specialized processing, currently filter join.
+ if(schemaid_preds.size()>1){
+ string node_type = split_queries[l]->query_plan[0]->node_type();
+ if(node_type == "filter_join"){
+ filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
+ predicate_t *filter_pr = new predicate_t("OR", schemaid_preds[0], schemaid_preds[1]);
+ vector<cnf_elem *> clist;
+ make_cnf_from_pr(filter_pr, clist);
+ analyze_cnf(clist[0]);
+ clist[0]->cost = 1; // cheap one comparison
+ fj->shared_pred.push_back(clist[0]);
+ }
+ }
+
+
+
+
+
+
+
// Set the ht size from the recommendation, if there is one in the rec file
if(lfta_htsize.count(split_queries[l]->query_name)>0){
split_queries[l]->query_plan[0]->set_definition("aggregate_slots",int_to_string(lfta_htsize[split_queries[l]->query_name]));
// The following is a hack,
// as I should be generating LFTA code through
// the stream_query object.
+
split_queries[l]->query_plan[0]->bind_to_schema(Schema);
+
// split_queries[l]->query_plan[0]->definitions = split_queries[l]->defines;
/*
lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
*/
- snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
- query_names.push_back(split_queries[l]->query_name);
- mach_query_names[lmach].push_back(query_names.size()-1);
+ snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
+ snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
+
+// STOPPED HERE: need to figure out how to generate the code that Vlad needs
+// from snap_position
+
+// TODO NOTE : I'd like it to be the case that registration_query_names
+// are the queries to be registered for subscription,
+// but there is some complex bookkeeping here.
+ registration_query_names.push_back(split_queries[l]->query_name);
+ mach_query_names[lmach].push_back(registration_query_names.size()-1);
// NOTE: I will assume a 1-1 correspondence between
// mach_query_names[lmach] and lfta_mach_lists[lmach]
// where mach_query_names[lmach][i] contains the index into
}
*/
-/*
-// output schema summary
- if(output_schema_summary){
- dump_summary(split_queries[0]);
- }
-*/
}
+// output schema summary
+ if(output_schema_summary){
+ for(int o=0;o<split_queries.size(); ++o){
+ dump_summary(split_queries[o]);
+ }
+ }
if(hfta_returned){ // query also has an HFTA component
}
-print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
+print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
int num_hfta = hfta_list.size();
for(i=0; i < hfta_list.size(); ++i){
Schema->append_table(td);
}
-print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
+print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
hfta_namespace = hfta_list[i]->defines["namespace"];
if(hfta_list[i]->defines.count("Namespace")>0)
hfta_namespace = hfta_list[i]->defines["Namespace"];
- if(hfta_list[i]->defines.count("Namespace")>0)
- hfta_namespace = hfta_list[i]->defines["Namespace"];
+ if(hfta_list[i]->defines.count("NAMESPACE")>0)
+ hfta_namespace = hfta_list[i]->defines["NAMESPACE"];
if(field_verifier != NULL){
string warning_str;
if(hfta_comment == "")
warning_str += "\tcomment not found.\n";
- if(hfta_title == "")
- warning_str += "\ttitle not found.\n";
- if(hfta_namespace == "")
- warning_str += "\tnamespace not found.\n";
+
+// Obsolete stuff that Carsten wanted
+// if(hfta_title == "")
+// warning_str += "\ttitle not found.\n";
+// if(hfta_namespace == "")
+// warning_str += "\tnamespace not found.\n";
+
+// There is a get_tbl_keys method implemented for qp_nodes,
+// integrate it into stream_query, then call it to find keys,
+// and annotate fields with their key-ness.
+// If there is a "keys" property in the defines block, override anything returned
+// from the automated analysis
vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
int fi;
hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str(),warning_str.c_str());
}
+// Get the fields in this query
+ vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
+
+// do key processing
+ string hfta_keys_s = "";
+ if(hfta_list[i]->defines.count("keys")>0)
+ hfta_keys_s = hfta_list[i]->defines["keys"];
+ if(hfta_list[i]->defines.count("Keys")>0)
+ hfta_keys_s = hfta_list[i]->defines["Keys"];
+ if(hfta_list[i]->defines.count("KEYS")>0)
+ hfta_keys_s = hfta_list[i]->defines["KEYS"];
+ string xtra_keys_s = "";
+ if(hfta_list[i]->defines.count("extra_keys")>0)
+ xtra_keys_s = hfta_list[i]->defines["extra_keys"];
+ if(hfta_list[i]->defines.count("Extra_Keys")>0)
+ xtra_keys_s = hfta_list[i]->defines["Extra_Keys"];
+ if(hfta_list[i]->defines.count("EXTRA_KEYS")>0)
+ xtra_keys_s = hfta_list[i]->defines["EXTRA_KEYS"];
+// get the keys
+ vector<string> hfta_keys;
+ vector<string> partial_keys;
+ vector<string> xtra_keys;
+ if(hfta_keys_s==""){
+ hfta_keys = hfta_list[i]->get_tbl_keys(partial_keys);
+ if(xtra_keys_s.size()>0){
+ xtra_keys = split_string(xtra_keys_s, ',');
+ }
+ for(int xi=0;xi<xtra_keys.size();++xi){
+ if(find(hfta_keys.begin(), hfta_keys.end(), xtra_keys[xi]) == hfta_keys.end()){
+ hfta_keys.push_back(xtra_keys[xi]);
+ }
+ }
+ }else{
+ hfta_keys = split_string(hfta_keys_s, ',');
+ }
+// validate that all of the keys exist in the output.
+// (exit on error, as it's a bad specification)
+ vector<string> missing_keys;
+ for(int ki=0;ki<hfta_keys.size(); ++ki){
+ int fi;
+ for(fi=0;fi<flds.size();++fi){
+ if(hfta_keys[ki] == flds[fi]->get_name())
+ break;
+ }
+ if(fi==flds.size())
+ missing_keys.push_back(hfta_keys[ki]);
+ }
+ if(missing_keys.size()>0){
+ fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
+ for(int hi=0; hi<missing_keys.size(); ++hi){
+ fprintf(stderr," %s", missing_keys[hi].c_str());
+ }
+ fprintf(stderr,"\n");
+ exit(1);
+ }
+
fprintf(qtree_output,"\t<HFTA name='%s'>\n",hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
if(hfta_comment != "")
fprintf(qtree_output,"\t\t<Description value='%s' />\n",hfta_comment.c_str());
fprintf(qtree_output,"\t\t<Rate value='100' />\n");
// write info about fields to qtree.xml
- vector<field_entry *> flds = hfta_list[i]->get_output_tabledef()->get_fields();
int fi;
for(fi=0;fi<flds.size();fi++){
fprintf(qtree_output,"\t\t<Field name='%s' pos='%d' type='%s' ",flds[fi]->get_name().c_str(), fi, flds[fi]->get_type().c_str());
if(flds[fi]->get_modifier_list()->size()){
- fprintf(qtree_output,"mods='%s' ", flds[fi]->get_modifier_list()->to_string().c_str());
+ fprintf(qtree_output,"mods='%s'", flds[fi]->get_modifier_list()->to_string().c_str());
}
fprintf(qtree_output," />\n");
}
+// info about keys
+ for(int hi=0;hi<hfta_keys.size();++hi){
+ fprintf(qtree_output, "\t\t<Key value='%s'/>\n",hfta_keys[hi].c_str());
+ }
+ for(int hi=0;hi<partial_keys.size();++hi){
+ fprintf(qtree_output, "\t\t<PartialKey value='%s'/>\n",partial_keys[hi].c_str());
+ }
+ for(int hi=0;hi<xtra_keys.size();++hi){
+ fprintf(qtree_output, "\t\t<ExtraKey value='%s'/>\n",xtra_keys[hi].c_str());
+ }
+
// extract liveness timeout from query definition
int liveness_timeout = atoi(hfta_list[i]->query_plan[hfta_list[i]->qhead]->get_val_of_def("liveness_timeout").c_str());
int li;
// Group this machine's LFTAs by the interface of their first input table.
// An LFTA with no input tables is filed under the "_local_" pseudo-interface.
for(li=0;li<mach_lftas.size();++li){
vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string lfta_iface = tvec[0]->get_interface();
+ string lfta_iface = "_local_";
+ if(tvec.size()>0){
// Assign to the outer variable.  Redeclaring it here would shadow it, so the
// real interface name would be discarded and every LFTA filed under "_local_".
+ lfta_iface = tvec[0]->get_interface();
+ }
lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
}
// Find common filter predicates in the LFTAs.
-// in addition generate structs to store the temporal attributes unpacked by prefilter
+// in addition generate structs to store the
+// temporal attributes unpacked by prefilter
+// compute & provide interface for per-interface
+// record extraction properties
map<string, vector<stream_query *> >::iterator ssqi;
for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
// for fta_init
for(li=0;li<mach_lftas.size();++li){
vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string lfta_iface = tvec[0]->get_interface();
+ string lfta_iface = "_local_";
+ if(tvec.size()>0){
+ lfta_iface = tvec[0]->get_interface();
+ }
lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
}
// But this is defunct code for gs-lite
for(li=0;li<mach_lftas.size();++li){
vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string liface = tvec[0]->get_interface();
+ string liface = "_local_";
+ if(tvec.size()>0){
+ liface = tvec[0]->get_interface();
+ }
vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
if(iface_codegen_type.size()){
if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
- packed_return = true;
+ packed_return = true;
}
}
}
lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
#endif
map<string, vector<long long int> > lfta_sigs; // used again later
+ map<string, int> lfta_snap_pos; // optimize csv parsing
+ // compute now, use in get_iface_properties
for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
string liface = (*mvsi).first;
vector<long long int> empty_list;
lfta_sigs[liface] = empty_list;
+ lfta_snap_pos[liface] = -1;
vector<col_id_set> lfta_cols;
vector<int> lfta_snap_length;
}
lfta_sigs[liface].push_back(mask);
lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
- lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
+ lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
+ int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
+ if(this_snap_pos > lfta_snap_pos[liface])
+ lfta_snap_pos[liface] = this_snap_pos;
}
//for(li=0;li<mach_lftas.size();++li){
lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
// iterate through interface properties
- vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
+ vector<string> iface_properties;
+ if(*sir!="_local_"){ // dummy watchlist interface, don't process.
+ iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
+ }
if (erri) {
fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
exit(1);
// combine all values for the interface property using comma separator
vector<string> vals = ifaces_db->get_iface_vals(lmach,*sir,iface_properties[i],erri,err_str);
+ lfta_val[lmach] += "\t\t\treturn \"";
for (int j = 0; j < vals.size(); ++j) {
- lfta_val[lmach] += "\t\t\treturn \"" + vals[j];
+ lfta_val[lmach] += vals[j];
if (j != vals.size()-1)
lfta_val[lmach] += ",";
}
lfta_val[lmach] += "\";\n";
}
+ lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
+ lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
lfta_val[lmach] += "\t\t} else\n";
lfta_val[lmach] += "\t\t\treturn NULL;\n";
}
lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
- for (i = 0; i < query_names.size(); ++i) {
- if (i)
- lfta_val[lmach] += ", ";
- lfta_val[lmach] += "\"" + query_names[i] + "\"";
+ bool first = true;
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
+ string liface = (*mvsi).first;
+ if(liface != "_local_"){ // these don't register themselves
+ vector<stream_query *> lfta_list = (*mvsi).second;
+ for(i=0;i<lfta_list.size();i++){
+ int mi = lfta_iface_qname_ix[liface][i];
+ if(first) first = false;
+ else lfta_val[lmach] += ", ";
+ lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
+ }
+ }
}
+// for (i = 0; i < registration_query_names.size(); ++i) {
+// if (i)
+// lfta_val[lmach] += ", ";
+// lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
+// }
+
for (i = 0; i < hfta_list.size(); ++i) {
lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
}
// set the prefilter function accordingly.
// see the example in demo/err2
lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
+ lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
// for(i=0;i<mach_query_names[lmach].size();i++)
// int mi = mach_query_names[lmach][i];
for(i=0;i<lfta_list.size();i++){
stream_query *lfta_sq = lfta_list[i];
int mi = lfta_iface_qname_ix[liface][i];
+
+ if(liface == "_local_"){
+// Don't register an init function, do the init code inline
+ lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
+ lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
+ continue;
+ }
fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
- lfta_val[lmach] += "\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
- if(interface_names[mi]=="")
- lfta_val[lmach]+="DEFAULTDEV";
- else
- lfta_val[lmach]+='"'+interface_names[mi]+'"';
+ string this_iface = "DEFAULTDEV";
+ if(interface_names[mi]!="")
+ this_iface = '"'+interface_names[mi]+'"';
+ lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
+ lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
+// if(interface_names[mi]=="")
+// lfta_val[lmach]+="DEFAULTDEV";
+// else
+// lfta_val[lmach]+='"'+interface_names[mi]+'"';
+ lfta_val[lmach] += this_iface;
+
- lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
- +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
+ lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
+ +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
+"\n#endif\n";
sprintf(tmpstr,",%d",snap_lengths[mi]);
lfta_val[lmach] += tmpstr;
// NOTE : I'm assuming that visible lftas do not start with _fta.
// -- will fail for non-visible simple selection queries.
- if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
+ if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
string warning_str;
if(lfta_comment == "")
warning_str += "\tcomment not found.\n";
- if(lfta_title == "")
- warning_str += "\ttitle not found.\n";
- if(lfta_namespace == "")
- warning_str += "\tnamespace not found.\n";
+// Obsolete stuff that carsten wanted
+// if(lfta_title == "")
+// warning_str += "\ttitle not found.\n";
+// if(lfta_namespace == "")
+// warning_str += "\tnamespace not found.\n";
vector<field_entry *> flds = lfta_sq->get_output_tabledef()->get_fields();
int fi;
}
if(warning_str != "")
fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
- query_names[mi].c_str(),warning_str.c_str());
+ registration_query_names[mi].c_str(),warning_str.c_str());
}
// Create qtree output
- fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
- if(lfta_comment != "")
- fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
- if(lfta_title != "")
- fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
- if(lfta_namespace != "")
- fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
- if(lfta_ht_size != "")
- fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
+ fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
+ if(lfta_comment != "")
+ fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
+ if(lfta_title != "")
+ fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
+ if(lfta_namespace != "")
+ fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
+ if(lfta_ht_size != "")
+ fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
if(lmach != "")
fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
else
fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
- fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
+ std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
+ for(int t=0;t<itbls.size();++t){
+ fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
+ }
+// fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
fprintf(qtree_output,"\t\t<Rate value='100' />\n");
fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
// write info about fields to qtree.xml
}
// Do we need to include protobuf libraries?
+// TODO Move to the interface library: get the libraries to include
+// for an interface type
+
bool use_proto = false;
+ bool use_bsa = false;
+ bool use_kafka = false;
+ bool use_ssl = false;
int erri;
string err_str;
// Scan each interface's properties (InterfaceType, BSA, KAFKA, ENCRYPTED) to
// record which optional runtime features it requests.  If an interface asks
// for a feature that was not enabled at compile time, report it and exit
// with a non-zero status so that the calling build script sees the failure.
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
for(int ift_i=0;ift_i<ift.size();ift_i++){
if(ift[ift_i]=="PROTO"){
+#ifdef PROTO_ENABLED
use_proto = true;
+#else
+ fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
+ exit(1);
+#endif
+ }
+ }
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef BSA_ENABLED
+ use_bsa = true;
+#else
+ fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
+ exit(1);
+#endif
+ }
+ }
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef KAFKA_ENABLED
+ use_kafka = true;
+#else
+ fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
+ exit(1);
+#endif
}
}
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef SSL_ENABLED
+ use_ssl = true;
+#else
+ fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
+ exit(1);
+#endif
+ }
+ }
}
fprintf(outfl,
if(use_pads)
fprintf(outfl,"-lgscppads -lpads ");
fprintf(outfl,
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
if(use_pads)
fprintf(outfl, " -lpz -lz -lbz ");
if(libz_exists && libast_exists)
fprintf(outfl," -last ");
if(use_pads)
fprintf(outfl, " -ldll -ldl ");
- if(use_proto)
- fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
+
+#ifdef PROTO_ENABLED
+ fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
+#endif
+#ifdef BSA_ENABLED
+ fprintf(outfl, " -lbsa_stream ");
+#endif
+#ifdef KAFKA_ENABLED
+ fprintf(outfl, " -lrdkafka ");
+#endif
+#ifdef SSL_ENABLED
+ fprintf(outfl, " -lssl -lcrypto ");
+#endif
fprintf(outfl," -lgscpaux");
#ifdef GCOV
fprintf(outfl," -fprofile-arcs");
// string err_str;
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
string ifnm = (*ssi);
+ // suppress internal _local_ interface
+ if (ifnm == "_local_")
+ continue;
fprintf(outfl, "%s ",ifnm.c_str());
vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
for(j=0;j<ifv.size();++j)