X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=include%2Fhfta%2Fclean_operator.h;h=6c8d0ed0113eb54d841d116195887ec2abb18e08;hb=e981e864b812c938d3df8b555b6bb98bb89273e7;hp=a7e7862128f2d8648efe17c6583ba61419855d28;hpb=44ea17511358ebc75952066580e31cba8b38ddb8;p=com%2Fgs-lite.git diff --git a/include/hfta/clean_operator.h b/include/hfta/clean_operator.h index a7e7862..6c8d0ed 100644 --- a/include/hfta/clean_operator.h +++ b/include/hfta/clean_operator.h @@ -1,452 +1,452 @@ -/* ------------------------------------------------ -Copyright 2014 AT&T Intellectual Property - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------- */ - -#ifndef CLEAN_OPERATOR_H -#define CLEAN_OPERATOR_H - -#include "host_tuple.h" -#include "base_operator.h" -#include -#include "hash_table.h" -#include - -#define _GB_FLUSH_PER_TUPLE_ 1 - -// #define _C_O_DEBUG 1 - -using namespace std; - -template class clean_operator: public base_operator{ - - private: - - class superattribute{ - public: - unsigned int count_distinct; - list l; - - superattribute(){ - count_distinct = 0; - }; - ~superattribute(){}; - }; - - clean_func func; - hash_table group_table[2]; - hash_table supergroup_table[2]; - // maintains count_distinct for every supergroup - // also maintains list of groups of this supergroup - hash_table sp_attribute[2]; - bool flush_finished; - unsigned int curr_table; - unsigned int curr_supertable; - unsigned int curr_attrtable; - unsigned int packet_count; - unsigned int ccc; - typename hash_table::iterator iter1; //find - typename hash_table::iterator flush_pos; - typename hash_table::iterator iter2; //find - typename hash_table::iterator super_flush_pos; - - public: - -// clean_operator(int schema_hadle): func(1){ - clean_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle){ - flush_finished = true; - curr_table = 0; - curr_supertable = 0; - curr_attrtable = 0; - packet_count = 0; - ccc = 0; - flush_pos = group_table[1-curr_table].end(); - super_flush_pos = supergroup_table[1-curr_supertable].end(); - } - - virtual int accept_tuple(host_tuple& tup, list& result){ - packet_count++; - // evict tuple from the old table - if(!flush_finished){ - partial_flush(result); - } - - //buffers to store keys - char buffer[sizeof(group)]; - - // key of the supergroup is all group-by attributes not including the once that define time window - // key of the supergroup is a subset of a group key - //cout << "clean_op: creating group" << "\n"; - group* grp = func.create_group(tup,buffer); -/*// Ignore temp tuples until we can fix their timestamps. -if(func.temp_status_received()){ - tup.free_tuple(); - return 0; -}*/ - state* curr_state; - int cd = 0; //count_distinct - - // do final clean at the border of the time window - if(func.flush_needed()){ - //cout << "number of records: " << packet_count << endl; - //cout << "number of EVAL records: " << ccc << endl; - packet_count = 0; - ccc = 0; - // for every supergroup - clean group table - //cout << "FINAL CLEANING PHASE: " << "\n"; - iter2 = supergroup_table[curr_supertable].begin(); - while (iter2 != supergroup_table[curr_supertable].end()) { - cd = ((*(sp_attribute[curr_attrtable].find((*iter2).first))).second)->count_distinct; - func.finalize_state((*iter2).second, cd); - clean((*iter2).first,(*iter2).second, true); - ++iter2; - } - - } - - if(!grp){ - //cout << "clean_op: failed to create group" << "\n"; - if(func.flush_needed()){ - flush(result); - superflush(); - } - if(func.temp_status_received()){ - host_tuple temp_tup; - if (!func.create_temp_status_tuple(temp_tup, flush_finished)) { - temp_tup.channel = output_channel; - result.push_back(temp_tup); - } - } - tup.free_tuple(); - return 0; - } - - // first flush everything from the old table if needed - // need it before anything else because of the definition of the key for supergroup - if(func.flush_needed()){ - //do flush of the old group table using state from the old supergroup table - flush(result); - //flush everything from the old supertable, swap tables; - superflush(); - } - - state* old_state; - - //supergroup exists in the new table - if ((iter2 = supergroup_table[curr_supertable].find(grp)) != supergroup_table[curr_supertable].end()){ - old_state = (*iter2).second; - - superattribute *temp = (*(sp_attribute[curr_attrtable].find(grp))).second; - cd = temp->count_distinct; - - if(!func.evaluate_predicate(tup,grp,old_state, cd)){ - ccc++; - tup.free_tuple(); - return 0; - } - // update superaggregates - func.update_plus_superaggr(tup, grp, old_state); - //((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++; - temp->count_distinct++; - cd = temp->count_distinct; - curr_state = old_state; - } - //new supergroup - else{ - - //look up the group in the old table, - if((iter2 = supergroup_table[1-curr_supertable].find(grp)) != supergroup_table[1-curr_supertable].end()){ - cd = ((*(sp_attribute[1-curr_attrtable].find(grp))).second)->count_distinct; - //curr_state = new state((*iter2).second); - curr_state = new state(); - old_state = (*iter2).second; - - //if there is one - do reinitialization - func.reinitialize_state(tup, grp, curr_state,old_state, cd); - } - else{ - curr_state = new state(); - //if there isn't - do initialization - func.initialize_state(tup, grp, curr_state); - } - - // have to create new object for superkey - group* new_sgrp = new group(grp); - - // need to insert the supergroup into the hash table even if the predicate - // evaluates to false, since the state is initialized with the first tuple of the supergroup - - //insert supergroup into the hash table - supergroup_table[curr_supertable].insert(new_sgrp, curr_state); - // create superattribute object - superattribute* sp_attr = new superattribute(); - sp_attribute[curr_attrtable].insert(new_sgrp,sp_attr); - - - if(!func.evaluate_predicate(tup, grp, curr_state, cd)){ - ccc++; - tup.free_tuple(); - return 0; - } - - // update superaggregates - func.update_plus_superaggr(tup, grp, curr_state); - ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++; - } - - aggregate* ag; - cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct; - - if ((iter1 = group_table[curr_table].find(grp)) != group_table[curr_table].end()) { - //cout << "clean_op: group already exists" << "\n"; - aggregate* old_aggr = (*iter1).second; - - //adjust count_distinct due to aggregation - ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct--; - - //update group aggregates - func.update_aggregate(tup, grp, old_aggr, curr_state, cd); - ag = old_aggr; - } - else{ - //cout << "clean_op: creating a new group" << "\n"; - // create a copy of the group on the heap - group *new_grp = new group(grp); // need a copy constructor for groups - aggregate* aggr = new aggregate(); - // create an aggregate in preallocated buffer - aggr = func.create_aggregate(tup, grp, (char*)aggr, curr_state, cd); - //cout << "clean_op: inserting group into hash" << "\n"; - group_table[curr_table].insert(new_grp, aggr); - ag = aggr; - - // remember group in the list of supergroup - ((*(sp_attribute[curr_attrtable].find(new_grp))).second)->l.push_back(new_grp); - - } - - - //used just for print - bool do_print = false; - cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct; - - //CLEANING WHEN - if(func.need_to_clean(grp, curr_state, cd)){ - clean(grp, curr_state, false); - do_print = true; - } - - tup.free_tuple(); - return 0; - } - - virtual int flush(list& result){ - - //cout << "clean_op: flush" << "\n"; - host_tuple tup; - unsigned int old_table = 1-curr_table; - unsigned int old_supertable = 1-curr_supertable; - unsigned int old_attr = 1-curr_attrtable; - typename hash_table::iterator iter; - iter = supergroup_table[old_supertable].begin(); - unsigned int cd = 0; - - // if the old table isn't empty, flush it now. - if (!group_table[old_table].empty()) { - //cout << "clean_op: old table is not empty, flushing everything immediately" << "\n"; - for (; flush_pos != group_table[old_table].end(); ++flush_pos) { - - bool failed = false; - if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){ - - cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct; - - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd ,failed); - if (!failed) { - //cout << "sampled\n"; - tup.channel = output_channel; - result.push_back(tup); - } - - // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway. - } - - delete ((*flush_pos).first); - delete ((*flush_pos).second); - } - group_table[old_table].clear(); - group_table[old_table].rehash(); - } - - // swap tables, enable partial flush processing. - flush_pos = group_table[curr_table].begin(); - curr_table = old_table; - flush_finished = false; - - return 0; - } - - virtual int partial_flush(list& result){ - - //cout << "clean_op: partial flush" << "\n"; - host_tuple tup; - unsigned int old_table = 1-curr_table; - unsigned int old_supertable = 1-curr_supertable; - unsigned int old_attr = 1-curr_attrtable; - unsigned int i; - unsigned int cd = 0; - typename hash_table::iterator iter; - iter = supergroup_table[old_supertable].begin(); - - // emit up to _GB_FLUSH_PER_TABLE_ output tuples. - if (!group_table[old_table].empty()) { - for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) { - - bool failed = false; - // find supergroup of the group to be deleted - if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){ - - cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct; - - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd, failed); - if (!failed) { - - //cout << "sampled\n"; - tup.channel = output_channel; - result.push_back(tup); - } - - // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway. - } - - delete ((*flush_pos).first); - delete ((*flush_pos).second); - } - } - - // finalize processing if empty. - if(flush_pos == group_table[old_table].end()) { - flush_finished = true; - group_table[old_table].clear(); - group_table[old_table].rehash(); - } - - return 0; - } - - virtual int superflush(){ - - // cout << "clean_op: superflush" << "\n"; - typename hash_table::iterator attr_flush_pos; - unsigned int old = 1-curr_supertable; - unsigned int attr_old = 1-curr_attrtable; - - // if the old supergroup table isn't empty, flush it now. - if (!supergroup_table[old].empty()) { - //cout << "clean_op: flush supertable" << "\n"; - for (; super_flush_pos != supergroup_table[old].end(); ++super_flush_pos) { - //find that supergroup in the attributes table - attr_flush_pos = sp_attribute[attr_old].find((*super_flush_pos).first); - - delete ((*super_flush_pos).first); - delete ((*super_flush_pos).second); - //flush superattribute table too - //delete ((*attr_flush_pos).first); - delete ((*attr_flush_pos).second); - } - supergroup_table[old].clear(); - supergroup_table[old].rehash(); - sp_attribute[attr_old].clear(); - sp_attribute[attr_old].rehash(); - } - - // swap supertables - super_flush_pos = supergroup_table[curr_supertable].begin(); - curr_supertable = old; - // swap attribute tables - curr_attrtable = attr_old; - - return 0; - } - - virtual int clean(group* sgr, state* st, bool final_clean){ - //cout << "clean_op: clean" << "\n"; - bool sample = false; - - typename list::iterator viter; - superattribute* glist = (*(sp_attribute[curr_attrtable].find(sgr))).second; - int cd = ((*(sp_attribute[curr_attrtable].find(sgr))).second)->count_distinct; -// glist->l.size(); -// group_table[curr_table].size(); - if (!glist->l.empty()){ - - //cout << "clean_op: list of group pointers is not empty" << "\n"; - viter = glist->l.begin(); - for(; viter != glist->l.end();){ - iter1 = group_table[curr_table].find(*viter); - aggregate* old_aggr = (*iter1).second; - - //if (((*iter1).first->valid)){ - if (final_clean){ - // HAVING - sample = func.final_sample_group((*iter1).first, old_aggr, st, cd); - } - else - // CLEANING BY - sample = func.sample_group((*iter1).first, old_aggr, st, cd); - - if(!sample){ - //cout << "clean_op: evicting group from the group table" << "\n"; - //update superaggregates - func.update_minus_superaggr((*iter1).first, old_aggr, st); - //delete group - group* g = (*iter1).first; - aggregate* a = (*iter1).second; - group_table[curr_table].erase((*iter1).first); - delete g; - delete a; - //update count_distinct - ((*(sp_attribute[curr_attrtable].find((*iter1).first))).second)->count_distinct--; - //remove pointer from supergroup - viter = glist->l.erase(viter); - } - else - ++viter; - } - } - - return 0; - } - - virtual int set_param_block(int sz, void* value){ - func.set_param_block(sz, value); - return 0; - } - - virtual int get_temp_status(host_tuple& result){ - result.channel = output_channel; - return func.create_temp_status_tuple(result, flush_finished); - } - - virtual int get_blocked_status(){ - return -1; - } - - unsigned int get_mem_footprint() { - return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint() + - supergroup_table[0].get_mem_footprint() + supergroup_table[1].get_mem_footprint() + - sp_attribute[0].get_mem_footprint() + sp_attribute[1].get_mem_footprint(); - } - -}; - -#endif +/* ------------------------------------------------ +Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ + +#ifndef CLEAN_OPERATOR_H +#define CLEAN_OPERATOR_H + +#include "host_tuple.h" +#include "base_operator.h" +#include +#include "hash_table.h" +#include + +#define _GB_FLUSH_PER_TUPLE_ 1 + +// #define _C_O_DEBUG 1 + +using namespace std; + +template class clean_operator: public base_operator{ + + private: + + class superattribute{ + public: + unsigned int count_distinct; + list l; + + superattribute(){ + count_distinct = 0; + }; + ~superattribute(){}; + }; + + clean_func func; + hash_table group_table[2]; + hash_table supergroup_table[2]; + // maintains count_distinct for every supergroup + // also maintains list of groups of this supergroup + hash_table sp_attribute[2]; + bool flush_finished; + unsigned int curr_table; + unsigned int curr_supertable; + unsigned int curr_attrtable; + unsigned int packet_count; + unsigned int ccc; + typename hash_table::iterator iter1; //find + typename hash_table::iterator flush_pos; + typename hash_table::iterator iter2; //find + typename hash_table::iterator super_flush_pos; + + public: + +// clean_operator(int schema_hadle): func(1){ + clean_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle){ + flush_finished = true; + curr_table = 0; + curr_supertable = 0; + curr_attrtable = 0; + packet_count = 0; + ccc = 0; + flush_pos = group_table[1-curr_table].end(); + super_flush_pos = supergroup_table[1-curr_supertable].end(); + } + + virtual int accept_tuple(host_tuple& tup, list& result){ + packet_count++; + // evict tuple from the old table + if(!flush_finished){ + partial_flush(result); + } + + //buffers to store keys + char buffer[sizeof(group)]; + + // key of the supergroup is all group-by attributes not including the once that define time window + // key of the supergroup is a subset of a group key + //cout << "clean_op: creating group" << "\n"; + group* grp = func.create_group(tup,buffer); +/*// Ignore temp tuples until we can fix their timestamps. +if(func.temp_status_received()){ + tup.free_tuple(); + return 0; +}*/ + state* curr_state; + int cd = 0; //count_distinct + + // do final clean at the border of the time window + if(func.flush_needed()){ + //cout << "number of records: " << packet_count << endl; + //cout << "number of EVAL records: " << ccc << endl; + packet_count = 0; + ccc = 0; + // for every supergroup - clean group table + //cout << "FINAL CLEANING PHASE: " << "\n"; + iter2 = supergroup_table[curr_supertable].begin(); + while (iter2 != supergroup_table[curr_supertable].end()) { + cd = ((*(sp_attribute[curr_attrtable].find((*iter2).first))).second)->count_distinct; + func.finalize_state((*iter2).second, cd); + clean((*iter2).first,(*iter2).second, true); + ++iter2; + } + + } + + if(!grp){ + //cout << "clean_op: failed to create group" << "\n"; + if(func.flush_needed()){ + flush(result); + superflush(); + } + if(func.temp_status_received()){ + host_tuple temp_tup; + if (!func.create_temp_status_tuple(temp_tup, flush_finished)) { + temp_tup.channel = output_channel; + result.push_back(temp_tup); + } + } + tup.free_tuple(); + return 0; + } + + // first flush everything from the old table if needed + // need it before anything else because of the definition of the key for supergroup + if(func.flush_needed()){ + //do flush of the old group table using state from the old supergroup table + flush(result); + //flush everything from the old supertable, swap tables; + superflush(); + } + + state* old_state; + + //supergroup exists in the new table + if ((iter2 = supergroup_table[curr_supertable].find(grp)) != supergroup_table[curr_supertable].end()){ + old_state = (*iter2).second; + + superattribute *temp = (*(sp_attribute[curr_attrtable].find(grp))).second; + cd = temp->count_distinct; + + if(!func.evaluate_predicate(tup,grp,old_state, cd)){ + ccc++; + tup.free_tuple(); + return 0; + } + // update superaggregates + func.update_plus_superaggr(tup, grp, old_state); + //((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++; + temp->count_distinct++; + cd = temp->count_distinct; + curr_state = old_state; + } + //new supergroup + else{ + + //look up the group in the old table, + if((iter2 = supergroup_table[1-curr_supertable].find(grp)) != supergroup_table[1-curr_supertable].end()){ + cd = ((*(sp_attribute[1-curr_attrtable].find(grp))).second)->count_distinct; + //curr_state = new state((*iter2).second); + curr_state = new state(); + old_state = (*iter2).second; + + //if there is one - do reinitialization + func.reinitialize_state(tup, grp, curr_state,old_state, cd); + } + else{ + curr_state = new state(); + //if there isn't - do initialization + func.initialize_state(tup, grp, curr_state); + } + + // have to create new object for superkey + group* new_sgrp = new group(grp); + + // need to insert the supergroup into the hash table even if the predicate + // evaluates to false, since the state is initialized with the first tuple of the supergroup + + //insert supergroup into the hash table + supergroup_table[curr_supertable].insert(new_sgrp, curr_state); + // create superattribute object + superattribute* sp_attr = new superattribute(); + sp_attribute[curr_attrtable].insert(new_sgrp,sp_attr); + + + if(!func.evaluate_predicate(tup, grp, curr_state, cd)){ + ccc++; + tup.free_tuple(); + return 0; + } + + // update superaggregates + func.update_plus_superaggr(tup, grp, curr_state); + ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++; + } + + aggregate* ag; + cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct; + + if ((iter1 = group_table[curr_table].find(grp)) != group_table[curr_table].end()) { + //cout << "clean_op: group already exists" << "\n"; + aggregate* old_aggr = (*iter1).second; + + //adjust count_distinct due to aggregation + ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct--; + + //update group aggregates + func.update_aggregate(tup, grp, old_aggr, curr_state, cd); + ag = old_aggr; + } + else{ + //cout << "clean_op: creating a new group" << "\n"; + // create a copy of the group on the heap + group *new_grp = new group(grp); // need a copy constructor for groups + aggregate* aggr = new aggregate(); + // create an aggregate in preallocated buffer + aggr = func.create_aggregate(tup, grp, (char*)aggr, curr_state, cd); + //cout << "clean_op: inserting group into hash" << "\n"; + group_table[curr_table].insert(new_grp, aggr); + ag = aggr; + + // remember group in the list of supergroup + ((*(sp_attribute[curr_attrtable].find(new_grp))).second)->l.push_back(new_grp); + + } + + + //used just for print + bool do_print = false; + cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct; + + //CLEANING WHEN + if(func.need_to_clean(grp, curr_state, cd)){ + clean(grp, curr_state, false); + do_print = true; + } + + tup.free_tuple(); + return 0; + } + + virtual int flush(list& result){ + + //cout << "clean_op: flush" << "\n"; + host_tuple tup; + unsigned int old_table = 1-curr_table; + unsigned int old_supertable = 1-curr_supertable; + unsigned int old_attr = 1-curr_attrtable; + typename hash_table::iterator iter; + iter = supergroup_table[old_supertable].begin(); + unsigned int cd = 0; + + // if the old table isn't empty, flush it now. + if (!group_table[old_table].empty()) { + //cout << "clean_op: old table is not empty, flushing everything immediately" << "\n"; + for (; flush_pos != group_table[old_table].end(); ++flush_pos) { + + bool failed = false; + if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){ + + cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct; + + tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd ,failed); + if (!failed) { + //cout << "sampled\n"; + tup.channel = output_channel; + result.push_back(tup); + } + + // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway. + } + + delete ((*flush_pos).first); + delete ((*flush_pos).second); + } + group_table[old_table].clear(); + group_table[old_table].rehash(); + } + + // swap tables, enable partial flush processing. + flush_pos = group_table[curr_table].begin(); + curr_table = old_table; + flush_finished = false; + + return 0; + } + + virtual int partial_flush(list& result){ + + //cout << "clean_op: partial flush" << "\n"; + host_tuple tup; + unsigned int old_table = 1-curr_table; + unsigned int old_supertable = 1-curr_supertable; + unsigned int old_attr = 1-curr_attrtable; + unsigned int i; + unsigned int cd = 0; + typename hash_table::iterator iter; + iter = supergroup_table[old_supertable].begin(); + + // emit up to _GB_FLUSH_PER_TABLE_ output tuples. + if (!group_table[old_table].empty()) { + for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) { + + bool failed = false; + // find supergroup of the group to be deleted + if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){ + + cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct; + + tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd, failed); + if (!failed) { + + //cout << "sampled\n"; + tup.channel = output_channel; + result.push_back(tup); + } + + // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway. + } + + delete ((*flush_pos).first); + delete ((*flush_pos).second); + } + } + + // finalize processing if empty. + if(flush_pos == group_table[old_table].end()) { + flush_finished = true; + group_table[old_table].clear(); + group_table[old_table].rehash(); + } + + return 0; + } + + virtual int superflush(){ + + // cout << "clean_op: superflush" << "\n"; + typename hash_table::iterator attr_flush_pos; + unsigned int old = 1-curr_supertable; + unsigned int attr_old = 1-curr_attrtable; + + // if the old supergroup table isn't empty, flush it now. + if (!supergroup_table[old].empty()) { + //cout << "clean_op: flush supertable" << "\n"; + for (; super_flush_pos != supergroup_table[old].end(); ++super_flush_pos) { + //find that supergroup in the attributes table + attr_flush_pos = sp_attribute[attr_old].find((*super_flush_pos).first); + + delete ((*super_flush_pos).first); + delete ((*super_flush_pos).second); + //flush superattribute table too + //delete ((*attr_flush_pos).first); + delete ((*attr_flush_pos).second); + } + supergroup_table[old].clear(); + supergroup_table[old].rehash(); + sp_attribute[attr_old].clear(); + sp_attribute[attr_old].rehash(); + } + + // swap supertables + super_flush_pos = supergroup_table[curr_supertable].begin(); + curr_supertable = old; + // swap attribute tables + curr_attrtable = attr_old; + + return 0; + } + + virtual int clean(group* sgr, state* st, bool final_clean){ + //cout << "clean_op: clean" << "\n"; + bool sample = false; + + typename list::iterator viter; + superattribute* glist = (*(sp_attribute[curr_attrtable].find(sgr))).second; + int cd = ((*(sp_attribute[curr_attrtable].find(sgr))).second)->count_distinct; +// glist->l.size(); +// group_table[curr_table].size(); + if (!glist->l.empty()){ + + //cout << "clean_op: list of group pointers is not empty" << "\n"; + viter = glist->l.begin(); + for(; viter != glist->l.end();){ + iter1 = group_table[curr_table].find(*viter); + aggregate* old_aggr = (*iter1).second; + + //if (((*iter1).first->valid)){ + if (final_clean){ + // HAVING + sample = func.final_sample_group((*iter1).first, old_aggr, st, cd); + } + else + // CLEANING BY + sample = func.sample_group((*iter1).first, old_aggr, st, cd); + + if(!sample){ + //cout << "clean_op: evicting group from the group table" << "\n"; + //update superaggregates + func.update_minus_superaggr((*iter1).first, old_aggr, st); + //delete group + group* g = (*iter1).first; + aggregate* a = (*iter1).second; + group_table[curr_table].erase((*iter1).first); + delete g; + delete a; + //update count_distinct + ((*(sp_attribute[curr_attrtable].find((*iter1).first))).second)->count_distinct--; + //remove pointer from supergroup + viter = glist->l.erase(viter); + } + else + ++viter; + } + } + + return 0; + } + + virtual int set_param_block(int sz, void* value){ + func.set_param_block(sz, value); + return 0; + } + + virtual int get_temp_status(host_tuple& result){ + result.channel = output_channel; + return func.create_temp_status_tuple(result, flush_finished); + } + + virtual int get_blocked_status(){ + return -1; + } + + unsigned int get_mem_footprint() { + return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint() + + supergroup_table[0].get_mem_footprint() + supergroup_table[1].get_mem_footprint() + + sp_attribute[0].get_mem_footprint() + sp_attribute[1].get_mem_footprint(); + } + +}; + +#endif