X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=include%2Fhfta%2Frunning_gb_operator.h;h=c20c87f64ae6f88da743e3e26da586f36f478eaa;hb=7cec316889150a8a92238e52c7bad1270608b333;hp=5bd7388d459c794b2d479f016b63e2d89700e048;hpb=e981e864b812c938d3df8b555b6bb98bb89273e7;p=com%2Fgs-lite.git diff --git a/include/hfta/running_gb_operator.h b/include/hfta/running_gb_operator.h index 5bd7388..c20c87f 100644 --- a/include/hfta/running_gb_operator.h +++ b/include/hfta/running_gb_operator.h @@ -31,6 +31,7 @@ private : groupby_func func; hash_table group_table; typename hash_table::iterator flush_pos; + gs_int32_t nflushes; @@ -43,21 +44,17 @@ public: // Push out completed groups - // create buffer on the stack to store key object +// create buffer on the stack to store key object char buffer[sizeof(group)]; +// Number of flushes required - // extract the key information from the tuple and - // copy it into buffer +// extract the key information from the tuple and +// copy it into buffer group* grp = func.create_group(tup, buffer); -/*// Ignore temp tuples until we can fix their timestamps. -if (func.temp_status_received()) { - tup.free_tuple(); - return 0; -} -*/ - + nflushes = func.flush_needed(); + if (!grp) { - if (func.flush_needed()){ + if (nflushes>0){ flush(result); } if (func.temp_status_received()) { @@ -71,7 +68,7 @@ if (func.temp_status_received()) { return 0; } - if (func.flush_needed()) { + if (nflushes>0) { flush(result); } typename hash_table::iterator iter; @@ -95,24 +92,40 @@ if (func.temp_status_received()) { virtual int flush(list& result) { host_tuple tup; typename hash_table::iterator iter; -// If the old table isn't empty, flush it now. - for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) { - bool failed = false; - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); - if (!failed) { - tup.channel = output_channel; - result.push_back(tup); - } - if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){ - group* g = (*flush_pos).first; - aggregate* a = (*flush_pos).second; - ++flush_pos; - group_table.erase(g); - delete (g); - delete (a); + +// Limit the number of successive flushes - avoid explosive behavior + const gs_int32_t max_flushes = 10; + if(nflushes>max_flushes){ + fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes); + nflushes = max_flushes; + } + + for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){ +// advance the TB for the reinit + if(flush_no < nflushes-1){ + func.advance_last_tb(); }else{ - func.reinit_aggregates((*flush_pos).first, (*flush_pos).second); - ++flush_pos; + func.reset_last_tb(); // Move to current tb in case flush limit reached + } +// If the old table isn't empty, flush it now. + for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) { + bool failed = false; + tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); + if (!failed) { + tup.channel = output_channel; + result.push_back(tup); + } + if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){ + group* g = (*flush_pos).first; + aggregate* a = (*flush_pos).second; + ++flush_pos; + group_table.erase(g); + delete (g); + delete (a); + }else{ + func.reinit_aggregates((*flush_pos).first, (*flush_pos).second); + ++flush_pos; + } } }