-/* ------------------------------------------------
-Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#ifndef GROUPBY_OPERATOR_OOP_H
-#define GROUPBY_OPERATOR_OOP_H
-
-#include "host_tuple.h"
-#include "base_operator.h"
-#include <list>
-#include <vector>
-#include "hash_table.h"
-#include <cassert>
-
-// TED: should be supplied by the groupby_func
-#define _GB_FLUSH_PER_TUPLE_ 1
-
-/* max allowed disorder -- Jin */
-// TED: should be supplied by the groupby_func
-#define DISORDER_LEVEL 2
-
-//#define NDEBUG
-
-using namespace std;
-
-// ASSUME temporal_type is one of int, uint, llong, ullong
-
-template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func, class temporal_type>
-class groupby_operator_oop : public base_operator {
-private :
- groupby_func func;
-
- /* a list of hash tables, which maintains aggregates for current window and also k previous ones -- Jin */
- vector<hash_table<group*, aggregate*, hasher_func, equal_func>* > group_tables;
-
- /* the minimum and maximum window id of the hash tables -- Jin */
- temporal_type min_wid, max_wid;
-
-
- bool flush_finished;
- temporal_type curr_table;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
-
- temporal_type last_flushed_temporal_gb;
- temporal_type last_temporal_gb;
-
-int n_slow_flush;
- int n_patterns;
-
-
-public:
- groupby_operator_oop(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
- flush_finished = true;
-
- min_wid = 0;
- max_wid = 0;
-n_slow_flush = 0;
- n_patterns = func.n_groupby_patterns();
- }
-
- ~groupby_operator_oop() {
- hash_table<group*, aggregate*, hasher_func, equal_func>* table;
- // delete all the elements in the group_tables list;
- while (!group_tables.empty()) {
- table = group_tables.back();
- group_tables.pop_back();
- table->clear();
-//fprintf(stderr,"Deleting group table (c) at %lx\n",(gs_uint64_t)(table));
- delete (table);
- }
-
- }
-
- int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
-
-
- // Push out completed groups
- if(!flush_finished) partial_flush(result);
-
- // create buffer on the stack to store key object
- char buffer[sizeof(group)];
-
- // extract the key information from the tuple and
- // copy it into buffer
- group* grp = func.create_group(tup, buffer);
-
-
- if (!grp) {
-//printf("grp==NULL recieved ");
- if (func.temp_status_received()) {
-//printf("temp status record ");
- last_flushed_temporal_gb = func.get_last_flushed_gb ();
- last_temporal_gb = func.get_last_gb ();
- }
-//printf("\n");
-
-//fprintf(stderr,"min_wid=%d, max_wid=%d, last_temporal_gb=%d, last_flushed_temporal_gb=%d, flush_finished=%d\n",min_wid, max_wid, last_temporal_gb, last_flushed_temporal_gb, flush_finished);
-
- /* no data has arrived, and so we ignore the temp tuples -- Jin */
- if (group_tables.size()>0) {
-
- gs_int64_t index;
- if(last_flushed_temporal_gb >= min_wid){
- index = last_flushed_temporal_gb - min_wid;
- }else{
- index = -(min_wid - last_flushed_temporal_gb); // unsigned arithmetic
- }
-
- if (func.flush_needed() && index>=0) {
-#ifdef NDEBUG
-//fprintf(stderr, "flush needed: last_flushed_gb %u , min_wid %u \n", last_flushed_temporal_gb, min_wid);
-#endif
- // Init flush on first temp tuple -- Jin
- if ( !flush_finished) {
-#ifdef NDEBUG
-//fprintf(stderr, "last_flushed_gb is %u, min_wid is %u \n", last_flushed_temporal_gb, min_wid);
-#endif
- flush_old(result);
- }
- if (last_temporal_gb > min_wid && group_tables.size()>0) {
- flush_finished = false;
- }
-
- // we start to flush from the head of the group tables -- Jin
- if(group_tables.size()>0){
- flush_pos = group_tables[0]->begin();
- }
-
-#ifdef NDEBUG
-//fprintf(stderr, "after flush old \n");
-#endif
- }
- }
-
- host_tuple temp_tup;
- if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
- temp_tup.channel = output_channel;
- result.push_back(temp_tup);
- }
-
- tup.free_tuple();
- return 0;
- }
-
-//fprintf (stderr, "after create group grp=%lx, curr_table = %d\n",(gs_uint64_t)grp, grp->get_curr_gb());
-
- /* This is a regular tuple -- Jin */
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
- /* first, decide which hash table we need to work at */
- curr_table = grp->get_curr_gb();
- if (max_wid == 0 && min_wid == 0) {
- group_tables.push_back((new hash_table<group*, aggregate*, hasher_func, equal_func>()));
-//fprintf(stderr,"Added (1) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());
- max_wid = min_wid = curr_table;
- }
- if (curr_table < min_wid) {
- for (temporal_type i = curr_table; i < min_wid; i++){
- group_tables.insert(group_tables.begin(), new hash_table<group*, aggregate*, hasher_func, equal_func>());
-//fprintf(stderr,"Added (2) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());
- }
- min_wid = curr_table;
- }
- if (curr_table > max_wid) {
- hash_table<group*, aggregate*, hasher_func, equal_func>* pt;
- for (temporal_type i = max_wid; i < curr_table; i++) {
- pt =new hash_table<group*, aggregate*, hasher_func, equal_func>();
- group_tables.push_back(pt);
-//fprintf(stderr,"Added (3) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());
- }
-
- max_wid = curr_table;
- }
- gs_int64_t index = curr_table - min_wid;
-
- if ((iter = group_tables[index]->find(grp)) != group_tables[index]->end()) {
- aggregate* old_aggr = (*iter).second;
- func.update_aggregate(tup, grp, old_aggr);
- }else{
- /* We only flush when a temp tuple is received, so we only check on temp tuple -- Jin */
- // create a copy of the group on the heap
- if(n_patterns <= 1){
-
- group* new_grp = new group(grp); // need a copy constructor for groups
-
- aggregate* aggr = new aggregate();
-
- // create an aggregate in preallocated buffer
- aggr = func.create_aggregate(tup, (char*)aggr);
-
-// hash_table<group*, aggregate*, hasher_func, equal_func>* pt;
- group_tables[index]->insert(new_grp, aggr);
- }else{
- int p;
- for(p=0;p<n_patterns;++p){
- group* new_grp = new group(grp, func.get_pattern(p));
- aggregate* aggr = new aggregate();
- aggr = func.create_aggregate(tup, (char*)aggr);
- group_tables[index]->insert(new_grp, aggr);
- }
- }
- }
- tup.free_tuple();
- return 0;
- }
-
- int partial_flush(list<host_tuple>& result) {
- host_tuple tup;
- /* the hash table we should flush is func->last_flushed_gb_0 -- Jin */
- /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */
-
- gs_int64_t i;
-
-//fprintf(stderr, "partial_flush size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d \n", group_tables.size(), min_wid, max_wid, last_temporal_gb);
- if(group_tables.size()==0){
- flush_finished = true;
-//fprintf(stderr, "out of partial flush early \n");
- return 0;
- }
-
-// emit up to _GB_FLUSH_PER_TABLE_ output tuples.
- if (!group_tables[0]->empty()) {
- for (i=0; flush_pos!=group_tables[0]->end() && i < _GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
-n_slow_flush++;
- bool failed = false;
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
- if (!failed) {
- tup.channel = output_channel;
- result.push_back(tup);
- }
-//fprintf(stderr,"partial_flush Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));
- delete ((*flush_pos).first);
- delete ((*flush_pos).second);
- }
- }
-
-// Finalize processing if empty.
- if (flush_pos == group_tables[0]->end()) {
- /* one window is completely flushed, so recycle the hash table -- Jin */
-
- hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];
-
-//fprintf(stderr,"partial_flush Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));
- group_tables[0]->clear();
- delete (group_tables[0]);
-
- group_tables.erase(group_tables.begin());
-
- min_wid++;
-
- if (last_temporal_gb > min_wid && group_tables.size()>0) {
- flush_pos = group_tables[0]->begin();
-
- } else {
- flush_finished = true;
- }
- }
-//fprintf(stderr, "out of partial flush \n");
- return 0;
- }
-
-
- /* Where is this function called ??? */ /* externally */
- int flush(list<host_tuple>& result) {
- host_tuple tup;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
- /* the hash table we should flush is func->last_flushed_gb_0 -- Jin */
- /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */
- while ( group_tables.size() > 0) {
- if (!group_tables[0]->empty()) {
- if (flush_finished)
- flush_pos = group_tables[0]->begin();
- for (; flush_pos != group_tables[0]->end(); ++flush_pos) {
- bool failed = false;
-
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-
- if (!failed) {
-
- tup.channel = output_channel;
- result.push_back(tup);
- }
-//fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));
- delete ((*flush_pos).first);
- delete ((*flush_pos).second);
-
- }
- }
- min_wid++;
-
- // remove the hashtable from group_tables
- hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];
-
- table->clear();
-//fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));
- delete (table);
- group_tables.erase(group_tables.begin());
-
- if(group_tables.size()>0){
- flush_pos = group_tables[0]->begin();
- }
- }
-
-
-
- flush_finished = true;
-
- return 0;
- }
-
- /* flushes every hash table before last_flush_gb, and get ready to flush the next window -- Jin */
- int flush_old(list<host_tuple>& result) {
- host_tuple tup;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
- gs_int64_t num, i;
-
-//fprintf(stderr, "flush_old size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d, num=%d\n", group_tables.size(), min_wid, max_wid, last_temporal_gb, num);
-
- num = last_temporal_gb - min_wid;
-
- //If the old table isn't empty, flush it now.
- for (i = 0; i < num && group_tables.size() > 0; i++) {
- if (!group_tables[0]->empty()) {
- for (; flush_pos != group_tables[0]->end(); ++flush_pos) {
- bool failed = false;
-
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
-
- if (!failed) {
-
- tup.channel = output_channel;
- result.push_back(tup);
- }
-//fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));
- delete ((*flush_pos).first);
- delete ((*flush_pos).second);
-
- }
- }
- min_wid++;
-
- // remove the hashtable from group_tables
- hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];
-
- table->clear();
-//fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));
- delete (table);
- group_tables.erase(group_tables.begin());
-
- if(group_tables.size()>0){
- flush_pos = group_tables[0]->begin();
- }
- }
-
- flush_finished = true;
-
-//fprintf(stderr, "end of flush_old \n");
-
- return 0;
- }
-
-
- int set_param_block(int sz, void * value) {
- func.set_param_block(sz, value);
- return 0;
- }
-
- int get_temp_status(host_tuple& result) {
- result.channel = output_channel;
- return func.create_temp_status_tuple(result, flush_finished);
- }
-
- int get_blocked_status () {
- return -1;
- }
-
- unsigned int get_mem_footprint() {
- unsigned int ret;
- unsigned int i;
-
- for(i=0;i<group_tables.size();++i)
- ret += group_tables[i]->get_mem_footprint() ;
-
- return ret;
- }
-};
-
-#endif // GROUPBY_OPERATOR_OOP_H
-
+/* ------------------------------------------------\r
+Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#ifndef GROUPBY_OPERATOR_OOP_H\r
+#define GROUPBY_OPERATOR_OOP_H\r
+\r
+#include "host_tuple.h"\r
+#include "base_operator.h"\r
+#include <list>\r
+#include <vector>\r
+#include "hash_table.h"\r
+#include <cassert>\r
+\r
+// TED: should be supplied by the groupby_func\r
+#define _GB_FLUSH_PER_TUPLE_ 1\r
+\r
+/* max allowed disorder -- Jin */\r
+// TED: should be supplied by the groupby_func\r
+#define DISORDER_LEVEL 2\r
+\r
+//#define NDEBUG\r
+\r
+using namespace std;\r
+\r
+// ASSUME temporal_type is one of int, uint, llong, ullong\r
+\r
+template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func, class temporal_type>\r
+class groupby_operator_oop : public base_operator {\r
+private :\r
+ groupby_func func;\r
+\r
+ /* a list of hash tables, which maintains aggregates for current window and also k previous ones -- Jin */\r
+ vector<hash_table<group*, aggregate*, hasher_func, equal_func>* > group_tables;\r
+\r
+ /* the minimum and maximum window id of the hash tables -- Jin */\r
+ temporal_type min_wid, max_wid;\r
+\r
+\r
+ bool flush_finished;\r
+ temporal_type curr_table;\r
+ typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;\r
+\r
+ temporal_type last_flushed_temporal_gb;\r
+ temporal_type last_temporal_gb;\r
+\r
+int n_slow_flush;\r
+ int n_patterns;\r
+\r
+\r
+public:\r
+ groupby_operator_oop(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {\r
+ flush_finished = true;\r
+\r
+ min_wid = 0;\r
+ max_wid = 0;\r
+n_slow_flush = 0;\r
+ n_patterns = func.n_groupby_patterns();\r
+ }\r
+\r
+ ~groupby_operator_oop() {\r
+ hash_table<group*, aggregate*, hasher_func, equal_func>* table;\r
+ // delete all the elements in the group_tables list;\r
+ while (!group_tables.empty()) {\r
+ table = group_tables.back();\r
+ group_tables.pop_back();\r
+ table->clear();\r
+//fprintf(stderr,"Deleting group table (c) at %lx\n",(gs_uint64_t)(table));\r
+ delete (table);\r
+ }\r
+\r
+ }\r
+\r
+ int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
+\r
+\r
+ // Push out completed groups\r
+ if(!flush_finished) partial_flush(result);\r
+\r
+ // create buffer on the stack to store key object\r
+ char buffer[sizeof(group)];\r
+\r
+ // extract the key information from the tuple and\r
+ // copy it into buffer\r
+ group* grp = func.create_group(tup, buffer);\r
+\r
+\r
+ if (!grp) {\r
+//printf("grp==NULL recieved ");\r
+ if (func.temp_status_received()) {\r
+//printf("temp status record ");\r
+ last_flushed_temporal_gb = func.get_last_flushed_gb ();\r
+ last_temporal_gb = func.get_last_gb ();\r
+ }\r
+//printf("\n");\r
+\r
+//fprintf(stderr,"min_wid=%d, max_wid=%d, last_temporal_gb=%d, last_flushed_temporal_gb=%d, flush_finished=%d\n",min_wid, max_wid, last_temporal_gb, last_flushed_temporal_gb, flush_finished);\r
+\r
+ /* no data has arrived, and so we ignore the temp tuples -- Jin */\r
+ if (group_tables.size()>0) {\r
+\r
+ gs_int64_t index;\r
+ if(last_flushed_temporal_gb >= min_wid){\r
+ index = last_flushed_temporal_gb - min_wid;\r
+ }else{\r
+ index = -(min_wid - last_flushed_temporal_gb); // unsigned arithmetic\r
+ }\r
+\r
+ if (func.flush_needed() && index>=0) {\r
+#ifdef NDEBUG\r
+//fprintf(stderr, "flush needed: last_flushed_gb %u , min_wid %u \n", last_flushed_temporal_gb, min_wid);\r
+#endif\r
+ // Init flush on first temp tuple -- Jin\r
+ if ( !flush_finished) {\r
+#ifdef NDEBUG\r
+//fprintf(stderr, "last_flushed_gb is %u, min_wid is %u \n", last_flushed_temporal_gb, min_wid);\r
+#endif\r
+ flush_old(result);\r
+ }\r
+ if (last_temporal_gb > min_wid && group_tables.size()>0) {\r
+ flush_finished = false;\r
+ }\r
+\r
+ // we start to flush from the head of the group tables -- Jin\r
+ if(group_tables.size()>0){\r
+ flush_pos = group_tables[0]->begin();\r
+ }\r
+\r
+#ifdef NDEBUG\r
+//fprintf(stderr, "after flush old \n");\r
+#endif\r
+ }\r
+ }\r
+\r
+ host_tuple temp_tup;\r
+ if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {\r
+ temp_tup.channel = output_channel;\r
+ result.push_back(temp_tup);\r
+ }\r
+\r
+ tup.free_tuple();\r
+ return 0;\r
+ }\r
+\r
+//fprintf (stderr, "after create group grp=%lx, curr_table = %d\n",(gs_uint64_t)grp, grp->get_curr_gb());\r
+\r
+ /* This is a regular tuple -- Jin */\r
+ typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
+ /* first, decide which hash table we need to work at */\r
+ curr_table = grp->get_curr_gb();\r
+ if (max_wid == 0 && min_wid == 0) {\r
+ group_tables.push_back((new hash_table<group*, aggregate*, hasher_func, equal_func>()));\r
+//fprintf(stderr,"Added (1) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());\r
+ max_wid = min_wid = curr_table;\r
+ }\r
+ if (curr_table < min_wid) {\r
+ for (temporal_type i = curr_table; i < min_wid; i++){\r
+ group_tables.insert(group_tables.begin(), new hash_table<group*, aggregate*, hasher_func, equal_func>());\r
+//fprintf(stderr,"Added (2) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());\r
+ }\r
+ min_wid = curr_table;\r
+ }\r
+ if (curr_table > max_wid) {\r
+ hash_table<group*, aggregate*, hasher_func, equal_func>* pt;\r
+ for (temporal_type i = max_wid; i < curr_table; i++) {\r
+ pt =new hash_table<group*, aggregate*, hasher_func, equal_func>();\r
+ group_tables.push_back(pt);\r
+//fprintf(stderr,"Added (3) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());\r
+ }\r
+\r
+ max_wid = curr_table;\r
+ }\r
+ gs_int64_t index = curr_table - min_wid;\r
+\r
+ if ((iter = group_tables[index]->find(grp)) != group_tables[index]->end()) {\r
+ aggregate* old_aggr = (*iter).second;\r
+ func.update_aggregate(tup, grp, old_aggr);\r
+ }else{\r
+ /* We only flush when a temp tuple is received, so we only check on temp tuple -- Jin */\r
+ // create a copy of the group on the heap\r
+ if(n_patterns <= 1){\r
+\r
+ group* new_grp = new group(grp); // need a copy constructor for groups\r
+\r
+ aggregate* aggr = new aggregate();\r
+\r
+ // create an aggregate in preallocated buffer\r
+ aggr = func.create_aggregate(tup, (char*)aggr);\r
+\r
+// hash_table<group*, aggregate*, hasher_func, equal_func>* pt;\r
+ group_tables[index]->insert(new_grp, aggr);\r
+ }else{\r
+ int p;\r
+ for(p=0;p<n_patterns;++p){\r
+ group* new_grp = new group(grp, func.get_pattern(p));\r
+ aggregate* aggr = new aggregate();\r
+ aggr = func.create_aggregate(tup, (char*)aggr);\r
+ group_tables[index]->insert(new_grp, aggr);\r
+ }\r
+ }\r
+ }\r
+ tup.free_tuple();\r
+ return 0;\r
+ }\r
+\r
+ int partial_flush(list<host_tuple>& result) {\r
+ host_tuple tup;\r
+ /* the hash table we should flush is func->last_flushed_gb_0 -- Jin */\r
+ /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */\r
+\r
+ gs_int64_t i;\r
+\r
+//fprintf(stderr, "partial_flush size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d \n", group_tables.size(), min_wid, max_wid, last_temporal_gb);\r
+ if(group_tables.size()==0){\r
+ flush_finished = true;\r
+//fprintf(stderr, "out of partial flush early \n");\r
+ return 0;\r
+ }\r
+\r
+// emit up to _GB_FLUSH_PER_TABLE_ output tuples.\r
+ if (!group_tables[0]->empty()) {\r
+ for (i=0; flush_pos!=group_tables[0]->end() && i < _GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {\r
+n_slow_flush++;\r
+ bool failed = false;\r
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
+ if (!failed) {\r
+ tup.channel = output_channel;\r
+ result.push_back(tup);\r
+ }\r
+//fprintf(stderr,"partial_flush Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));\r
+ delete ((*flush_pos).first);\r
+ delete ((*flush_pos).second);\r
+ }\r
+ }\r
+\r
+// Finalize processing if empty.\r
+ if (flush_pos == group_tables[0]->end()) {\r
+ /* one window is completely flushed, so recycle the hash table -- Jin */\r
+\r
+ hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];\r
+\r
+//fprintf(stderr,"partial_flush Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));\r
+ group_tables[0]->clear();\r
+ delete (group_tables[0]);\r
+\r
+ group_tables.erase(group_tables.begin());\r
+\r
+ min_wid++;\r
+\r
+ if (last_temporal_gb > min_wid && group_tables.size()>0) {\r
+ flush_pos = group_tables[0]->begin();\r
+\r
+ } else {\r
+ flush_finished = true;\r
+ }\r
+ }\r
+//fprintf(stderr, "out of partial flush \n");\r
+ return 0;\r
+ }\r
+\r
+\r
+ /* Where is this function called ??? */ /* externally */\r
+ int flush(list<host_tuple>& result) {\r
+ host_tuple tup;\r
+ typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
+ /* the hash table we should flush is func->last_flushed_gb_0 -- Jin */\r
+ /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */\r
+ while ( group_tables.size() > 0) {\r
+ if (!group_tables[0]->empty()) {\r
+ if (flush_finished)\r
+ flush_pos = group_tables[0]->begin();\r
+ for (; flush_pos != group_tables[0]->end(); ++flush_pos) {\r
+ bool failed = false;\r
+\r
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
+\r
+ if (!failed) {\r
+\r
+ tup.channel = output_channel;\r
+ result.push_back(tup);\r
+ }\r
+//fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));\r
+ delete ((*flush_pos).first);\r
+ delete ((*flush_pos).second);\r
+\r
+ }\r
+ }\r
+ min_wid++;\r
+\r
+ // remove the hashtable from group_tables\r
+ hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];\r
+\r
+ table->clear();\r
+//fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));\r
+ delete (table);\r
+ group_tables.erase(group_tables.begin());\r
+\r
+ if(group_tables.size()>0){\r
+ flush_pos = group_tables[0]->begin();\r
+ }\r
+ }\r
+\r
+\r
+\r
+ flush_finished = true;\r
+\r
+ return 0;\r
+ }\r
+\r
+ /* flushes every hash table before last_flush_gb, and get ready to flush the next window -- Jin */\r
+ int flush_old(list<host_tuple>& result) {\r
+ host_tuple tup;\r
+ typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
+ gs_int64_t num, i;\r
+\r
+//fprintf(stderr, "flush_old size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d, num=%d\n", group_tables.size(), min_wid, max_wid, last_temporal_gb, num);\r
+\r
+ num = last_temporal_gb - min_wid;\r
+\r
+ //If the old table isn't empty, flush it now.\r
+ for (i = 0; i < num && group_tables.size() > 0; i++) {\r
+ if (!group_tables[0]->empty()) {\r
+ for (; flush_pos != group_tables[0]->end(); ++flush_pos) {\r
+ bool failed = false;\r
+\r
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
+\r
+ if (!failed) {\r
+\r
+ tup.channel = output_channel;\r
+ result.push_back(tup);\r
+ }\r
+//fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));\r
+ delete ((*flush_pos).first);\r
+ delete ((*flush_pos).second);\r
+\r
+ }\r
+ }\r
+ min_wid++;\r
+\r
+ // remove the hashtable from group_tables\r
+ hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];\r
+\r
+ table->clear();\r
+//fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));\r
+ delete (table);\r
+ group_tables.erase(group_tables.begin());\r
+\r
+ if(group_tables.size()>0){\r
+ flush_pos = group_tables[0]->begin();\r
+ }\r
+ }\r
+\r
+ flush_finished = true;\r
+\r
+//fprintf(stderr, "end of flush_old \n");\r
+\r
+ return 0;\r
+ }\r
+\r
+\r
+ int set_param_block(int sz, void * value) {\r
+ func.set_param_block(sz, value);\r
+ return 0;\r
+ }\r
+\r
+ int get_temp_status(host_tuple& result) {\r
+ result.channel = output_channel;\r
+ return func.create_temp_status_tuple(result, flush_finished);\r
+ }\r
+\r
+ int get_blocked_status () {\r
+ return -1;\r
+ }\r
+\r
+ unsigned int get_mem_footprint() {\r
+ unsigned int ret;\r
+ unsigned int i;\r
+\r
+ for(i=0;i<group_tables.size();++i)\r
+ ret += group_tables[i]->get_mem_footprint() ;\r
+\r
+ return ret;\r
+ }\r
+};\r
+\r
+#endif // GROUPBY_OPERATOR_OOP_H\r
+\r