X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=include%2Fhfta%2Frunning_gb_operator.h;h=ca2e7fc0c9bd88a8bbc8b87868d29154489620bf;hb=52bf6cf53a585197f998187399ebfd88681d4490;hp=5bd7388d459c794b2d479f016b63e2d89700e048;hpb=3ff5c433efcaee8b01fbeed90ab848008f2e6278;p=com%2Fgs-lite.git diff --git a/include/hfta/running_gb_operator.h b/include/hfta/running_gb_operator.h index 5bd7388..ca2e7fc 100644 --- a/include/hfta/running_gb_operator.h +++ b/include/hfta/running_gb_operator.h @@ -29,8 +29,9 @@ template group_table; - typename hash_table::iterator flush_pos; + hash_table group_table; + typename hash_table::iterator flush_pos; + gs_int32_t nflushes; @@ -43,21 +44,16 @@ public: // Push out completed groups - // create buffer on the stack to store key object - char buffer[sizeof(group)]; - - // extract the key information from the tuple and - // copy it into buffer - group* grp = func.create_group(tup, buffer); -/*// Ignore temp tuples until we can fix their timestamps. -if (func.temp_status_received()) { - tup.free_tuple(); - return 0; -} -*/ - - if (!grp) { - if (func.flush_needed()){ + group grp, *ret; + ret = func.create_group(tup, (gs_sp_t)&grp); + nflushes = func.flush_needed(); + if(func.disordered()){ + // fprintf(stderr,"Out of order record in %s\n",op_name); + return 0; + } + + if (! ret) { + if (nflushes>0){ flush(result); } if (func.temp_status_received()) { @@ -71,22 +67,18 @@ if (func.temp_status_received()) { return 0; } - if (func.flush_needed()) { + if (nflushes>0) { flush(result); } - typename hash_table::iterator iter; + typename hash_table::iterator iter; if ((iter = group_table.find(grp)) != group_table.end()) { - aggregate* old_aggr = (*iter).second; - func.update_aggregate(tup, grp, old_aggr); + func.update_aggregate(tup, grp, (*iter).second); }else{ - // create a copy of the group on the heap - group* new_grp = new group(grp); // need a copy constructor for groups -// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate)); - aggregate* aggr = new aggregate(); - // create an aggregate in preallocated buffer - aggr = func.create_aggregate(tup, (char*)aggr); - - group_table.insert(new_grp, aggr); + char aggr_buffer[sizeof(aggregate)]; + // create an aggregate in preallocated buffer + func.create_aggregate(tup, aggr_buffer); + // neeed operator= doing a deep copy + group_table.insert(grp, (*(aggregate*)aggr_buffer)); } tup.free_tuple(); return 0; @@ -94,25 +86,39 @@ if (func.temp_status_received()) { virtual int flush(list& result) { host_tuple tup; - typename hash_table::iterator iter; -// If the old table isn't empty, flush it now. - for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) { - bool failed = false; - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); - if (!failed) { - tup.channel = output_channel; - result.push_back(tup); - } - if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){ - group* g = (*flush_pos).first; - aggregate* a = (*flush_pos).second; - ++flush_pos; - group_table.erase(g); - delete (g); - delete (a); + typename hash_table::iterator iter; + +// Limit the number of successive flushes - avoid explosive behavior + const gs_int32_t max_flushes = 25; + if(nflushes>max_flushes){ + fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes); + nflushes = max_flushes; + } + + for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){ +// advance the TB for the reinit + if(flush_no < nflushes-1){ + func.advance_last_tb(); }else{ - func.reinit_aggregates((*flush_pos).first, (*flush_pos).second); - ++flush_pos; + func.reset_last_tb(); // Move to current tb in case flush limit reached + } +// If the old table isn't empty, flush it now. + for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) { + bool failed = false; + tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); + if (!failed) { + tup.channel = output_channel; + result.push_back(tup); + } + if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){ + group &g = (*flush_pos).first; + //aggregate a = (*flush_pos).second; + ++flush_pos; + group_table.erase(g); + }else{ + func.reinit_aggregates((*flush_pos).first, (*flush_pos).second); + ++flush_pos; + } } }