vector<string> registration_query_names; // for lfta.c registration
map<string, vector<int> > mach_query_names; // list queries of machine
vector<int> snap_lengths; // for lfta.c registration
+ vector<int> snap_position; // for lfta.c registration
vector<string> interface_names; // for lfta.c registration
vector<string> machine_names; // machine of interface
vector<bool> lfta_reuse_options; // for lfta.c registration
while(fgets(tmpstr,TMPSTRLEN,osp_in)){
o_lineno++;
int nflds = split_string(tmpstr,',',flds,MAXFLDS);
- if(nflds == 7){
+ if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
+ if(nflds == 7){
// make operator type lowercase
- char *tmpc;
- for(tmpc=flds[1];*tmpc!='\0';++tmpc)
- *tmpc = tolower(*tmpc);
-
- ospec_str *tmp_ospec = new ospec_str();
- tmp_ospec->query = flds[0];
- tmp_ospec->operator_type = flds[1];
- tmp_ospec->operator_param = flds[2];
- tmp_ospec->output_directory = flds[3];
- tmp_ospec->bucketwidth = atoi(flds[4]);
- tmp_ospec->partitioning_flds = flds[5];
- tmp_ospec->n_partitions = atoi(flds[6]);
- qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
- output_specs.push_back(tmp_ospec);
- }else{
- fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ char *tmpc;
+ for(tmpc=flds[1];*tmpc!='\0';++tmpc)
+ *tmpc = tolower(*tmpc);
+
+ ospec_str *tmp_ospec = new ospec_str();
+ tmp_ospec->query = flds[0];
+ tmp_ospec->operator_type = flds[1];
+ tmp_ospec->operator_param = flds[2];
+ tmp_ospec->output_directory = flds[3];
+ tmp_ospec->bucketwidth = atoi(flds[4]);
+ tmp_ospec->partitioning_flds = flds[5];
+ tmp_ospec->n_partitions = atoi(flds[6]);
+ qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
+ output_specs.push_back(tmp_ospec);
+ }else{
+ fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ }
}
}
fclose(osp_in);
}
if(dangling_ospecs!=""){
fprintf(stderr,"WARNING, the following entries in output_spec.cfg don't have a matching query: %s\n",dangling_ospecs.c_str());
+ exit(1);
}
string dangling_par = "";
lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
*/
- snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
+ snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
+ snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
+
+// STOPPED HERE need to figure out how to generate the code that Vlad needs
+// from snap_postion
// TODO NOTE : I'd like it to be the case that registration_query_names
// are the queries to be registered for subsciption.
// if(hfta_namespace == "")
// warning_str += "\tnamespace not found.\n";
-// STOPPED HERE
// There is a get_tbl_keys method implemented for qp_nodes,
// integrate it into steam_query, then call it to find keys,
// and annotate feidls with their key-ness.
// Find common filter predicates in the LFTAs.
-// in addition generate structs to store the temporal attributes unpacked by prefilter
+// in addition generate structs to store the
+// temporal attributes unpacked by prefilter
+// compute & provide interface for per-interface
+// record extraction properties
map<string, vector<stream_query *> >::iterator ssqi;
for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
#endif
map<string, vector<long long int> > lfta_sigs; // used again later
+ map<string, int> lfta_snap_pos; // optimize csv parsing
+ // compute now, use in get_iface_properties
for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
string liface = (*mvsi).first;
vector<long long int> empty_list;
lfta_sigs[liface] = empty_list;
+ lfta_snap_pos[liface] = -1;
vector<col_id_set> lfta_cols;
vector<int> lfta_snap_length;
}
lfta_sigs[liface].push_back(mask);
lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
- lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
+ lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
+ int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
+ if(this_snap_pos > lfta_snap_pos[liface])
+ lfta_snap_pos[liface] = this_snap_pos;
}
//for(li=0;li<mach_lftas.size();++li){
}
lfta_val[lmach] += "\";\n";
}
+ lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
+ lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
lfta_val[lmach] += "\t\t} else\n";
lfta_val[lmach] += "\t\t\treturn NULL;\n";
}
bool use_proto = false;
bool use_bsa = false;
bool use_kafka = false;
+ bool use_ssl = false;
int erri;
string err_str;
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
#endif
}
}
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef SSL_ENABLED
+ use_ssl = true;
+#else
+ fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
}
fprintf(outfl,
if(use_pads)
fprintf(outfl,"-lgscppads -lpads ");
fprintf(outfl,
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
if(use_pads)
fprintf(outfl, " -lpz -lz -lbz ");
if(libz_exists && libast_exists)
#endif
#ifdef KAFKA_ENABLED
fprintf(outfl, " -lrdkafka ");
+#endif
+#ifdef SSL_ENABLED
+ fprintf(outfl, " -lssl -lcrypto ");
#endif
fprintf(outfl," -lgscpaux");
#ifdef GCOV