X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=include%2Fhfta%2Frunning_gb_operator.h;h=ca2e7fc0c9bd88a8bbc8b87868d29154489620bf;hb=989d19428b3d21982b048cf256f625a8ca664c2e;hp=c20c87f64ae6f88da743e3e26da586f36f478eaa;hpb=7cec316889150a8a92238e52c7bad1270608b333;p=com%2Fgs-lite.git diff --git a/include/hfta/running_gb_operator.h b/include/hfta/running_gb_operator.h index c20c87f..ca2e7fc 100644 --- a/include/hfta/running_gb_operator.h +++ b/include/hfta/running_gb_operator.h @@ -29,8 +29,8 @@ template group_table; - typename hash_table::iterator flush_pos; + hash_table group_table; + typename hash_table::iterator flush_pos; gs_int32_t nflushes; @@ -44,16 +44,15 @@ public: // Push out completed groups -// create buffer on the stack to store key object - char buffer[sizeof(group)]; -// Number of flushes required - -// extract the key information from the tuple and -// copy it into buffer - group* grp = func.create_group(tup, buffer); + group grp, *ret; + ret = func.create_group(tup, (gs_sp_t)&grp); nflushes = func.flush_needed(); - - if (!grp) { + if(func.disordered()){ + // fprintf(stderr,"Out of order record in %s\n",op_name); + return 0; + } + + if (! ret) { if (nflushes>0){ flush(result); } @@ -71,19 +70,15 @@ public: if (nflushes>0) { flush(result); } - typename hash_table::iterator iter; + typename hash_table::iterator iter; if ((iter = group_table.find(grp)) != group_table.end()) { - aggregate* old_aggr = (*iter).second; - func.update_aggregate(tup, grp, old_aggr); + func.update_aggregate(tup, grp, (*iter).second); }else{ - // create a copy of the group on the heap - group* new_grp = new group(grp); // need a copy constructor for groups -// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate)); - aggregate* aggr = new aggregate(); - // create an aggregate in preallocated buffer - aggr = func.create_aggregate(tup, (char*)aggr); - - group_table.insert(new_grp, aggr); + char aggr_buffer[sizeof(aggregate)]; + // create an aggregate in preallocated buffer + func.create_aggregate(tup, aggr_buffer); + // neeed operator= doing a deep copy + group_table.insert(grp, (*(aggregate*)aggr_buffer)); } tup.free_tuple(); return 0; @@ -91,10 +86,10 @@ public: virtual int flush(list& result) { host_tuple tup; - typename hash_table::iterator iter; + typename hash_table::iterator iter; // Limit the number of successive flushes - avoid explosive behavior - const gs_int32_t max_flushes = 10; + const gs_int32_t max_flushes = 25; if(nflushes>max_flushes){ fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes); nflushes = max_flushes; @@ -116,12 +111,10 @@ public: result.push_back(tup); } if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){ - group* g = (*flush_pos).first; - aggregate* a = (*flush_pos).second; + group &g = (*flush_pos).first; + //aggregate a = (*flush_pos).second; ++flush_pos; group_table.erase(g); - delete (g); - delete (a); }else{ func.reinit_aggregates((*flush_pos).first, (*flush_pos).second); ++flush_pos;