class running_agg_operator : public base_operator {
private :
groupby_func func;
- hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+ hash_table<group, aggregate, hasher_func, equal_func> group_table;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
gs_int32_t nflushes;
// Push out completed groups
-// create buffer on the stack to store key object
- char buffer[sizeof(group)];
-// Number of flushes required
-
-// extract the key information from the tuple and
-// copy it into buffer
- group* grp = func.create_group(tup, buffer);
+ group grp, *ret;
+ ret = func.create_group(tup, (gs_sp_t)&grp);
nflushes = func.flush_needed();
-
- if (!grp) {
+ if(func.disordered()){
+ fprintf(stderr,"Out of order record in %s\n",op_name);
+ return 0;
+ }
+
+ if (! ret) {
if (nflushes>0){
flush(result);
}
if (nflushes>0) {
flush(result);
}
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
if ((iter = group_table.find(grp)) != group_table.end()) {
- aggregate* old_aggr = (*iter).second;
- func.update_aggregate(tup, grp, old_aggr);
+ func.update_aggregate(tup, grp, (*iter).second);
}else{
- // create a copy of the group on the heap
- group* new_grp = new group(grp); // need a copy constructor for groups
-// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
- aggregate* aggr = new aggregate();
- // create an aggregate in preallocated buffer
- aggr = func.create_aggregate(tup, (char*)aggr);
-
- group_table.insert(new_grp, aggr);
+ aggregate aggr;
+ // create an aggregate in preallocated buffer
+ func.create_aggregate(tup, (char*)&aggr);
+ // neeed operator= doing a deep copy
+ group_table.insert(grp, aggr);
}
tup.free_tuple();
return 0;
virtual int flush(list<host_tuple>& result) {
host_tuple tup;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
// Limit the number of successive flushes - avoid explosive behavior
const gs_int32_t max_flushes = 10;
result.push_back(tup);
}
if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
- group* g = (*flush_pos).first;
- aggregate* a = (*flush_pos).second;
+ group &g = (*flush_pos).first;
+ //aggregate a = (*flush_pos).second;
++flush_pos;
group_table.erase(g);
- delete (g);
- delete (a);
}else{
func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
++flush_pos;