#include"xml_t.h"
#include"field_list.h"
+#include "gsconfig.h"
+
extern int xmlParserparse(void);
extern FILE *xmlParserin;
extern int xmlParserdebug;
vector<string> registration_query_names; // for lfta.c registration
map<string, vector<int> > mach_query_names; // list queries of machine
vector<int> snap_lengths; // for lfta.c registration
+ vector<int> snap_position; // for lfta.c registration
vector<string> interface_names; // for lfta.c registration
vector<string> machine_names; // machine of interface
vector<bool> lfta_reuse_options; // for lfta.c registration
while(fgets(tmpstr,TMPSTRLEN,osp_in)){
o_lineno++;
int nflds = split_string(tmpstr,',',flds,MAXFLDS);
- if(nflds == 7){
+ if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
+ if(nflds == 7){
// make operator type lowercase
- char *tmpc;
- for(tmpc=flds[1];*tmpc!='\0';++tmpc)
- *tmpc = tolower(*tmpc);
-
- ospec_str *tmp_ospec = new ospec_str();
- tmp_ospec->query = flds[0];
- tmp_ospec->operator_type = flds[1];
- tmp_ospec->operator_param = flds[2];
- tmp_ospec->output_directory = flds[3];
- tmp_ospec->bucketwidth = atoi(flds[4]);
- tmp_ospec->partitioning_flds = flds[5];
- tmp_ospec->n_partitions = atoi(flds[6]);
- qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
- output_specs.push_back(tmp_ospec);
- }else{
- fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ char *tmpc;
+ for(tmpc=flds[1];*tmpc!='\0';++tmpc)
+ *tmpc = tolower(*tmpc);
+
+ ospec_str *tmp_ospec = new ospec_str();
+ tmp_ospec->query = flds[0];
+ tmp_ospec->operator_type = flds[1];
+ tmp_ospec->operator_param = flds[2];
+ tmp_ospec->output_directory = flds[3];
+ tmp_ospec->bucketwidth = atoi(flds[4]);
+ tmp_ospec->partitioning_flds = flds[5];
+ tmp_ospec->n_partitions = atoi(flds[6]);
+ qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
+ output_specs.push_back(tmp_ospec);
+ }else{
+ fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ }
}
}
fclose(osp_in);
}
if(dangling_ospecs!=""){
fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
+ exit(1);
}
string dangling_par = "";
string lmach = hostname;
if(tvec.size()>0){
liface = tvec[0]->get_interface(); // iface queries have been resolved
- lmach = tvec[0]->get_machine();
+ if(tvec[0]->get_machine() != ""){
+ lmach = tvec[0]->get_machine();
+ }else{
+ fprintf(stderr,"WARNING, lfta %s has empty machine name, using %s\n", split_queries[l]->query_plan[0]->node_name.c_str(), hostname.c_str());
+ }
} // else{
interface_names.push_back(liface);
machine_names.push_back(lmach);
lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
*/
- snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
+ snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
+ snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
+
+// STOPPED HERE need to figure out how to generate the code that Vlad needs
+// from snap_postion
// TODO NOTE : I'd like it to be the case that registration_query_names
// are the queries to be registered for subsciption.
// if(hfta_namespace == "")
// warning_str += "\tnamespace not found.\n";
-// STOPPED HERE
// There is a get_tbl_keys method implemented for qp_nodes,
// integrate it into steam_query, then call it to find keys,
// and annotate feidls with their key-ness.
// Find common filter predicates in the LFTAs.
-// in addition generate structs to store the temporal attributes unpacked by prefilter
+// in addition generate structs to store the
+// temporal attributes unpacked by prefilter
+// compute & provide interface for per-interface
+// record extraction properties
map<string, vector<stream_query *> >::iterator ssqi;
for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
#endif
map<string, vector<long long int> > lfta_sigs; // used again later
+ map<string, int> lfta_snap_pos; // optimize csv parsing
+ // compute now, use in get_iface_properties
for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
string liface = (*mvsi).first;
vector<long long int> empty_list;
lfta_sigs[liface] = empty_list;
+ lfta_snap_pos[liface] = -1;
vector<col_id_set> lfta_cols;
vector<int> lfta_snap_length;
}
lfta_sigs[liface].push_back(mask);
lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
- lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
+ lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
+ int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
+ if(this_snap_pos > lfta_snap_pos[liface])
+ lfta_snap_pos[liface] = this_snap_pos;
}
//for(li=0;li<mach_lftas.size();++li){
}
lfta_val[lmach] += "\";\n";
}
+ lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
+ lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
lfta_val[lmach] += "\t\t} else\n";
lfta_val[lmach] += "\t\t\treturn NULL;\n";
}
bool use_proto = false;
bool use_bsa = false;
bool use_kafka = false;
+ bool use_ssl = false;
int erri;
string err_str;
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
vector<string> ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "InterfaceType", erri, err_str);
for(int ift_i=0;ift_i<ift.size();ift_i++){
if(ift[ift_i]=="PROTO"){
+#ifdef PROTO_ENABLED
use_proto = true;
+#else
+ fprintf(stderr,"Runtime libraries built without PROTO support. Rebuild with PROTO_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
}
}
ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "BSA", erri, err_str);
for(int ift_i=0;ift_i<ift.size();ift_i++){
if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef BSA_ENABLED
use_bsa = true;
+#else
+ fprintf(stderr,"Runtime libraries built without BSA support. Rebuild with BSA_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
}
}
ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "KAFKA", erri, err_str);
for(int ift_i=0;ift_i<ift.size();ift_i++){
if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef KAFKA_ENABLED
use_kafka = true;
+#else
+ fprintf(stderr,"Runtime libraries built without KAFKA support. Rebuild with KAFKA_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
}
}
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef SSL_ENABLED
+ use_ssl = true;
+#else
+ fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
}
fprintf(outfl,
if(use_pads)
fprintf(outfl,"-lgscppads -lpads ");
fprintf(outfl,
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
if(use_pads)
fprintf(outfl, " -lpz -lz -lbz ");
if(libz_exists && libast_exists)
fprintf(outfl," -last ");
if(use_pads)
fprintf(outfl, " -ldll -ldl ");
- if(use_proto)
- fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
- if(use_bsa)
- fprintf(outfl, " -lbsa_stream ");
- if(use_kafka)
- fprintf(outfl, " -lrdkafka ");
+
+#ifdef PROTO_ENABLED
+ fprintf(outfl, " -L/usr/local/lib/ -lprotobuf-c ");
+#endif
+#ifdef BSA_ENABLED
+ fprintf(outfl, " -lbsa_stream ");
+#endif
+#ifdef KAFKA_ENABLED
+ fprintf(outfl, " -lrdkafka ");
+#endif
+#ifdef SSL_ENABLED
+ fprintf(outfl, " -lssl -lcrypto ");
+#endif
fprintf(outfl," -lgscpaux");
#ifdef GCOV
fprintf(outfl," -fprofile-arcs");