groupby_func func;
hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+ gs_int32_t nflushes;
// Push out completed groups
- // create buffer on the stack to store key object
+// create buffer on the stack to store key object
char buffer[sizeof(group)];
+// Number of flushes required
- // extract the key information from the tuple and
- // copy it into buffer
+// extract the key information from the tuple and
+// copy it into buffer
group* grp = func.create_group(tup, buffer);
-/*// Ignore temp tuples until we can fix their timestamps.
-if (func.temp_status_received()) {
- tup.free_tuple();
- return 0;
-}
-*/
-
+ nflushes = func.flush_needed();
+
if (!grp) {
- if (func.flush_needed()){
+ if (nflushes>0){
flush(result);
}
if (func.temp_status_received()) {
return 0;
}
- if (func.flush_needed()) {
+ if (nflushes>0) {
flush(result);
}
typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
virtual int flush(list<host_tuple>& result) {
host_tuple tup;
typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-// If the old table isn't empty, flush it now.
- for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
- bool failed = false;
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
- if (!failed) {
- tup.channel = output_channel;
- result.push_back(tup);
- }
- if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
- group* g = (*flush_pos).first;
- aggregate* a = (*flush_pos).second;
- ++flush_pos;
- group_table.erase(g);
- delete (g);
- delete (a);
+
+// Limit the number of successive flushes - avoid explosive behavior
+ const gs_int32_t max_flushes = 10;
+ if(nflushes>max_flushes){
+ fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
+ nflushes = max_flushes;
+ }
+
+ for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
+// advance the TB for the reinit
+ if(flush_no < nflushes-1){
+ func.advance_last_tb();
}else{
- func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
- ++flush_pos;
+ func.reset_last_tb(); // Move to current tb in case flush limit reached
+ }
+// If the old table isn't empty, flush it now.
+ for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
+ bool failed = false;
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+ if (!failed) {
+ tup.channel = output_channel;
+ result.push_back(tup);
+ }
+ if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
+ group* g = (*flush_pos).first;
+ aggregate* a = (*flush_pos).second;
+ ++flush_pos;
+ group_table.erase(g);
+ delete (g);
+ delete (a);
+ }else{
+ func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
+ ++flush_pos;
+ }
}
}