ret += "( ";
if(ldt->complex_comparison(ldt) ){
- ret += ldt->get_comparison_fcn(ldt) ;
+ ret += ldt->get_equals_fcn(ldt) ;
ret += "( ";
if(ldt->is_buffer_type() ) ret += "&";
ret += generate_se_code(pr->get_left_se(), schema);
ret += "( ";
if(ldt->complex_comparison(rdt) ){
+// TODO can use get_equals_fcn if op is "=" ?
ret += ldt->get_comparison_fcn(rdt);
ret += "(";
if(ldt->is_buffer_type() ) ret += "&";
string ret;
if(dt->complex_comparison(dt) ){
- ret += dt->get_comparison_fcn(dt);
+ ret += dt->get_equals_fcn(dt);
ret += "(";
if(dt->is_buffer_type() ) ret += "&";
ret += lhs_op;
return(ret);
}
-static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
- string ret;
-
- if(dt->complex_comparison(dt) ){
- ret += dt->get_comparison_fcn(dt);
- ret += "(";
- if(dt->is_buffer_type() ) ret += "&";
- ret += lhs_op;
- ret += ", ";
- if(dt->is_buffer_type() ) ret += "&";
- ret += rhs_op;
- ret += ") == 0";
- }else{
- ret += lhs_op;
- ret += " == ";
- ret += rhs_op;
- }
-
- return(ret);
-}
+//static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
+// string ret;
+//
+ // if(dt->complex_comparison(dt) ){
+// ret += dt->get_equals_fcn(dt);
+// ret += "(";
+// if(dt->is_buffer_type() ) ret += "&";
+// ret += lhs_op;
+// ret += ", ";
+// if(dt->is_buffer_type() ) ret += "&";
+// ret += rhs_op;
+// ret += ") == 0";
+// }else{
+// ret += lhs_op;
+// ret += " == ";
+// ret += rhs_op;
+// }
+//
+// return(ret);
+//}
// Here I assume that only MIN and MAX aggregates can be computed
// over BUFFER data types.
// ret += "#include \"fta.h\"\n\n");
string ret = "#ifndef LFTA_IN_NIC\n";
- ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
+ ret += "const char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
ret += "#include<stdio.h>\n";
ret += "#include <limits.h>\n";
ret += "#include <float.h>\n";
ret += ";\n";
- ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
ret += "\t\t\tif( tuple != NULL){\n";
ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
ret+="\t\tif (!t->n_aggrs) {\n";
- ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
+ ret+="\t\t\ttuple = (struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, 0);\n";
ret+="\t\t\tif( tuple != NULL)\n";
ret+="\t\t\t\tpost_tuple(tuple);\n";
}
ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
- ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\t\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
/* mark tuple as EOF_TUPLE */
}
ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
- ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+"*)allocate_tuple(f, tuple_size );\n";
ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
ret+="\tpost_tuple(tuple);\n";
ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
- ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
+ ret += "\n\t/* Disable heartbeats for now to avoid overloading clearinghouse */\n";
+ ret += "\t/* fta_heartbeat(f->ftaid, t->trace_id++, 1, &stats); */\n";
ret += "\n\t/* Reset runtime stats */\n";
ret += "\tt->in_tuple_cnt = 0;\n";
ret += ";\n";
- ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
// Test passed, make assignments to the tuple.
ret += ";\n";
- ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
// Test passed, make assignments to the tuple.
ret += ";\n";
- ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
+ ret += "\ttuple = ( struct "+generate_tuple_name(node_name)+" *)allocate_tuple(f, tuple_size );\n";
ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
// Test passed, make assignments to the tuple.
ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
ret+="\tint i;\n";
ret += "\n";
- ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
+ ret+="\tif((f=(struct "+generate_fta_name(node_name)+" *)fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
// assign a streamid to fta instance
ret+="\t/* assign a streamid */\n";
}
ret += ";\n";
- ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
+ ret+="\tif ((f->aggr_table = (struct "+generate_aggr_struct_name(node_name)+" *)sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
ret+="\t\treturn(0);\n";
ret+="\t}\n\n";
// ret+="/* compute how many integers we need to store the hashmap */\n";
// ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
- ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
+ ret+="\tif ((f->aggr_table_hashmap = (gs_uint32_t *)sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
ret+="\t\treturn(0);\n";
ret+="\t}\n";
ret+="/*\t\tfill bitmap with zero \t*/\n";
int bf_byte_size = bf_bit_size / (8*sizeof(char));
int bf_tot = n_bloom*bf_byte_size;
- ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
+ ret+="\tif ((f->bf_table = (unsigned char *)sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
ret+="\t\treturn(0);\n";
ret+="\t}\n";
ret +=
}
ht_size = hs;
}
- ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
+ ret+="\tif ((f->join_table = (struct "+generate_fj_struct_name(node_name)+" *) sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
ret+="\t\treturn(0);\n";
ret+="\t}\n\n";
ret +=
}
// Build list of "partial functions", by clause.
-// NOTE : partial fcns are not handles well.
+// NOTE : partial fcns are not handled well.
// The act of searching for them associates the fcn call
// in the SE with an index to an array. Refs to the
// fcn value are replaced with refs to the variable they are
ag_fcns_start = gb_fcns_end = partial_fcns.size();
if(aggr_tbl != NULL){
for(i=0;i<aggr_tbl->size();i++){
- find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
+ find_partial_fcns(aggr_tbl->get_aggr_se(i), &partial_fcns, NULL, &is_partial_fcn, Ext_fcns);
}
}
ag_fcns_end = partial_fcns.size();
-int compute_snap_len(qp_node *fs, table_list *schema){
+int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
// Initialize global vars
gb_tbl = NULL;
int tblref = (*csi).tblvar_ref;
string field = (*csi).field;
- param_list *field_params = schema->get_modifier_list(schref, field);
- if(field_params->contains_key("snap_len")){
- string fld_snap_str = field_params->val_of("snap_len");
- int fld_snap;
- if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
- if(fld_snap > snap_len) snap_len = fld_snap;
- n_snap++;
- }else{
- fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+ if(snap_type == "index"){
+ int pos = schema->get_field_idx(schref, field);
+ if(pos>snap_len) snap_len = pos;
+ n_snap++;
+ }else{
+ param_list *field_params = schema->get_modifier_list(schref, field);
+ if(field_params->contains_key("snap_len")){
+ string fld_snap_str = field_params->val_of("snap_len");
+ int fld_snap;
+ if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
+ if(fld_snap > snap_len) snap_len = fld_snap;
+ n_snap++;
+ }else{
+ fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
+ }
}
}
}