Disable printing out-of-order messages, increase max_flushes to 25 in running groupby
[com/gs-lite.git] / include / hfta / running_gb_operator.h
index 5bd7388..ca2e7fc 100644 (file)
@@ -29,8 +29,9 @@ template <class groupby_func, class group, class aggregate, class hasher_func, c
 class running_agg_operator : public base_operator {
 private :
        groupby_func func;
-       hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
-       typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+       hash_table<group, aggregate, hasher_func, equal_func> group_table;
+       typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
+       gs_int32_t nflushes;
 
 
 
@@ -43,21 +44,16 @@ public:
 
 //                     Push out completed groups
 
-               // create buffer on the stack to store key object
-               char buffer[sizeof(group)];
-
-               // extract the key information from the tuple and
-               // copy it into buffer
-               group* grp = func.create_group(tup, buffer);
-/*//                   Ignore temp tuples until we can fix their timestamps.
-if (func.temp_status_received()) {
- tup.free_tuple();
- return 0;
-}
-*/
-
-               if (!grp) {
-                       if (func.flush_needed()){
+               group grp, *ret;
+               ret = func.create_group(tup, (gs_sp_t)&grp);
+               nflushes = func.flush_needed();
+               if(func.disordered()){
+                       // fprintf(stderr,"Out of order record in %s\n",op_name);
+                       return 0;
+               }
+
+               if (! ret) {
+                       if (nflushes>0){
                                flush(result);
                        }
                        if (func.temp_status_received()) {
@@ -71,22 +67,18 @@ if (func.temp_status_received()) {
                        return 0;
                }
 
-               if (func.flush_needed()) {
+               if (nflushes>0) {
                        flush(result);
                }
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
                if ((iter = group_table.find(grp)) != group_table.end()) {
-                       aggregate* old_aggr = (*iter).second;
-                       func.update_aggregate(tup, grp, old_aggr);
+                       func.update_aggregate(tup, grp, (*iter).second);
                }else{
-                       // create a copy of the group on the heap
-                       group* new_grp = new group(grp);        // need a copy constructor for groups
-//                     aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
-                       aggregate* aggr = new aggregate();
-                       // create an aggregate in preallocated buffer
-                       aggr = func.create_aggregate(tup, (char*)aggr);
-
-                       group_table.insert(new_grp, aggr);
+                               char aggr_buffer[sizeof(aggregate)];
+                               // create an aggregate in preallocated buffer
+                               func.create_aggregate(tup, aggr_buffer);
+                               // need an operator= that does a deep copy
+                               group_table.insert(grp, (*(aggregate*)aggr_buffer));
                }
                tup.free_tuple();
                return 0;
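
The accept_tuple() hunk above switches the group table from heap-allocated group*/aggregate* pairs to storing groups and aggregates by value, so no per-group new/delete is needed: the key and the aggregate are built in stack buffers and copied into the table. Below is a minimal, self-contained sketch of that pattern; it uses std::unordered_map as a stand-in for the project's hash_table, and the Group/Agg types and main() driver are hypothetical illustrations, not the gs-lite code.

#include <cstddef>
#include <cstdio>
#include <new>
#include <unordered_map>

struct Group {                       // hypothetical group key
    int key;
    bool operator==(const Group& o) const { return key == o.key; }
};
struct GroupHash {
    std::size_t operator()(const Group& g) const { return std::hash<int>()(g.key); }
};
struct Agg { long cnt; long sum; };  // hypothetical aggregate (copyable by value)

int main() {
    std::unordered_map<Group, Agg, GroupHash> group_table;
    int tuples[] = {3, 1, 3, 2, 1, 3};

    for (int v : tuples) {
        Group grp{v % 2};                          // build the key on the stack
        auto it = group_table.find(grp);
        if (it != group_table.end()) {
            it->second.cnt += 1;                   // update the existing aggregate in place
            it->second.sum += v;
        } else {
            alignas(Agg) char aggr_buffer[sizeof(Agg)];  // preallocated buffer, as in the diff
            Agg* a = new (aggr_buffer) Agg{1, v};        // construct the aggregate in it
            group_table.insert({grp, *a});               // copied by value into the table
        }
    }
    for (const auto& kv : group_table)
        std::printf("key=%d count=%ld sum=%ld\n", kv.first.key, kv.second.cnt, kv.second.sum);
    return 0;
}
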
@@ -94,25 +86,39 @@ if (func.temp_status_received()) {
 
        virtual int flush(list<host_tuple>& result) {
                host_tuple tup;
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-//             If the old table isn't empty, flush it now.
-               for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
-                       bool failed = false;
-                       tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-                       if (!failed) {
-                               tup.channel = output_channel;
-                               result.push_back(tup);
-                       }
-                       if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
-                           group* g = (*flush_pos).first;
-                       aggregate* a = (*flush_pos).second;
-                               ++flush_pos;
-                               group_table.erase(g);
-                               delete (g);
-                               delete (a);
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+
+//     Limit the number of successive flushes - avoid explosive behavior
+               const gs_int32_t max_flushes = 25;
+               if(nflushes>max_flushes){
+                       fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
+                       nflushes = max_flushes;
+               }
+
+               for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
+//     advance the TB for the reinit
+                       if(flush_no < nflushes-1){
+                               func.advance_last_tb();
                        }else{
-                               func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
-                               ++flush_pos;
+                               func.reset_last_tb();   // Move to current tb in case flush limit reached
+                       }
+//             If the old table isn't empty, flush it now.
+                       for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
+                               bool failed = false;
+                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+                               if (!failed) {
+                                       tup.channel = output_channel;
+                                       result.push_back(tup);
+                               }
+                               if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
+                                       group &g = (*flush_pos).first;
+                                       //aggregate a = (*flush_pos).second;
+                                       ++flush_pos;
+                                       group_table.erase(g);
+                               }else{
+                                       func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
+                                       ++flush_pos;
+                               }
                        }
                }
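
The flush() hunk caps how many windows can be reported in one call. As a reference, here is a minimal, self-contained sketch of that capping pattern: flush_needed() in the operator reports how many windows have closed, the loop advances the window clock once per pass (advance_last_tb() in the diff) and on the final pass jumps it to the current bucket (reset_last_tb()), so at most max_flushes windows are emitted at once. The counter-based stand-ins below (last_tb, current_tb, emit_window) are illustrative assumptions, not the gs-lite API.

#include <cstdio>

static const int max_flushes = 25;   // cap on windows reported in one flush() call

static long last_tb = 0;             // last time bucket that was reported
static long current_tb = 0;          // time bucket of the newest input tuple

static void emit_window(long tb) {   // stands in for "flush the whole group table"
    std::printf("window %ld flushed\n", tb);
}

static void flush(int nflushes) {
    if (nflushes > max_flushes) {    // same guard as in the diff: avoid explosive behavior
        std::fprintf(stderr,
                     "Warning: %d windows closed, reporting only %d\n",
                     nflushes, max_flushes);
        nflushes = max_flushes;
    }
    for (int flush_no = 0; flush_no < nflushes; ++flush_no) {
        if (flush_no < nflushes - 1)
            ++last_tb;               // advance one window per pass (cf. advance_last_tb)
        else
            last_tb = current_tb;    // final pass: jump to the current bucket (cf. reset_last_tb)
        emit_window(last_tb);
    }
}

int main() {
    current_tb = 40;                                  // simulate the input jumping 40 windows ahead
    flush(static_cast<int>(current_tb - last_tb));    // only 25 windows are emitted
    return 0;
}
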