set<int>::iterator si;
- vector<string> query_names; // for lfta.c registration
+ vector<string> registration_query_names; // for lfta.c registration
map<string, vector<int> > mach_query_names; // list queries of machine
vector<int> snap_lengths; // for lfta.c registration
vector<string> interface_names; // for lfta.c registration
map<string, string> lfta_prefilter_val;
string lfta_header =
-"#include <limits.h>\n\n"
+"#include <limits.h>\n"
"#include \"rts.h\"\n"
"#include \"fta.h\"\n"
"#include \"lapp.h\"\n"
-"#include \"rts_udaf.h\"\n\n"
+"#include \"rts_udaf.h\"\n"
+"#include<stdio.h>\n"
+"#include <float.h>\n"
+"#include \"rdtsc.h\"\n"
+"#include \"watchlist.h\"\n\n"
+
;
// Get any locally defined parsing headers
glob_t glob_result;
memset(&glob_result, 0, sizeof(glob_result));
- // do the glob operation
+ // do the glob operation TODO should be from GSROOT
int return_value = glob("../../include/lfta/local/*h", GLOB_TILDE, NULL, &glob_result);
if(return_value == 0){
+ lfta_header += "\n";
for(size_t i = 0; i < glob_result.gl_pathc; ++i) {
char *flds[1000];
int nflds = split_string(glob_result.gl_pathv[i],'/',flds,1000);
- lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n\n";
+ lfta_header += "#include \"local/"+string(flds[nflds-1])+"\"\n";
}
+ lfta_header += "\n";
}else{
fprintf(stderr,"Warning, glob on ../../include/lfta/local/*h failed.\n");
}
if(lfta_names.count(split_queries[l]->query_name) == 0){
// Grab the lfta for global optimization.
vector<tablevar_t *> tvec = split_queries[l]->query_plan[0]->get_input_tbls();
- string liface = tvec[0]->get_interface(); // iface queries have been resolved
- string lmach = tvec[0]->get_machine();
+ string liface = "_local_";
+// string lmach = "";
+ string lmach = hostname;
+ if(tvec.size()>0){
+ liface = tvec[0]->get_interface(); // iface queries have been resolved
+ lmach = tvec[0]->get_machine();
+ } // else{
+ interface_names.push_back(liface);
+ machine_names.push_back(lmach);
+// }
vector<predicate_t *> schemaid_preds;
for(int irv=0;irv<tvec.size();++irv){
int schema_ref = tvec[irv]->get_schema_ref();
if (lmach == "")
lmach = hostname;
- interface_names.push_back(liface);
- machine_names.push_back(lmach);
+// interface_names.push_back(liface);
+// machine_names.push_back(lmach);
+
//printf("Machine is %s\n",lmach.c_str());
// Check if a schemaId constraint needs to be inserted.
fprintf(stderr,"Error looking up the interface %s on host %s: %s\n",liface.c_str(), lmach.c_str(), if_error.c_str());
exit(1);
}
- if(iface->has_multiple_schemas()){
+
+
+ if(tvec[irv]->get_interface() != "_local_"){
+ if(iface->has_multiple_schemas()){
if(schema_id<0){ // invalid schema_id
fprintf(stderr,"Error, schema %s has no schema_id, but is sourced from multi-schema interface %s (ref'd in query %s)\n", schema_name.c_str(), liface.c_str(), split_queries[0]->query_name.c_str());
exit(1);
exit(1);
}
}
- }else{ // single-schema interface
+ }else{ // single-schema interface
schema_id = -1; // don't generate schema_id predicate
vector<string> iface_schemas = iface->get_property("Schemas");
if(iface_schemas.size()>0 && find(iface_schemas.begin(), iface_schemas.end(), schema_name) == iface_schemas.end()){
fprintf(stderr, "ERROR, interface %s is a single-schema interface, but has %d elements in its Schemas list.\n", liface.c_str(), (int)iface_schemas.size());
exit(1);
}
- }
+ }
+ }else{
+ schema_id = -1;
+ }
// If we need to check the schema_id, insert a predicate into the lfta.
// TODO not just schema_id, the full all_schema_ids set.
// cnf built, now insert it.
split_queries[l]->query_plan[0]->append_to_where(clist[0]);
-// Specialized processing ... currently filter join
+// Specialized processing
+// filter join, get two schemaid preds
string node_type = split_queries[l]->query_plan[0]->node_type();
if(node_type == "filter_join"){
filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
}
schemaid_preds.push_back(schid_pr);
}
+// watchlist join, get the first schemaid pred
+ if(node_type == "watch_join"){
+ watch_join_qpn *fj = (watch_join_qpn *)split_queries[l]->query_plan[0];
+ if(irv==0){
+ fj->pred_t0.push_back(clist[0]);
+ schemaid_preds.push_back(schid_pr);
+ }
+ }
}
}
// Specialized processing, currently filter join.
- if(schemaid_preds.size()>0){
+ if(schemaid_preds.size()>1){
string node_type = split_queries[l]->query_plan[0]->node_type();
if(node_type == "filter_join"){
filter_join_qpn *fj = (filter_join_qpn *)split_queries[l]->query_plan[0];
*/
snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
- query_names.push_back(split_queries[l]->query_name);
- mach_query_names[lmach].push_back(query_names.size()-1);
+
+// TODO NOTE : I'd like it to be the case that registration_query_names
+// are the queries to be registered for subsciption.
+// but there is some complex bookkeeping here.
+ registration_query_names.push_back(split_queries[l]->query_name);
+ mach_query_names[lmach].push_back(registration_query_names.size()-1);
// NOTE: I will assume a 1-1 correspondance between
// mach_query_names[lmach] and lfta_mach_lists[lmach]
// where mach_query_names[lmach][i] contains the index into
}
-print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
+print_hftas("preopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
int num_hfta = hfta_list.size();
for(i=0; i < hfta_list.size(); ++i){
Schema->append_table(td);
}
-print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, query_names, interface_names);
+print_hftas("postopt_hfta_info.txt", hfta_list, lfta_names, registration_query_names, interface_names);
missing_keys.push_back(hfta_keys[ki]);
}
if(missing_keys.size()>0){
- fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the outputg:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
+ fprintf(stderr, "Error, HFTA stream %s has keys defined which aren't in the output:", hfta_list[i]->get_output_tabledef()->get_tbl_name().c_str());
for(int hi=0; hi<missing_keys.size(); ++hi){
fprintf(stderr," %s", missing_keys[hi].c_str());
}
int li;
for(li=0;li<mach_lftas.size();++li){
vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string lfta_iface = tvec[0]->get_interface();
+ string lfta_iface = "_local_";
+ if(tvec.size()>0){
+ string lfta_iface = tvec[0]->get_interface();
+ }
lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
}
// for fta_init
for(li=0;li<mach_lftas.size();++li){
vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string lfta_iface = tvec[0]->get_interface();
+ string lfta_iface = "_local_";
+ if(tvec.size()>0){
+ lfta_iface = tvec[0]->get_interface();
+ }
lfta_iface_lists[lfta_iface].push_back(mach_lftas[li]);
lfta_iface_qname_ix[lfta_iface].push_back(mach_query_names[lmach][li]);
}
// But this is defunct code for gs-lite
for(li=0;li<mach_lftas.size();++li){
vector<tablevar_t *> tvec = mach_lftas[li]->query_plan[0]->get_input_tbls();
- string liface = tvec[0]->get_interface();
+ string liface = "_local_";
+ if(tvec.size()>0){
+ liface = tvec[0]->get_interface();
+ }
vector<string> iface_codegen_type = ifaces_db->get_iface_vals(lmach,liface,"iface_codegen_type",erri,err_str);
if(iface_codegen_type.size()){
if(npdb->get_nic_prop_val(iface_codegen_type[0],"Return",erri) == "Packed"){
- packed_return = true;
+ packed_return = true;
}
}
}
lfta_val[lmach] += "\t} else if (!strcmp(iface_name, \"" + *sir + "\")) {\n";
// iterate through interface properties
- vector<string> iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
+ vector<string> iface_properties;
+ if(*sir!="_local_"){ // dummy watchlist interface, don't process.
+ iface_properties = ifaces_db->get_iface_properties(lmach,*sir,erri,err_str);
+ }
if (erri) {
fprintf(stderr,"ERROR cannot retrieve interface properties for %s.%s, %s\n",lmach.c_str(), sir->c_str(), err_str.c_str());
exit(1);
lfta_val[lmach] += "// list of FTAs clearinghouse expects to register themselves\n";
lfta_val[lmach] += "const gs_sp_t fta_names[] = {";
- for (i = 0; i < query_names.size(); ++i) {
- if (i)
- lfta_val[lmach] += ", ";
- lfta_val[lmach] += "\"" + query_names[i] + "\"";
+ bool first = true;
+ for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
+ string liface = (*mvsi).first;
+ if(liface != "_local_"){ // these don't register themselves
+ vector<stream_query *> lfta_list = (*mvsi).second;
+ for(i=0;i<lfta_list.size();i++){
+ int mi = lfta_iface_qname_ix[liface][i];
+ if(first) first = false;
+ else lfta_val[lmach] += ", ";
+ lfta_val[lmach] += "\"" + registration_query_names[mi] + "\"";
+ }
+ }
}
+// for (i = 0; i < registration_query_names.size(); ++i) {
+// if (i)
+// lfta_val[lmach] += ", ";
+// lfta_val[lmach] += "\"" + registration_query_names[i] + "\"";
+// }
+
for (i = 0; i < hfta_list.size(); ++i) {
lfta_val[lmach] += ", \"" + hfta_list[i]->query_name + "\"";
}
// set the prefilter function accordingly.
// see the example in demo/err2
lfta_val[lmach] += "void fta_init(gs_sp_t device){\n";
+ lfta_val[lmach] += "// note: the last parameter in fta_register is the prefilter signature\n";
// for(i=0;i<mach_query_names[lmach].size();i++)
// int mi = mach_query_names[lmach][i];
for(i=0;i<lfta_list.size();i++){
stream_query *lfta_sq = lfta_list[i];
int mi = lfta_iface_qname_ix[liface][i];
+
+ if(liface == "_local_"){
+// Don't register an init function, do the init code inline
+ lfta_val[lmach] += "// initialization code for "+registration_query_names[mi]+";\n";
+ lfta_val[lmach] += "\t"+generate_watchlist_name(registration_query_names[mi])+".filename = "+generate_watchlist_name(registration_query_names[mi])+"__fstr;\n";
+ continue;
+ }
fprintf(stderr,"interface %s, LFTA %d, snap length is %d\n",liface.c_str(),mi,snap_lengths[mi]);
if(interface_names[mi]!="")
this_iface = '"'+interface_names[mi]+'"';
lfta_val[lmach] += "\tif(!strcmp(device,"+this_iface+"))\n";
- lfta_val[lmach] += "\t\tfta_register(\""+query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
+ lfta_val[lmach] += "\t\tfta_register(\""+registration_query_names[mi]+"\", " + (lfta_reuse_options[mi]?"1":"0") + ", ";
// if(interface_names[mi]=="")
// lfta_val[lmach]+="DEFAULTDEV";
// else
lfta_val[lmach] += this_iface;
- lfta_val[lmach] += ", "+generate_alloc_name(query_names[mi])
- +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(query_names[mi])
+ lfta_val[lmach] += ", "+generate_alloc_name(registration_query_names[mi])
+ +"\n#ifndef LFTA_IN_NIC\n\t,"+generate_schema_string_name(registration_query_names[mi])
+"\n#endif\n";
sprintf(tmpstr,",%d",snap_lengths[mi]);
lfta_val[lmach] += tmpstr;
// NOTE : I'm assuming that visible lftas do not start with _fta.
// -- will fail for non-visible simple selection queries.
- if(field_verifier != NULL && query_names[mi].substr(0,1) != "_"){
+ if(field_verifier != NULL && registration_query_names[mi].substr(0,1) != "_"){
string warning_str;
if(lfta_comment == "")
warning_str += "\tcomment not found.\n";
}
if(warning_str != "")
fprintf(stderr,"Warning, in LFTA stream %s:\n%s",
- query_names[mi].c_str(),warning_str.c_str());
+ registration_query_names[mi].c_str(),warning_str.c_str());
}
// Create qtree output
- fprintf(qtree_output,"\t<LFTA name='%s' >\n",query_names[mi].c_str());
- if(lfta_comment != "")
- fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
- if(lfta_title != "")
- fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
- if(lfta_namespace != "")
- fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
- if(lfta_ht_size != "")
- fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
+ fprintf(qtree_output,"\t<LFTA name='%s' >\n",registration_query_names[mi].c_str());
+ if(lfta_comment != "")
+ fprintf(qtree_output,"\t\t<Description value='%s' />\n",lfta_comment.c_str());
+ if(lfta_title != "")
+ fprintf(qtree_output,"\t\t<Title value='%s' />\n",lfta_title.c_str());
+ if(lfta_namespace != "")
+ fprintf(qtree_output,"\t\t<Namespace value='%s' />\n",lfta_namespace.c_str());
+ if(lfta_ht_size != "")
+ fprintf(qtree_output,"\t\t<HtSize value='%s' />\n",lfta_ht_size.c_str());
if(lmach != "")
fprintf(qtree_output,"\t\t<Host value='%s' />\n",lmach.c_str());
else
fprintf(qtree_output,"\t\t<Host value='%s' />\n",hostname.c_str());
fprintf(qtree_output,"\t\t<Interface value='%s' />\n",interface_names[mi].c_str());
- fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
+ std::vector<tablevar_t *> itbls = lfta_sq->get_input_tables();
+ for(int t=0;t<itbls.size();++t){
+ fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",itbls[t]->get_schema_name().c_str());
+ }
+// fprintf(qtree_output,"\t\t<ReadsFrom value='%s' />\n",interface_names[mi].c_str());
fprintf(qtree_output,"\t\t<Rate value='100' />\n");
fprintf(qtree_output,"\t\t<LivenessTimeout value='%d' />\n", lfta_liveness_timeouts[mi]);
// write info about fields to qtree.xml
// for an interface type
bool use_proto = false;
+ bool use_bsa = false;
+ bool use_kafka = false;
int erri;
string err_str;
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
use_proto = true;
}
}
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+ use_bsa = true;
+ }
+ }
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+ use_kafka = true;
+ }
+ }
}
fprintf(outfl,
fprintf(outfl, " -ldll -ldl ");
if(use_proto)
fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
+ if(use_bsa)
+ fprintf(outfl, " -lbsa_stream ");
+ if(use_kafka)
+ fprintf(outfl, " -lrdkafka ");
fprintf(outfl," -lgscpaux");
#ifdef GCOV
fprintf(outfl," -fprofile-arcs");
// string err_str;
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
string ifnm = (*ssi);
+ // suppress internal _local_ interface
+ if (ifnm == "_local_")
+ continue;
fprintf(outfl, "%s ",ifnm.c_str());
vector<string> ifv = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "Command", erri, err_str);
for(j=0;j<ifv.size();++j)