exit -1
fi
-$GSLITE_ROOT/bin/translate_fta -h localhost -c -M -R $GSLITE_ROOT -C $GSLITE_ROOT/cfg -l $GSLITE_ROOT/qlib packet_schema.txt *.gsql
+$GSLITE_ROOT/bin/translate_fta -f -N -h localhost -c -M -R $GSLITE_ROOT -C $GSLITE_ROOT/cfg -l $GSLITE_ROOT/qlib packet_schema.txt *.gsql
ret=$?
if [ $ret -ne 0 ]
then
uint FUN [COST LOW] extr_med_hfta0_fcn(string);
uint EXTR qspace_of quant_udaf_hfta0 extr_quant_hfta0_space (uint);
uint FUN [COST LOW] extr_quant_hfta0_space(string);
- string UDAF [SUBAGGR quant_udaf_lfta3, SUPERAGGR quant_udaf_hfta3] quant_udaf_hfta0 fstring16 (uint);
- string UDAF quant_udaf_hfta3 fstring16 (string);
+ string UDAF [SUBAGGR quant_udaf_lfta3, SUPERAGGR quant_udaf_hfta3] quant_udaf_hfta0 fstring24 (uint);
+ string UDAF quant_udaf_hfta3 fstring24 (string);
string UDAF quant_udaf_lfta3 fstring6600 (uint);
///////////////////////////////////////////////////////////
// count the # times the payload is different from the previous value
/////////////////////////////////////////////////////////////
- uint UDAF [SUBAGGR count_diff_lfta_ui, SUPERAGGR count_diff_hfta] count_diff fstring12 (uint);
- uint UDAF count_diff_hfta fstring12 (string);
+ uint UDAF [SUBAGGR count_diff_lfta_ui, SUPERAGGR count_diff_hfta] count_diff fstring16 (uint);
+ uint UDAF count_diff_hfta fstring16 (string);
string UDAF count_diff_lfta_ui fstring20 (uint);
- uint UDAF [SUBAGGR count_diff_lfta_i, SUPERAGGR count_diff_hfta] count_diff fstring12 (int);
+ uint UDAF [SUBAGGR count_diff_lfta_i, SUPERAGGR count_diff_hfta] count_diff fstring16 (int);
string UDAF count_diff_lfta_i fstring20 (int);
- uint UDAF [SUBAGGR count_diff_lfta_ul, SUPERAGGR count_diff_hfta] count_diff fstring12 (ullong);
+ uint UDAF [SUBAGGR count_diff_lfta_ul, SUPERAGGR count_diff_hfta] count_diff fstring16 (ullong);
string UDAF count_diff_lfta_ul fstring20 (ullong);
- uint UDAF [SUBAGGR count_diff_lfta_l, SUPERAGGR count_diff_hfta] count_diff fstring12 (llong);
+ uint UDAF [SUBAGGR count_diff_lfta_l, SUPERAGGR count_diff_hfta] count_diff fstring16 (llong);
string UDAF count_diff_lfta_l fstring20 (llong);
- uint UDAF [SUBAGGR count_diff_lfta_s, SUPERAGGR count_diff_hfta] count_diff fstring12 (string);
+ uint UDAF [SUBAGGR count_diff_lfta_s, SUPERAGGR count_diff_hfta] count_diff fstring16 (string);
string UDAF count_diff_lfta_s fstring20 (string);
flush_old(result);
}
if(n_patterns <= 1){
- aggregate aggr;
+ char aggr_buffer[sizeof(aggregate)];
// create an aggregate in preallocated buffer
- func.create_aggregate(tup, (char*)&aggr);
+ func.create_aggregate(tup, aggr_buffer);
// neeed operator= doing a deep copy
- group_table.insert(grp, aggr);
+ group_table.insert(grp, (*(aggregate*)aggr_buffer));
}else{
int p;
// TODO this code is wrong, must check if each pattern is in the group table.
for(p=0;p<n_patterns;++p){
// need shallow copy constructor for groups
group new_grp(grp, func.get_pattern(p));
- aggregate aggr;
- func.create_aggregate(tup, (char*)&aggr);
+ char aggr_buffer[sizeof(aggregate)];
+ func.create_aggregate(tup, aggr_buffer);
// neeed operator= doing a deep copy
- group_table.insert(new_grp, aggr);
+ group_table.insert(new_grp, (*(aggregate*)aggr_buffer));
}
}
}
flush_old(result);
}
if(n_patterns <= 1){
- aggregate aggr;
+ char aggr_buffer[sizeof(aggregate)];
// create an aggregate in preallocated buffer
- func.create_aggregate(tup, (char*)&aggr);
+ func.create_aggregate(tup, aggr_buffer);
// neeed operator= doing a deep copy
- group_table[curr_table].insert(grp, aggr);
+ group_table[curr_table].insert(grp, (*(aggregate*)aggr_buffer));
}else{
int p;
for(p=0;p<n_patterns;++p){
// TODO this code is wrong need to check each pattern to see if its in the table
- // need shalow copy constructor for groups
+ // need shallow copy constructor for groups
group new_grp(grp, func.get_pattern(p));
- aggregate aggr;
- func.create_aggregate(tup, (char*)&aggr);
+ char aggr_buffer[sizeof(aggregate)];
+ func.create_aggregate(tup, aggr_buffer);
// neeed operator= doing a deep copy
- group_table[curr_table].insert(new_grp, aggr);
+ group_table[curr_table].insert(new_grp, (*(aggregate*)aggr_buffer));
}
}
}
allocated = 1;
total_memory = num_buckets * sizeof(hash_bucket);
- fprintf(stderr, "Initial allocaton of %d buckets\n", (int)num_buckets);
+ // fprintf(stderr, "Initial allocaton of %d buckets\n", (int)num_buckets);
}
data_item* d = new data_item(key, val);
// OR it was less then half of the load factor
size_t min_buckets = 0;
if ((max_load > load_factor) || ((2 * max_load) < load_factor)) {
- min_buckets = _max_size / load_factor;
+ min_buckets = ceil(_max_size / load_factor);
}
-
- if (min_buckets) {
+
+
+ if (min_buckets) {
// find power-of-2 size large than min_buckets;
int nb;
for(nb=2;nb<min_buckets;nb*=2);
- num_buckets = nb;
-
- fprintf(stderr, "Resizing to %d buckets\n", (int)num_buckets);
- delete[] buckets;
- hash_mask = num_buckets-1;
-
- buckets = new hash_bucket[num_buckets];
- total_memory = num_buckets * sizeof(hash_bucket);
+ // make sure we actually changing the size
+ if (nb != num_buckets) {
+ fprintf(stderr, "Resizing from %d to %d buckets\n", (int)num_buckets, nb);
+ num_buckets = nb;
+ delete[] buckets;
+ hash_mask = num_buckets-1;
+
+ buckets = new hash_bucket[num_buckets];
+ total_memory = num_buckets * sizeof(hash_bucket);
+ }
}
return 0;
key_type key;
data_item* next; // next data item in overflow chain
- data_item(const key_type& k) : key(k), next(NULL) { }
+ data_item(const key_type& k) {
+ key = k;
+ next = NULL;
+ }
};
struct hash_bucket {
// OR it was less then half of the load factor
size_t min_buckets = 0;
if ((max_load > load_factor) || ((2 * max_load) < load_factor)) {
- min_buckets = _max_size / load_factor;
+ min_buckets = ceil(_max_size / load_factor);
}
if (min_buckets) {
// find power-of-2 size large than min_buckets;
int nb;
for(nb=2;nb<min_buckets;nb*=2);
- num_buckets = nb;
-
- fprintf(stderr, "Resizing to %d buckets\n", num_buckets);
- delete[] buckets;
- hash_mask = num_buckets-1;
-
- buckets = new hash_bucket[num_buckets];
+ // make sure we actually changing the size
+ if (nb != num_buckets) {
+ fprintf(stderr, "Resizing from %d to %d buckets\n", (int)num_buckets, nb);
+ num_buckets = nb;
+ delete[] buckets;
+ hash_mask = num_buckets-1;
+
+ buckets = new hash_bucket[num_buckets];
+ }
}
return 0;
// Internal functions
+void hfta_vstr_init(vstring * str);
gs_retval_t Vstring_Constructor(vstring *, gs_csp_t);
gs_retval_t hfta_vstr_length(vstring *);
void hfta_vstr_assign_with_copy_in_tuple(vstring32 *, const vstring *,
top_tuple.channel = output_channel;
result.push_back(top_tuple);
}
+ merge_queue.clear();
queue_mem = 0;
if ((iter = group_table.find(grp)) != group_table.end()) {
func.update_aggregate(tup, grp, (*iter).second);
}else{
- aggregate aggr;
+ char aggr_buffer[sizeof(aggregate)];
// create an aggregate in preallocated buffer
- func.create_aggregate(tup, (char*)&aggr);
+ func.create_aggregate(tup, aggr_buffer);
// neeed operator= doing a deep copy
- group_table.insert(grp, aggr);
+ group_table.insert(grp, (*(aggregate*)aggr_buffer));
}
tup.free_tuple();
return 0;
unpack_fcn = dt.get_hfta_unpack_fcn_noxf();
}
if(dt.is_buffer_type()){
- sprintf(tmpstr,"\t unpack_var_%s_%d = %s(tup%d.data, tup%d.tuple_size, unpack_offset_%s_%d, &problem);\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, tblref, field.c_str(), tblref);
+ sprintf(tmpstr,"\tunpack_var_%s_%d = %s(tup%d.data, tup%d.tuple_size, unpack_offset_%s_%d, &problem); // unpack_cid\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, tblref, field.c_str(), tblref);
}else{
- sprintf(tmpstr,"\t unpack_var_%s_%d = %s_nocheck(tup%d.data, unpack_offset_%s_%d);\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, field.c_str(), tblref);
+ sprintf(tmpstr,"\tunpack_var_%s_%d = %s_nocheck(tup%d.data, unpack_offset_%s_%d); // unpack_cid\n",field.c_str(), tblref, unpack_fcn.c_str(), tblref, field.c_str(), tblref);
}
ret += tmpstr;
if(dt.is_buffer_type()){
if(structured_types.size()==0){
ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
}else{
- ret += "\t"+generate_functor_name() + "_groupdef(){}\n";
+ ret += "\t"+generate_functor_name() + "_groupdef(){\n";
+ for(g=0;g<gb_tbl.size();g++){
+ data_type *gdt = gb_tbl.get_data_type(g);
+ if(gdt->is_buffer_type()){
+ sprintf(tmpstr,"\t\t%s(&gb_var%d);\n",
+ gdt->get_hfta_buffer_init().c_str(), g );
+ ret += tmpstr;
+ }
+ }
+ ret += "\t};\n";
}
// For temporal status tuple we don't need to do anything else
- ret += "\tif (temp_tuple_received) return NULL;\n\n";
+ ret += "\tif (temp_tuple_received){\n";
+ ret += "\t\tdisordered_arrival = false;\n";
+ ret += "\t\treturn NULL;\n\n";
+ ret += "\t}\n";
for(w=0;w<where.size();++w){
sprintf(tmpstr,"//\t\tPredicate clause %d.\n",w);
// Deal with outer join stuff
col_id_set l_cids, r_cids;
+ col_id_set l_base_cids, r_base_cids; // l_cids and r_cids get modified
+ // to account for extra_f fields to
+ // unpack for value imputation
col_id_set::iterator ocsi;
for(ocsi=local_cids.begin();ocsi!=local_cids.end();++ocsi){
- if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
- else r_cids.insert((*ocsi));
+ if((*ocsi).tblvar_ref == 0){
+ l_cids.insert((*ocsi)); l_base_cids.insert((*ocsi));
+ }else{
+ r_cids.insert((*ocsi)); r_base_cids.insert((*ocsi));
+ }
}
for(ocsi=se_cids.begin();ocsi!=se_cids.end();++ocsi){
- if((*ocsi).tblvar_ref == 0) l_cids.insert((*ocsi));
- else r_cids.insert((*ocsi));
+ if((*ocsi).tblvar_ref == 0){
+ l_cids.insert((*ocsi)); l_base_cids.insert((*ocsi));
+ }else{
+ r_cids.insert((*ocsi)); r_base_cids.insert((*ocsi));
+ }
}
ret += "\t}else if(tup0.data){\n";
string unpack_null = ""; col_id_set extra_cids;
- for(ocsi=r_cids.begin();ocsi!=r_cids.end();++ocsi){
+ for(ocsi=r_base_cids.begin();ocsi!=r_base_cids.end();++ocsi){
string field = (*ocsi).field;
if(r_equiv.count(field)){
- unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_1="+generate_se_code(r_equiv[field],schema)+"; // r_equiv\n";
get_new_se_cids(r_equiv[field],l_cids,new_cids,NULL);
}else{
int schref = (*ocsi).schema_ref;
// NB : works for string type only
// NNB: installed fix for ipv6, more of this should be pushed
// into the literal_t code.
- unpack_null+="\tunpack_var_"+field+"_1= "+empty_lit.hfta_empty_literal_name()+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_1= "+empty_lit.hfta_empty_literal_name()+"; // empty\n";
}else{
- unpack_null+="\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_1="+empty_lit.to_hfta_C_code("")+"; // empty\n";
}
}
}
+ ret += "// l_cids\n";
ret += gen_unpack_cids(schema, l_cids, "tup", needs_xform);
+ ret += "// extra_cids\n";
ret += gen_unpack_cids(schema, extra_cids, "tup", needs_xform);
ret += unpack_null;
ret += gen_unpack_partial_fcn(schema, partial_fcns, sl_pfcns, "tup");
ret+="\t}else{\n";
- unpack_null = ""; extra_cids.clear();
- for(ocsi=l_cids.begin();ocsi!=l_cids.end();++ocsi){
+ unpack_null = ""; extra_cids.clear(); new_cids.clear();
+ for(ocsi=l_base_cids.begin();ocsi!=l_base_cids.end();++ocsi){
string field = (*ocsi).field;
if(l_equiv.count(field)){
- unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_0="+generate_se_code(l_equiv[field],schema)+"; // l_equiv\n";
get_new_se_cids(l_equiv[field],r_cids,new_cids,NULL);
}else{
int schref = (*ocsi).schema_ref;
// NB : works for string type only
// NNB: installed fix for ipv6, more of this should be pushed
// into the literal_t code.
- unpack_null+="\tunpack_var_"+field+"_0= "+empty_lit.hfta_empty_literal_name()+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_0= "+empty_lit.hfta_empty_literal_name()+"; // empty\n";
}else{
- unpack_null+="\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+";\n";
+ unpack_null+="\t\tunpack_var_"+field+"_0="+empty_lit.to_hfta_C_code("")+"; // empty\n";
}
}
}
+ ret += "// r_cids\n";
ret += gen_unpack_cids(schema, r_cids, "tup", needs_xform);
+ ret += "// extra_cids\n";
ret += gen_unpack_cids(schema, extra_cids, "tup", needs_xform);
ret += unpack_null;
ret += gen_unpack_partial_fcn(schema, partial_fcns, sl_pfcns, "tup");
}
// Constructors
- ret += "\t"+generate_functor_name() + "_groupdef(){};\n";
+ ret += "\t"+generate_functor_name() + "_groupdef(){\n";
+ for(g=0;g<gb_tbl.size();g++){
+ data_type *gdt = gb_tbl.get_data_type(g);
+ if(gdt->is_buffer_type()){
+ sprintf(tmpstr,"\t\t%s(&gb_var%d);\n",
+ gdt->get_hfta_buffer_init().c_str(), g );
+ ret += tmpstr;
+ }
+ }
+ ret += "\t};\n";
+
ret += "\t// shallow copy constructor\n";
ret += "\t"+generate_functor_name() + "_groupdef("+
this->generate_functor_name() + "_groupdef &gd){\n";
// Dump schema summary
void dump_summary(stream_query *str){
+ for(int q=0;q<str->query_plan.size();++q){
+ qp_node *qp = str->query_plan[q];
+ if(qp==NULL)
+ continue; // there can be blanks
+
+ fprintf(schema_summary_output,"-----\n");
+ fprintf(schema_summary_output,"%s\n",qp->node_name.c_str());
+
+ table_def *sch = qp->get_fields();
+
+ vector<field_entry *> flds = sch->get_fields();
+ int f;
+ for(f=0;f<flds.size();++f){
+ if(f>0) fprintf(schema_summary_output,"|");
+ fprintf(schema_summary_output,"%s",flds[f]->get_name().c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+ for(f=0;f<flds.size();++f){
+ if(f>0) fprintf(schema_summary_output,"|");
+ fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+
+ map<std::string, std::string> defines = qp->get_definitions();
+ string comment = "";
+ if(defines.count("comment")>0){
+ comment = defines["comment"];
+ }
+ fprintf(schema_summary_output,"%s\n",comment.c_str());
+
+ vector<tablevar_t *> input_tables = qp->get_input_tbls();
+ for(int t=0; t<input_tables.size(); ++t){
+ if(t>0) fprintf(schema_summary_output,"|");
+ string machine = input_tables[t]->get_machine();
+ string iface = input_tables[t]->get_interface();
+ string schema = input_tables[t]->get_schema_name();
+ if(machine!=""){
+ fprintf(schema_summary_output,"%s.",machine.c_str());
+ }
+ if(iface!=""){
+ fprintf(schema_summary_output,"%s.",iface.c_str());
+ }else{
+ if(machine!="") fprintf(schema_summary_output,".");
+ }
+ fprintf(schema_summary_output,"%s",schema.c_str());
+ }
+
+ fprintf(schema_summary_output,"\n");
+ }
+
+
+
+/*
+ fprintf(schema_summary_output,"-----\n");
fprintf(schema_summary_output,"%s\n",str->query_name.c_str());
table_def *sch = str->get_output_tabledef();
fprintf(schema_summary_output,"%s",flds[f]->get_type().c_str());
}
fprintf(schema_summary_output,"\n");
+
+ string comment = "";
+ if(str->defines.count("comment")>0){
+ comment = str->defines["comment"];
+ }
+ fprintf(schema_summary_output,"%s\n",comment.c_str());
+
+ vector<tablevar_t *> input_tables = str->get_input_tables();
+ for(int t=0; t<input_tables.size(); ++t){
+ if(t>0) fprintf(schema_summary_output,"|");
+ string machine = input_tables[t]->get_machine();
+ string iface = input_tables[t]->get_interface();
+ string schema = input_tables[t]->get_schema_name();
+ if(machine!=""){
+ fprintf(schema_summary_output,"%s.",machine.c_str());
+ }
+ if(iface!=""){
+ fprintf(schema_summary_output,"%s.",iface.c_str());
+ }else{
+ if(machine!="") fprintf(schema_summary_output,".");
+ }
+ fprintf(schema_summary_output,"%s",schema.c_str());
+ }
+ fprintf(schema_summary_output,"\n");
+*/
+
}
// Globals
map<string, int> hfta_name_map;
// vector< vector<int> > process_sets;
// vector< set<int> > stream_node_sets;
- reverse(process_order.begin(), process_order.end()); // get listing in reverse order.
- // i.e. process leaves 1st.
+ reverse(process_order.begin(), process_order.end()); // get listing in reverse .
+ // order: process leaves 1st.
for(i=0;i<process_order.size();++i){
if(qnodes[process_order[i]]->is_externally_visible == true){
//printf("Visible.\n");
}
*/
-/*
-// output schema summary
- if(output_schema_summary){
- dump_summary(split_queries[0]);
- }
-*/
}
+// output schema summary
+ if(output_schema_summary){
+ for(int o=0;o<split_queries.size(); ++o){
+ dump_summary(split_queries[o]);
+ }
+ }
if(hfta_returned){ // query also has an HFTA component
return("ERROR_NO_SUCH_buffer_destroy_FCN");
}
+
+string data_type::get_hfta_buffer_init(){
+ switch(type){
+ case v_str_t:
+ return("hfta_vstr_init");
+ default:
+ break;
+ }
+
+ return("ERROR_NO_SUCH_buffer_init_FCN");
+}
//-----------------------------
std::string get_hfta_buffer_replace();
std::string get_hfta_buffer_size();
std::string get_hfta_buffer_destroy();
+ std::string get_hfta_buffer_init();
bool is_structured_type();
// std::string get_interface_type();
str->length = 0;
}
+// Make the vstring safe to destroy even if its never initialized
+// (e.g. stack-allocated groups containing strings)
+void hfta_vstr_init(vstring * str) {
+ str->length = 0;
+}
gs_retval_t hfta_vstr_length(vstring *str) {
return str->length;