Improvements to aggregation code and fucntion library
[com/gs-lite.git] / include / hfta / groupby_operator.h
index 26645c3..e831bc0 100644 (file)
@@ -1,4 +1,4 @@
-/* ------------------------------------------------
+/** ------------------------------------------------
 Copyright 2014 AT&T Intellectual Property
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@ Copyright 2014 AT&T Intellectual Property
 #include <list>
 #include "hash_table.h"
 
-#define _GB_FLUSH_PER_TUPLE_ 1
 
 using namespace std;
 
@@ -29,19 +28,13 @@ template <class groupby_func, class group, class aggregate, class hasher_func, c
 class groupby_operator : public base_operator {
 private :
        groupby_func func;
-       hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];
+       hash_table<group, aggregate, hasher_func, equal_func> group_table;
        bool flush_finished;
-       unsigned int curr_table;
-       typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+       typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
        int n_patterns;
-
-
-
 public:
        groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
                flush_finished = true;
-               curr_table = 0;
-               flush_pos = group_table[1-curr_table].end();
                n_patterns = func.n_groupby_patterns();
        }
 
@@ -49,24 +42,17 @@ public:
 
 //                     Push out completed groups
 
-               if(!flush_finished) partial_flush(result);
-
-               // create buffer on the stack to store key object
-               char buffer[sizeof(group)];
-
                // extract the key information from the tuple and
                // copy it into buffer
-               group* grp = func.create_group(tup, buffer);
-               if (!grp) {
-/*
-//                     Ignore temp tuples until we can fix their timestamps.
-if (func.temp_status_received()) {
- tup.free_tuple();
- return 0;
-}*/
+               group grp;
+               if (!func.create_group(tup, (gs_sp_t)&grp)) {
+                       if(func.disordered()){
+                               fprintf(stderr,"Out of order record in %s\n",op_name);
+                               return 0;
+                       }
                        if (func.flush_needed()){
                                flush_old(result);
-               }
+                       }
                        if (func.temp_status_received()) {
                                host_tuple temp_tup;
                                if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
@@ -77,32 +63,35 @@ if (func.temp_status_received()) {
                        tup.free_tuple();
                        return 0;
                }
+               if(func.disordered()){
+                       fprintf(stderr,"Out of order record in %s\n",op_name);
+                       return 0;
+               }
 
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-               if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
+               typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+               if ((iter = group_table.find(grp)) != group_table.end()) {
 //                             Temporal GBvar is part of the group so no flush is needed.
-                       aggregate* old_aggr = (*iter).second;
-                       func.update_aggregate(tup, grp, old_aggr);
+                       func.update_aggregate(tup, grp, (*iter).second);
                }else{
                        if (func.flush_needed()) {
                                flush_old(result);
                        }
                        if(n_patterns <= 1){
-                       // create a copy of the group on the heap
-                               group* new_grp = new group(grp);        // need a copy constructor for groups
-//                     aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
-                               aggregate* aggr = new aggregate();
-                       // create an aggregate in preallocated buffer
-                               aggr = func.create_aggregate(tup, (char*)aggr);
-
-                               group_table[curr_table].insert(new_grp, aggr);
+                               aggregate aggr;
+                               // create an aggregate in preallocated buffer
+                               func.create_aggregate(tup, (char*)&aggr);
+                               // neeed operator= doing a deep copy
+                               group_table.insert(grp, aggr);
                        }else{
                                int p;
+// TODO this code is wrong, must check if each pattern is in the group table.
                                for(p=0;p<n_patterns;++p){
-                                       group* new_grp = new group(grp, func.get_pattern(p));
-                                       aggregate* aggr = new aggregate();
-                                       aggr = func.create_aggregate(tup, (char*)aggr);
-                                       group_table[curr_table].insert(new_grp, aggr);
+                                       // need shallow copy constructor for groups
+                                       group new_grp(grp, func.get_pattern(p));
+                                       aggregate aggr;
+                                       func.create_aggregate(tup, (char*)&aggr);
+                                       // neeed operator= doing a deep copy
+                                       group_table.insert(new_grp, aggr);
                                }
                        }
                }
@@ -110,43 +99,14 @@ if (func.temp_status_received()) {
                return 0;
        }
 
-       int partial_flush(list<host_tuple>& result) {
-               host_tuple tup;
-               unsigned int old_table = 1-curr_table;
-               unsigned int i;
-
-//                             emit up to _GB_FLUSH_PER_TABLE_ output tuples.
-               if (!group_table[old_table].empty()) {
-                       for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
-                               bool failed = false;
-                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-                               if (!failed) {
-                                       tup.channel = output_channel;
-                                       result.push_back(tup);
-                               }
-                               delete ((*flush_pos).first);
-                               delete ((*flush_pos).second);
-//                             free((*flush_pos).second);
-                       }
-               }
-
-//                     Finalize processing if empty.
-               if(flush_pos == group_table[old_table].end()) {
-                       flush_finished = true;
-                       group_table[old_table].clear();
-                       group_table[old_table].rehash();
-               }
-               return 0;
-       }
 
        int flush(list<host_tuple>& result) {
                host_tuple tup;
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-               unsigned int old_table = 1-curr_table;
 
-//                     If the old table isn't empty, flush it now.
-               if (!group_table[old_table].empty()) {
-                       for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
+               flush_pos = group_table.begin();
+//                     If the table isn't empty, flush it now.
+               if (!group_table.empty()) {
+                       for (; flush_pos != group_table.end(); ++flush_pos) {
                                bool failed = false;
                                tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
                                if (!failed) {
@@ -154,30 +114,9 @@ if (func.temp_status_received()) {
                                        tup.channel = output_channel;
                                        result.push_back(tup);
                                }
-                               delete ((*flush_pos).first);
-                               delete ((*flush_pos).second);
 //                             free((*flush_pos).second);
                        }
-                       group_table[old_table].clear();
-                       group_table[old_table].rehash();
-               }
-
-               flush_pos = group_table[curr_table].begin();
-//                     If the old table isn't empty, flush it now.
-               if (!group_table[curr_table].empty()) {
-                       for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {
-                               bool failed = false;
-                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-                               if (!failed) {
-
-                                       tup.channel = output_channel;
-                                       result.push_back(tup);
-                               }
-                               delete ((*flush_pos).first);
-                               delete ((*flush_pos).second);
-//                             free((*flush_pos).second);
-                       }
-                       group_table[curr_table].clear();
+                       group_table.clear();
                }
 
                flush_finished = true;
@@ -186,33 +125,10 @@ if (func.temp_status_received()) {
        }
 
        int flush_old(list<host_tuple>& result) {
-               host_tuple tup;
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
-               unsigned int old_table = 1-curr_table;
-
-//                     If the old table isn't empty, flush it now.
-               if (!group_table[old_table].empty()) {
-                       for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
-                               bool failed = false;
-                               tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-                               if (!failed) {
-
-                                       tup.channel = output_channel;
-                                       result.push_back(tup);
-                               }
-                               delete ((*flush_pos).first);
-                               delete ((*flush_pos).second);
-//                             free((*flush_pos).second);
-                       }
-                       group_table[old_table].clear();
-                       group_table[old_table].rehash();
-               }
-
-//                     swap tables, enable partial flush processing.
-               flush_pos = group_table[curr_table].begin();
-               curr_table = old_table;
-               flush_finished = false;
 
+               flush(result);
+               group_table.clear();
+               group_table.resize();
                return 0;
        }
 
@@ -231,7 +147,7 @@ if (func.temp_status_received()) {
        }
 
        unsigned int get_mem_footprint() {
-               return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();
+               return group_table.get_mem_footprint();
        }
 };