- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-// If the old table isn't empty, flush it now.
- for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
- bool failed = false;
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
- if (!failed) {
- tup.channel = output_channel;
- result.push_back(tup);
- }
- if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
- group* g = (*flush_pos).first;
- aggregate* a = (*flush_pos).second;
- ++flush_pos;
- group_table.erase(g);
- delete (g);
- delete (a);
+ typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+
+// Limit the number of successive flushes - avoid explosive behavior
+ const gs_int32_t max_flushes = 25;
+ if(nflushes>max_flushes){
+ fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
+ nflushes = max_flushes;
+ }
+
+ for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
+// advance the TB for the reinit
+ if(flush_no < nflushes-1){
+ func.advance_last_tb();