ret = func.create_group(tup, (gs_sp_t)&grp);
nflushes = func.flush_needed();
if(func.disordered()){
- fprintf(stderr,"Out of order record in %s\n",op_name);
+ // fprintf(stderr,"Out of order record in %s\n",op_name);
return 0;
}
if ((iter = group_table.find(grp)) != group_table.end()) {
func.update_aggregate(tup, grp, (*iter).second);
}else{
- aggregate aggr;
+ char aggr_buffer[sizeof(aggregate)];
// create an aggregate in preallocated buffer
- func.create_aggregate(tup, (char*)&aggr);
+ func.create_aggregate(tup, aggr_buffer);
// neeed operator= doing a deep copy
- group_table.insert(grp, aggr);
+ group_table.insert(grp, (*(aggregate*)aggr_buffer));
}
tup.free_tuple();
return 0;
typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
// Limit the number of successive flushes - avoid explosive behavior
- const gs_int32_t max_flushes = 10;
+ const gs_int32_t max_flushes = 25;
if(nflushes>max_flushes){
fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
nflushes = max_flushes;