// Dump schema summary
void dump_summary(stream_query *str){
+ for(int q=0;q<str->query_plan.size();++q){
+ qp_node *qp = str->query_plan[q];
+ if(qp==NULL)
+ continue; // there can be blanks
+
+ fprintf(schema_summary_output,"-----\n");
+ fprintf(schema_summary_output,"%s\n",qp->node_name.c_str());
+
+ table_def *sch = qp->get_fields();
+
+ vector<field_entry *> flds = sch->get_fields();
+ int f;
+ for(f=0;f<flds.size();++f){
+ if(f>0) fprintf(schema_summary_output,"|");
+ fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+ for(f=0;f<flds.size();++f){
+ if(f>0) fprintf(schema_summary_output,"|");
+ fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+
+ map<std::string, std::string> defines = qp->get_definitions();
+ string comment = "";
+ if(defines.count("comment")>0){
+ comment = defines["comment"];
+ }
+ fprintf(schema_summary_output,"%s\n",comment.c_str());
+
+ vector<tablevar_t *> input_tables = qp->get_input_tbls();
+ for(int t=0; t<input_tables.size(); ++t){
+ if(t>0) fprintf(schema_summary_output,"|");
+ string machine = input_tables[t]->get_machine();
+ string iface = input_tables[t]->get_interface();
+ string schema = input_tables[t]->get_schema_name();
+ if(machine!=""){
+ fprintf(schema_summary_output,"%s.",machine.c_str());
+ }
+ if(iface!=""){
+ fprintf(schema_summary_output,"%s.",iface.c_str());
+ }else{
+ if(machine!="") fprintf(schema_summary_output,".");
+ }
+ fprintf(schema_summary_output,"%s",schema.c_str());
+ }
+
+ fprintf(schema_summary_output,"\n");
+ }
+
+
+
+/*
+ fprintf(schema_summary_output,"-----\n");
fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
table_def *sch = str->get_output_tabledef();
fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
}
fprintf(schema_summary_output,"\n");
+
+ string comment = "";
+ if(str->defines.count("comment")>0){
+ comment = str->defines["comment"];
+ }
+ fprintf(schema_summary_output,"%s\n",comment.c_str());
+
+ vector<tablevar_t *> input_tables = str->get_input_tables();
+ for(int t=0; t<input_tables.size(); ++t){
+ if(t>0) fprintf(schema_summary_output,"|");
+ string machine = input_tables[t]->get_machine();
+ string iface = input_tables[t]->get_interface();
+ string schema = input_tables[t]->get_schema_name();
+ if(machine!=""){
+ fprintf(schema_summary_output,"%s.",machine.c_str());
+ }
+ if(iface!=""){
+ fprintf(schema_summary_output,"%s.",iface.c_str());
+ }else{
+ if(machine!="") fprintf(schema_summary_output,".");
+ }
+ fprintf(schema_summary_output,"%s",schema.c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+*/
+
}
// Globals
vector<string> registration_query_names; // for lfta.c registration
map<string, vector<int> > mach_query_names; // list queries of machine
vector<int> snap_lengths; // for lfta.c registration
+ vector<int> snap_position; // for lfta.c registration
vector<string> interface_names; // for lfta.c registration
vector<string> machine_names; // machine of interface
vector<bool> lfta_reuse_options; // for lfta.c registration
while(fgets(tmpstr,TMPSTRLEN,osp_in)){
o_lineno++;
int nflds = split_string(tmpstr,',',flds,MAXFLDS);
- if(nflds == 7){
+ if(tmpstr[0]!='\n' && tmpstr[0]!='\r' && tmpstr[0]!='\0' && tmpstr[0]!='#'){
+ if(nflds == 7){
// make operator type lowercase
- char *tmpc;
- for(tmpc=flds[1];*tmpc!='\0';++tmpc)
- *tmpc = tolower(*tmpc);
-
- ospec_str *tmp_ospec = new ospec_str();
- tmp_ospec->query = flds[0];
- tmp_ospec->operator_type = flds[1];
- tmp_ospec->operator_param = flds[2];
- tmp_ospec->output_directory = flds[3];
- tmp_ospec->bucketwidth = atoi(flds[4]);
- tmp_ospec->partitioning_flds = flds[5];
- tmp_ospec->n_partitions = atoi(flds[6]);
- qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
- output_specs.push_back(tmp_ospec);
- }else{
- fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ char *tmpc;
+ for(tmpc=flds[1];*tmpc!='\0';++tmpc)
+ *tmpc = tolower(*tmpc);
+
+ ospec_str *tmp_ospec = new ospec_str();
+ tmp_ospec->query = flds[0];
+ tmp_ospec->operator_type = flds[1];
+ tmp_ospec->operator_param = flds[2];
+ tmp_ospec->output_directory = flds[3];
+ tmp_ospec->bucketwidth = atoi(flds[4]);
+ tmp_ospec->partitioning_flds = flds[5];
+ tmp_ospec->n_partitions = atoi(flds[6]);
+ qname_to_ospec.insert(pair<string,int>(tmp_ospec->query,output_specs.size()));
+ output_specs.push_back(tmp_ospec);
+ }else{
+ fprintf(stderr,"Warning, line %d corrupted in output_spec.cfg, has %d fields.\n",o_lineno,nflds);
+ }
}
}
fclose(osp_in);
map<string, int> hfta_name_map;
// vector< vector<int> > process_sets;
// vector< set<int> > stream_node_sets;
- reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
- // i.e. process leaves 1st.
+ reverse(process_order.begin(), process_order.end()); // get listing in reverse .
+ // order: process leaves 1st.
for(i=0;i<process_order.size();++i){
if(qnodes[process_order[i]]->is_externally_visible == true){
//printf("Visible.\n");
lfta_val[lmach] += generate_lfta_block(split_queries[l]->query_plan[0], Schema, split_queries[l]->get_gid(), Ext_fcns, lfta_schema_embed, ifaces_db,nicprop);
*/
- snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema));
+ snap_lengths.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "snap"));
+ snap_position.push_back(compute_snap_len(split_queries[l]->query_plan[0], Schema, "index"));
+
+// STOPPED HERE need to figure out how to generate the code that Vlad needs
+// from snap_postion
// TODO NOTE : I'd like it to be the case that registration_query_names
// are the queries to be registered for subsciption.
}
*/
-/*
-// output schema summary
- if(output_schema_summary){
- dump_summary(split_queries[0]);
- }
-*/
}
+// output schema summary
+ if(output_schema_summary){
+ for(int o=0;o<split_queries.size(); ++o){
+ dump_summary(split_queries[o]);
+ }
+ }
if(hfta_returned){ // query also has an HFTA component
// Find common filter predicates in the LFTAs.
-// in addition generate structs to store the temporal attributes unpacked by prefilter
+// in addition generate structs to store the
+// temporal attributes unpacked by prefilter
+// compute & provide interface for per-interface
+// record extraction properties
map<string, vector<stream_query *> >::iterator ssqi;
for(ssqi=lfta_mach_lists.begin(); ssqi!=lfta_mach_lists.end(); ++ssqi){
lfta_prefilter_val[lmach] += "#define PREFILTER_DEFINED 1;\n\n";
#endif
map<string, vector<long long int> > lfta_sigs; // used again later
+ map<string, int> lfta_snap_pos; // optimize csv parsing
+ // compute now, use in get_iface_properties
for(auto mvsi=lfta_iface_lists.begin(); mvsi!=lfta_iface_lists.end(); ++mvsi){
string liface = (*mvsi).first;
vector<long long int> empty_list;
lfta_sigs[liface] = empty_list;
+ lfta_snap_pos[liface] = -1;
vector<col_id_set> lfta_cols;
vector<int> lfta_snap_length;
}
lfta_sigs[liface].push_back(mask);
lfta_cols.push_back(lfta_iface_lists[liface][li]->query_plan[0]->get_colrefs(true,Schema));
- lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema));
+ lfta_snap_length.push_back(compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "snap"));
+ int this_snap_pos = compute_snap_len(lfta_iface_lists[liface][li]->query_plan[0], Schema, "index");
+ if(this_snap_pos > lfta_snap_pos[liface])
+ lfta_snap_pos[liface] = this_snap_pos;
}
//for(li=0;li<mach_lftas.size();++li){
}
lfta_val[lmach] += "\";\n";
}
+ lfta_val[lmach] += "\t\t}else if(!strcmp(property_name, \"_max_csv_pos\")){\n";
+ lfta_val[lmach] += "\t\t\treturn \""+int_to_string(lfta_snap_pos[(*sir)])+"\";\n";
lfta_val[lmach] += "\t\t} else\n";
lfta_val[lmach] += "\t\t\treturn NULL;\n";
}
bool use_proto = false;
bool use_bsa = false;
bool use_kafka = false;
+ bool use_ssl = false;
int erri;
string err_str;
for(ssi=ifaces.begin();ssi!=ifaces.end();++ssi){
#endif
}
}
+ ift = ifdb->get_iface_vals(ifmachines[ifnm],ifnm, "ENCRYPTED", erri, err_str);
+ for(int ift_i=0;ift_i<ift.size();ift_i++){
+ if(ift[ift_i] == "TRUE" || ift[ift_i] == "True" || ift[ift_i] == "true"){
+#ifdef SSL_ENABLED
+ use_ssl = true;
+#else
+ fprintf(stderr,"Runtime libraries built without SSL support. Rebuild with SSL_ENABLED defined in gsoptions.h\n");
+ exit(0);
+#endif
+ }
+ }
}
fprintf(outfl,
if(use_pads)
fprintf(outfl,"-lgscppads -lpads ");
fprintf(outfl,
-"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz");
+"-lgscprts -lgscphost -lm -lgscpaux -lgscplftaaux -lclearinghouse -lresolv -lpthread -lgscpinterface -lz -lrt");
if(use_pads)
fprintf(outfl, " -lpz -lz -lbz ");
if(libz_exists && libast_exists)
#endif
#ifdef KAFKA_ENABLED
fprintf(outfl, " -lrdkafka ");
+#endif
+#ifdef SSL_ENABLED
+ fprintf(outfl, " -lssl -lcrypto ");
#endif
fprintf(outfl," -lgscpaux");
#ifdef GCOV