class running_agg_operator : public base_operator {
private :
groupby_func func;
- hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+ hash_table<group, aggregate, hasher_func, equal_func> group_table;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
+ gs_int32_t nflushes;
// Push out completed groups
- // create buffer on the stack to store key object
- char buffer[sizeof(group)];
-
- // extract the key information from the tuple and
- // copy it into buffer
- group* grp = func.create_group(tup, buffer);
-/*// Ignore temp tuples until we can fix their timestamps.
-if (func.temp_status_received()) {
- tup.free_tuple();
- return 0;
-}
-*/
-
- if (!grp) {
- if (func.flush_needed()){
+ group grp, *ret;
+ ret = func.create_group(tup, (gs_sp_t)&grp);
+ nflushes = func.flush_needed();
+ if(func.disordered()){
+ fprintf(stderr,"Out of order record in %s\n",op_name);
+ return 0;
+ }
+
+ if (! ret) {
+ if (nflushes>0){
flush(result);
}
if (func.temp_status_received()) {
return 0;
}
- if (func.flush_needed()) {
+ if (nflushes>0) {
flush(result);
}
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
if ((iter = group_table.find(grp)) != group_table.end()) {
- aggregate* old_aggr = (*iter).second;
- func.update_aggregate(tup, grp, old_aggr);
+ func.update_aggregate(tup, grp, (*iter).second);
}else{
- // create a copy of the group on the heap
- group* new_grp = new group(grp); // need a copy constructor for groups
-// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
- aggregate* aggr = new aggregate();
- // create an aggregate in preallocated buffer
- aggr = func.create_aggregate(tup, (char*)aggr);
-
- group_table.insert(new_grp, aggr);
+ aggregate aggr;
+ // create an aggregate in preallocated buffer
+ func.create_aggregate(tup, (char*)&aggr);
+ // neeed operator= doing a deep copy
+ group_table.insert(grp, aggr);
}
tup.free_tuple();
return 0;
virtual int flush(list<host_tuple>& result) {
host_tuple tup;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-// If the old table isn't empty, flush it now.
- for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
- bool failed = false;
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
- if (!failed) {
- tup.channel = output_channel;
- result.push_back(tup);
- }
- if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
- group* g = (*flush_pos).first;
- aggregate* a = (*flush_pos).second;
- ++flush_pos;
- group_table.erase(g);
- delete (g);
- delete (a);
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+
+// Limit the number of successive flushes - avoid explosive behavior
+ const gs_int32_t max_flushes = 10;
+ if(nflushes>max_flushes){
+ fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
+ nflushes = max_flushes;
+ }
+
+ for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
+// advance the TB for the reinit
+ if(flush_no < nflushes-1){
+ func.advance_last_tb();
}else{
- func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
- ++flush_pos;
+ func.reset_last_tb(); // Move to current tb in case flush limit reached
+ }
+// If the old table isn't empty, flush it now.
+ for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
+ bool failed = false;
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+ if (!failed) {
+ tup.channel = output_channel;
+ result.push_back(tup);
+ }
+ if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
+ group &g = (*flush_pos).first;
+ //aggregate a = (*flush_pos).second;
+ ++flush_pos;
+ group_table.erase(g);
+ }else{
+ func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
+ ++flush_pos;
+ }
}
}