Improvements to aggregation code and function library
[com/gs-lite.git] / include / hfta / running_gb_operator.h
index c20c87f..569bbc4 100644 (file)
@@ -29,8 +29,8 @@ template <class groupby_func, class group, class aggregate, class hasher_func, c
 class running_agg_operator : public base_operator {
 private :
        groupby_func func;
-       hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
-       typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+       hash_table<group, aggregate, hasher_func, equal_func> group_table;
+       typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
        gs_int32_t nflushes;
 
 
@@ -44,16 +44,15 @@ public:
 
 //                     Push out completed groups
 
-// create buffer on the stack to store key object
-               char buffer[sizeof(group)];
-//     Number of flushes required
-
-// extract the key information from the tuple and
-// copy it into buffer
-               group* grp = func.create_group(tup, buffer);
+               group grp, *ret;
+               ret = func.create_group(tup, (gs_sp_t)&grp);
                nflushes = func.flush_needed();
-               
-               if (!grp) {
+               if(func.disordered()){
+                       fprintf(stderr,"Out of order record in %s\n",op_name);
+                       return 0;
+               }
+
+               if (! ret) {
                        if (nflushes>0){
                                flush(result);
                        }
@@ -71,19 +70,15 @@ public:
                if (nflushes>0) {
                        flush(result);
                }
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
                if ((iter = group_table.find(grp)) != group_table.end()) {
-                       aggregate* old_aggr = (*iter).second;
-                       func.update_aggregate(tup, grp, old_aggr);
+                       func.update_aggregate(tup, grp, (*iter).second);
                }else{
-                       // create a copy of the group on the heap
-                       group* new_grp = new group(grp);        // need a copy constructor for groups
-//                     aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
-                       aggregate* aggr = new aggregate();
-                       // create an aggregate in preallocated buffer
-                       aggr = func.create_aggregate(tup, (char*)aggr);
-
-                       group_table.insert(new_grp, aggr);
+                               aggregate aggr;
+                               // create an aggregate in preallocated buffer
+                               func.create_aggregate(tup, (char*)&aggr);
+                               // need operator= doing a deep copy
+                               group_table.insert(grp, aggr);
                }
                tup.free_tuple();
                return 0;
@@ -91,7 +86,7 @@ public:
 
        virtual int flush(list<host_tuple>& result) {
                host_tuple tup;
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
 
 //     Limit the number of successive flushes - avoid explosive behavior
                const gs_int32_t max_flushes = 10;
@@ -116,12 +111,10 @@ public:
                                        result.push_back(tup);
                                }
                                if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
-                               group* g = (*flush_pos).first;
-                               aggregate* a = (*flush_pos).second;
+                                       group &g = (*flush_pos).first;
+                                       //aggregate a = (*flush_pos).second;
                                        ++flush_pos;
                                        group_table.erase(g);
-                                       delete (g);
-                                       delete (a);
                                }else{
                                        func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
                                        ++flush_pos;