-/* ------------------------------------------------
-Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#ifndef CLEAN_OPERATOR_H
-#define CLEAN_OPERATOR_H
-
-#include "host_tuple.h"
-#include "base_operator.h"
-#include <list>
-#include "hash_table.h"
-#include <iostream>
-
-#define _GB_FLUSH_PER_TUPLE_ 1
-
-// #define _C_O_DEBUG 1
-
-using namespace std;
-
-template <class clean_func, class group, class aggregate, class state, class hasher_func, class equal_func, class superhasher_func, class superequal_func> class clean_operator: public base_operator{
-
- private:
-
- class superattribute{
- public:
- unsigned int count_distinct;
- list<group*> l;
-
- superattribute(){
- count_distinct = 0;
- };
- ~superattribute(){};
- };
-
- clean_func func;
- hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];
- hash_table<group*, state*, superhasher_func, superequal_func> supergroup_table[2];
- // maintains count_distinct for every supergroup
- // also maintains list of groups of this supergroup
- hash_table<group*, superattribute*, superhasher_func, superequal_func> sp_attribute[2];
- bool flush_finished;
- unsigned int curr_table;
- unsigned int curr_supertable;
- unsigned int curr_attrtable;
- unsigned int packet_count;
- unsigned int ccc;
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter1; //find
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
- typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter2; //find
- typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator super_flush_pos;
-
- public:
-
-// clean_operator(int schema_hadle): func(1){
- clean_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle){
- flush_finished = true;
- curr_table = 0;
- curr_supertable = 0;
- curr_attrtable = 0;
- packet_count = 0;
- ccc = 0;
- flush_pos = group_table[1-curr_table].end();
- super_flush_pos = supergroup_table[1-curr_supertable].end();
- }
-
- virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result){
- packet_count++;
- // evict tuple from the old table
- if(!flush_finished){
- partial_flush(result);
- }
-
- //buffers to store keys
- char buffer[sizeof(group)];
-
- // key of the supergroup is all group-by attributes not including the once that define time window
- // key of the supergroup is a subset of a group key
- //cout << "clean_op: creating group" << "\n";
- group* grp = func.create_group(tup,buffer);
-/*// Ignore temp tuples until we can fix their timestamps.
-if(func.temp_status_received()){
- tup.free_tuple();
- return 0;
-}*/
- state* curr_state;
- int cd = 0; //count_distinct
-
- // do final clean at the border of the time window
- if(func.flush_needed()){
- //cout << "number of records: " << packet_count << endl;
- //cout << "number of EVAL records: " << ccc << endl;
- packet_count = 0;
- ccc = 0;
- // for every supergroup - clean group table
- //cout << "FINAL CLEANING PHASE: " << "\n";
- iter2 = supergroup_table[curr_supertable].begin();
- while (iter2 != supergroup_table[curr_supertable].end()) {
- cd = ((*(sp_attribute[curr_attrtable].find((*iter2).first))).second)->count_distinct;
- func.finalize_state((*iter2).second, cd);
- clean((*iter2).first,(*iter2).second, true);
- ++iter2;
- }
-
- }
-
- if(!grp){
- //cout << "clean_op: failed to create group" << "\n";
- if(func.flush_needed()){
- flush(result);
- superflush();
- }
- if(func.temp_status_received()){
- host_tuple temp_tup;
- if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
- temp_tup.channel = output_channel;
- result.push_back(temp_tup);
- }
- }
- tup.free_tuple();
- return 0;
- }
-
- // first flush everything from the old table if needed
- // need it before anything else because of the definition of the key for supergroup
- if(func.flush_needed()){
- //do flush of the old group table using state from the old supergroup table
- flush(result);
- //flush everything from the old supertable, swap tables;
- superflush();
- }
-
- state* old_state;
-
- //supergroup exists in the new table
- if ((iter2 = supergroup_table[curr_supertable].find(grp)) != supergroup_table[curr_supertable].end()){
- old_state = (*iter2).second;
-
- superattribute *temp = (*(sp_attribute[curr_attrtable].find(grp))).second;
- cd = temp->count_distinct;
-
- if(!func.evaluate_predicate(tup,grp,old_state, cd)){
- ccc++;
- tup.free_tuple();
- return 0;
- }
- // update superaggregates
- func.update_plus_superaggr(tup, grp, old_state);
- //((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;
- temp->count_distinct++;
- cd = temp->count_distinct;
- curr_state = old_state;
- }
- //new supergroup
- else{
-
- //look up the group in the old table,
- if((iter2 = supergroup_table[1-curr_supertable].find(grp)) != supergroup_table[1-curr_supertable].end()){
- cd = ((*(sp_attribute[1-curr_attrtable].find(grp))).second)->count_distinct;
- //curr_state = new state((*iter2).second);
- curr_state = new state();
- old_state = (*iter2).second;
-
- //if there is one - do reinitialization
- func.reinitialize_state(tup, grp, curr_state,old_state, cd);
- }
- else{
- curr_state = new state();
- //if there isn't - do initialization
- func.initialize_state(tup, grp, curr_state);
- }
-
- // have to create new object for superkey
- group* new_sgrp = new group(grp);
-
- // need to insert the supergroup into the hash table even if the predicate
- // evaluates to false, since the state is initialized with the first tuple of the supergroup
-
- //insert supergroup into the hash table
- supergroup_table[curr_supertable].insert(new_sgrp, curr_state);
- // create superattribute object
- superattribute* sp_attr = new superattribute();
- sp_attribute[curr_attrtable].insert(new_sgrp,sp_attr);
-
-
- if(!func.evaluate_predicate(tup, grp, curr_state, cd)){
- ccc++;
- tup.free_tuple();
- return 0;
- }
-
- // update superaggregates
- func.update_plus_superaggr(tup, grp, curr_state);
- ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;
- }
-
- aggregate* ag;
- cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;
-
- if ((iter1 = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
- //cout << "clean_op: group already exists" << "\n";
- aggregate* old_aggr = (*iter1).second;
-
- //adjust count_distinct due to aggregation
- ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct--;
-
- //update group aggregates
- func.update_aggregate(tup, grp, old_aggr, curr_state, cd);
- ag = old_aggr;
- }
- else{
- //cout << "clean_op: creating a new group" << "\n";
- // create a copy of the group on the heap
- group *new_grp = new group(grp); // need a copy constructor for groups
- aggregate* aggr = new aggregate();
- // create an aggregate in preallocated buffer
- aggr = func.create_aggregate(tup, grp, (char*)aggr, curr_state, cd);
- //cout << "clean_op: inserting group into hash" << "\n";
- group_table[curr_table].insert(new_grp, aggr);
- ag = aggr;
-
- // remember group in the list of supergroup
- ((*(sp_attribute[curr_attrtable].find(new_grp))).second)->l.push_back(new_grp);
-
- }
-
-
- //used just for print
- bool do_print = false;
- cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;
-
- //CLEANING WHEN
- if(func.need_to_clean(grp, curr_state, cd)){
- clean(grp, curr_state, false);
- do_print = true;
- }
-
- tup.free_tuple();
- return 0;
- }
-
- virtual int flush(list<host_tuple>& result){
-
- //cout << "clean_op: flush" << "\n";
- host_tuple tup;
- unsigned int old_table = 1-curr_table;
- unsigned int old_supertable = 1-curr_supertable;
- unsigned int old_attr = 1-curr_attrtable;
- typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;
- iter = supergroup_table[old_supertable].begin();
- unsigned int cd = 0;
-
- // if the old table isn't empty, flush it now.
- if (!group_table[old_table].empty()) {
- //cout << "clean_op: old table is not empty, flushing everything immediately" << "\n";
- for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
-
- bool failed = false;
- if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){
-
- cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;
-
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd ,failed);
- if (!failed) {
- //cout << "sampled\n";
- tup.channel = output_channel;
- result.push_back(tup);
- }
-
- // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.
- }
-
- delete ((*flush_pos).first);
- delete ((*flush_pos).second);
- }
- group_table[old_table].clear();
- group_table[old_table].rehash();
- }
-
- // swap tables, enable partial flush processing.
- flush_pos = group_table[curr_table].begin();
- curr_table = old_table;
- flush_finished = false;
-
- return 0;
- }
-
- virtual int partial_flush(list<host_tuple>& result){
-
- //cout << "clean_op: partial flush" << "\n";
- host_tuple tup;
- unsigned int old_table = 1-curr_table;
- unsigned int old_supertable = 1-curr_supertable;
- unsigned int old_attr = 1-curr_attrtable;
- unsigned int i;
- unsigned int cd = 0;
- typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;
- iter = supergroup_table[old_supertable].begin();
-
- // emit up to _GB_FLUSH_PER_TABLE_ output tuples.
- if (!group_table[old_table].empty()) {
- for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
-
- bool failed = false;
- // find supergroup of the group to be deleted
- if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){
-
- cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;
-
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd, failed);
- if (!failed) {
-
- //cout << "sampled\n";
- tup.channel = output_channel;
- result.push_back(tup);
- }
-
- // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.
- }
-
- delete ((*flush_pos).first);
- delete ((*flush_pos).second);
- }
- }
-
- // finalize processing if empty.
- if(flush_pos == group_table[old_table].end()) {
- flush_finished = true;
- group_table[old_table].clear();
- group_table[old_table].rehash();
- }
-
- return 0;
- }
-
- virtual int superflush(){
-
- // cout << "clean_op: superflush" << "\n";
- typename hash_table<group*, superattribute*, superhasher_func, superequal_func>::iterator attr_flush_pos;
- unsigned int old = 1-curr_supertable;
- unsigned int attr_old = 1-curr_attrtable;
-
- // if the old supergroup table isn't empty, flush it now.
- if (!supergroup_table[old].empty()) {
- //cout << "clean_op: flush supertable" << "\n";
- for (; super_flush_pos != supergroup_table[old].end(); ++super_flush_pos) {
- //find that supergroup in the attributes table
- attr_flush_pos = sp_attribute[attr_old].find((*super_flush_pos).first);
-
- delete ((*super_flush_pos).first);
- delete ((*super_flush_pos).second);
- //flush superattribute table too
- //delete ((*attr_flush_pos).first);
- delete ((*attr_flush_pos).second);
- }
- supergroup_table[old].clear();
- supergroup_table[old].rehash();
- sp_attribute[attr_old].clear();
- sp_attribute[attr_old].rehash();
- }
-
- // swap supertables
- super_flush_pos = supergroup_table[curr_supertable].begin();
- curr_supertable = old;
- // swap attribute tables
- curr_attrtable = attr_old;
-
- return 0;
- }
-
- virtual int clean(group* sgr, state* st, bool final_clean){
- //cout << "clean_op: clean" << "\n";
- bool sample = false;
-
- typename list<group*>::iterator viter;
- superattribute* glist = (*(sp_attribute[curr_attrtable].find(sgr))).second;
- int cd = ((*(sp_attribute[curr_attrtable].find(sgr))).second)->count_distinct;
-// glist->l.size();
-// group_table[curr_table].size();
- if (!glist->l.empty()){
-
- //cout << "clean_op: list of group pointers is not empty" << "\n";
- viter = glist->l.begin();
- for(; viter != glist->l.end();){
- iter1 = group_table[curr_table].find(*viter);
- aggregate* old_aggr = (*iter1).second;
-
- //if (((*iter1).first->valid)){
- if (final_clean){
- // HAVING
- sample = func.final_sample_group((*iter1).first, old_aggr, st, cd);
- }
- else
- // CLEANING BY
- sample = func.sample_group((*iter1).first, old_aggr, st, cd);
-
- if(!sample){
- //cout << "clean_op: evicting group from the group table" << "\n";
- //update superaggregates
- func.update_minus_superaggr((*iter1).first, old_aggr, st);
- //delete group
- group* g = (*iter1).first;
- aggregate* a = (*iter1).second;
- group_table[curr_table].erase((*iter1).first);
- delete g;
- delete a;
- //update count_distinct
- ((*(sp_attribute[curr_attrtable].find((*iter1).first))).second)->count_distinct--;
- //remove pointer from supergroup
- viter = glist->l.erase(viter);
- }
- else
- ++viter;
- }
- }
-
- return 0;
- }
-
- virtual int set_param_block(int sz, void* value){
- func.set_param_block(sz, value);
- return 0;
- }
-
- virtual int get_temp_status(host_tuple& result){
- result.channel = output_channel;
- return func.create_temp_status_tuple(result, flush_finished);
- }
-
- virtual int get_blocked_status(){
- return -1;
- }
-
- unsigned int get_mem_footprint() {
- return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint() +
- supergroup_table[0].get_mem_footprint() + supergroup_table[1].get_mem_footprint() +
- sp_attribute[0].get_mem_footprint() + sp_attribute[1].get_mem_footprint();
- }
-
-};
-
-#endif
+/* ------------------------------------------------\r
+Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#ifndef CLEAN_OPERATOR_H\r
+#define CLEAN_OPERATOR_H\r
+\r
+#include "host_tuple.h"\r
+#include "base_operator.h"\r
+#include <list>\r
+#include "hash_table.h"\r
+#include <iostream>\r
+\r
+#define _GB_FLUSH_PER_TUPLE_ 1\r
+\r
+// #define _C_O_DEBUG 1\r
+\r
+using namespace std;\r
+\r
+template <class clean_func, class group, class aggregate, class state, class hasher_func, class equal_func, class superhasher_func, class superequal_func> class clean_operator: public base_operator{\r
+\r
+ private:\r
+\r
+ class superattribute{\r
+ public:\r
+ unsigned int count_distinct;\r
+ list<group*> l;\r
+\r
+ superattribute(){\r
+ count_distinct = 0;\r
+ };\r
+ ~superattribute(){};\r
+ };\r
+\r
+ clean_func func;\r
+ hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];\r
+ hash_table<group*, state*, superhasher_func, superequal_func> supergroup_table[2];\r
+ // maintains count_distinct for every supergroup\r
+ // also maintains list of groups of this supergroup\r
+ hash_table<group*, superattribute*, superhasher_func, superequal_func> sp_attribute[2];\r
+ bool flush_finished;\r
+ unsigned int curr_table;\r
+ unsigned int curr_supertable;\r
+ unsigned int curr_attrtable;\r
+ unsigned int packet_count;\r
+ unsigned int ccc;\r
+ typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter1; //find\r
+ typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;\r
+ typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter2; //find\r
+ typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator super_flush_pos;\r
+\r
+ public:\r
+\r
+// clean_operator(int schema_hadle): func(1){\r
+ clean_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle){\r
+ flush_finished = true;\r
+ curr_table = 0;\r
+ curr_supertable = 0;\r
+ curr_attrtable = 0;\r
+ packet_count = 0;\r
+ ccc = 0;\r
+ flush_pos = group_table[1-curr_table].end();\r
+ super_flush_pos = supergroup_table[1-curr_supertable].end();\r
+ }\r
+\r
+ virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result){\r
+ packet_count++;\r
+ // evict tuple from the old table\r
+ if(!flush_finished){\r
+ partial_flush(result);\r
+ }\r
+\r
+ //buffers to store keys\r
+ char buffer[sizeof(group)];\r
+\r
+ // key of the supergroup is all group-by attributes not including the once that define time window\r
+ // key of the supergroup is a subset of a group key\r
+ //cout << "clean_op: creating group" << "\n";\r
+ group* grp = func.create_group(tup,buffer);\r
+/*// Ignore temp tuples until we can fix their timestamps.\r
+if(func.temp_status_received()){\r
+ tup.free_tuple();\r
+ return 0;\r
+}*/\r
+ state* curr_state;\r
+ int cd = 0; //count_distinct\r
+\r
+ // do final clean at the border of the time window\r
+ if(func.flush_needed()){\r
+ //cout << "number of records: " << packet_count << endl;\r
+ //cout << "number of EVAL records: " << ccc << endl;\r
+ packet_count = 0;\r
+ ccc = 0;\r
+ // for every supergroup - clean group table\r
+ //cout << "FINAL CLEANING PHASE: " << "\n";\r
+ iter2 = supergroup_table[curr_supertable].begin();\r
+ while (iter2 != supergroup_table[curr_supertable].end()) {\r
+ cd = ((*(sp_attribute[curr_attrtable].find((*iter2).first))).second)->count_distinct;\r
+ func.finalize_state((*iter2).second, cd);\r
+ clean((*iter2).first,(*iter2).second, true);\r
+ ++iter2;\r
+ }\r
+\r
+ }\r
+\r
+ if(!grp){\r
+ //cout << "clean_op: failed to create group" << "\n";\r
+ if(func.flush_needed()){\r
+ flush(result);\r
+ superflush();\r
+ }\r
+ if(func.temp_status_received()){\r
+ host_tuple temp_tup;\r
+ if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {\r
+ temp_tup.channel = output_channel;\r
+ result.push_back(temp_tup);\r
+ }\r
+ }\r
+ tup.free_tuple();\r
+ return 0;\r
+ }\r
+\r
+ // first flush everything from the old table if needed\r
+ // need it before anything else because of the definition of the key for supergroup\r
+ if(func.flush_needed()){\r
+ //do flush of the old group table using state from the old supergroup table\r
+ flush(result);\r
+ //flush everything from the old supertable, swap tables;\r
+ superflush();\r
+ }\r
+\r
+ state* old_state;\r
+\r
+ //supergroup exists in the new table\r
+ if ((iter2 = supergroup_table[curr_supertable].find(grp)) != supergroup_table[curr_supertable].end()){\r
+ old_state = (*iter2).second;\r
+\r
+ superattribute *temp = (*(sp_attribute[curr_attrtable].find(grp))).second;\r
+ cd = temp->count_distinct;\r
+\r
+ if(!func.evaluate_predicate(tup,grp,old_state, cd)){\r
+ ccc++;\r
+ tup.free_tuple();\r
+ return 0;\r
+ }\r
+ // update superaggregates\r
+ func.update_plus_superaggr(tup, grp, old_state);\r
+ //((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;\r
+ temp->count_distinct++;\r
+ cd = temp->count_distinct;\r
+ curr_state = old_state;\r
+ }\r
+ //new supergroup\r
+ else{\r
+\r
+ //look up the group in the old table,\r
+ if((iter2 = supergroup_table[1-curr_supertable].find(grp)) != supergroup_table[1-curr_supertable].end()){\r
+ cd = ((*(sp_attribute[1-curr_attrtable].find(grp))).second)->count_distinct;\r
+ //curr_state = new state((*iter2).second);\r
+ curr_state = new state();\r
+ old_state = (*iter2).second;\r
+\r
+ //if there is one - do reinitialization\r
+ func.reinitialize_state(tup, grp, curr_state,old_state, cd);\r
+ }\r
+ else{\r
+ curr_state = new state();\r
+ //if there isn't - do initialization\r
+ func.initialize_state(tup, grp, curr_state);\r
+ }\r
+\r
+ // have to create new object for superkey\r
+ group* new_sgrp = new group(grp);\r
+\r
+ // need to insert the supergroup into the hash table even if the predicate\r
+ // evaluates to false, since the state is initialized with the first tuple of the supergroup\r
+\r
+ //insert supergroup into the hash table\r
+ supergroup_table[curr_supertable].insert(new_sgrp, curr_state);\r
+ // create superattribute object\r
+ superattribute* sp_attr = new superattribute();\r
+ sp_attribute[curr_attrtable].insert(new_sgrp,sp_attr);\r
+\r
+\r
+ if(!func.evaluate_predicate(tup, grp, curr_state, cd)){\r
+ ccc++;\r
+ tup.free_tuple();\r
+ return 0;\r
+ }\r
+\r
+ // update superaggregates\r
+ func.update_plus_superaggr(tup, grp, curr_state);\r
+ ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;\r
+ }\r
+\r
+ aggregate* ag;\r
+ cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;\r
+\r
+ if ((iter1 = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {\r
+ //cout << "clean_op: group already exists" << "\n";\r
+ aggregate* old_aggr = (*iter1).second;\r
+\r
+ //adjust count_distinct due to aggregation\r
+ ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct--;\r
+\r
+ //update group aggregates\r
+ func.update_aggregate(tup, grp, old_aggr, curr_state, cd);\r
+ ag = old_aggr;\r
+ }\r
+ else{\r
+ //cout << "clean_op: creating a new group" << "\n";\r
+ // create a copy of the group on the heap\r
+ group *new_grp = new group(grp); // need a copy constructor for groups\r
+ aggregate* aggr = new aggregate();\r
+ // create an aggregate in preallocated buffer\r
+ aggr = func.create_aggregate(tup, grp, (char*)aggr, curr_state, cd);\r
+ //cout << "clean_op: inserting group into hash" << "\n";\r
+ group_table[curr_table].insert(new_grp, aggr);\r
+ ag = aggr;\r
+\r
+ // remember group in the list of supergroup\r
+ ((*(sp_attribute[curr_attrtable].find(new_grp))).second)->l.push_back(new_grp);\r
+\r
+ }\r
+\r
+\r
+ //used just for print\r
+ bool do_print = false;\r
+ cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;\r
+\r
+ //CLEANING WHEN\r
+ if(func.need_to_clean(grp, curr_state, cd)){\r
+ clean(grp, curr_state, false);\r
+ do_print = true;\r
+ }\r
+\r
+ tup.free_tuple();\r
+ return 0;\r
+ }\r
+\r
+ virtual int flush(list<host_tuple>& result){\r
+\r
+ //cout << "clean_op: flush" << "\n";\r
+ host_tuple tup;\r
+ unsigned int old_table = 1-curr_table;\r
+ unsigned int old_supertable = 1-curr_supertable;\r
+ unsigned int old_attr = 1-curr_attrtable;\r
+ typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;\r
+ iter = supergroup_table[old_supertable].begin();\r
+ unsigned int cd = 0;\r
+\r
+ // if the old table isn't empty, flush it now.\r
+ if (!group_table[old_table].empty()) {\r
+ //cout << "clean_op: old table is not empty, flushing everything immediately" << "\n";\r
+ for (; flush_pos != group_table[old_table].end(); ++flush_pos) {\r
+\r
+ bool failed = false;\r
+ if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){\r
+\r
+ cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;\r
+\r
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd ,failed);\r
+ if (!failed) {\r
+ //cout << "sampled\n";\r
+ tup.channel = output_channel;\r
+ result.push_back(tup);\r
+ }\r
+\r
+ // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.\r
+ }\r
+\r
+ delete ((*flush_pos).first);\r
+ delete ((*flush_pos).second);\r
+ }\r
+ group_table[old_table].clear();\r
+ group_table[old_table].rehash();\r
+ }\r
+\r
+ // swap tables, enable partial flush processing.\r
+ flush_pos = group_table[curr_table].begin();\r
+ curr_table = old_table;\r
+ flush_finished = false;\r
+\r
+ return 0;\r
+ }\r
+\r
+ virtual int partial_flush(list<host_tuple>& result){\r
+\r
+ //cout << "clean_op: partial flush" << "\n";\r
+ host_tuple tup;\r
+ unsigned int old_table = 1-curr_table;\r
+ unsigned int old_supertable = 1-curr_supertable;\r
+ unsigned int old_attr = 1-curr_attrtable;\r
+ unsigned int i;\r
+ unsigned int cd = 0;\r
+ typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;\r
+ iter = supergroup_table[old_supertable].begin();\r
+\r
+ // emit up to _GB_FLUSH_PER_TABLE_ output tuples.\r
+ if (!group_table[old_table].empty()) {\r
+ for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {\r
+\r
+ bool failed = false;\r
+ // find supergroup of the group to be deleted\r
+ if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){\r
+\r
+ cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;\r
+\r
+ tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd, failed);\r
+ if (!failed) {\r
+\r
+ //cout << "sampled\n";\r
+ tup.channel = output_channel;\r
+ result.push_back(tup);\r
+ }\r
+\r
+ // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.\r
+ }\r
+\r
+ delete ((*flush_pos).first);\r
+ delete ((*flush_pos).second);\r
+ }\r
+ }\r
+\r
+ // finalize processing if empty.\r
+ if(flush_pos == group_table[old_table].end()) {\r
+ flush_finished = true;\r
+ group_table[old_table].clear();\r
+ group_table[old_table].rehash();\r
+ }\r
+\r
+ return 0;\r
+ }\r
+\r
+ virtual int superflush(){\r
+\r
+ // cout << "clean_op: superflush" << "\n";\r
+ typename hash_table<group*, superattribute*, superhasher_func, superequal_func>::iterator attr_flush_pos;\r
+ unsigned int old = 1-curr_supertable;\r
+ unsigned int attr_old = 1-curr_attrtable;\r
+\r
+ // if the old supergroup table isn't empty, flush it now.\r
+ if (!supergroup_table[old].empty()) {\r
+ //cout << "clean_op: flush supertable" << "\n";\r
+ for (; super_flush_pos != supergroup_table[old].end(); ++super_flush_pos) {\r
+ //find that supergroup in the attributes table\r
+ attr_flush_pos = sp_attribute[attr_old].find((*super_flush_pos).first);\r
+\r
+ delete ((*super_flush_pos).first);\r
+ delete ((*super_flush_pos).second);\r
+ //flush superattribute table too\r
+ //delete ((*attr_flush_pos).first);\r
+ delete ((*attr_flush_pos).second);\r
+ }\r
+ supergroup_table[old].clear();\r
+ supergroup_table[old].rehash();\r
+ sp_attribute[attr_old].clear();\r
+ sp_attribute[attr_old].rehash();\r
+ }\r
+\r
+ // swap supertables\r
+ super_flush_pos = supergroup_table[curr_supertable].begin();\r
+ curr_supertable = old;\r
+ // swap attribute tables\r
+ curr_attrtable = attr_old;\r
+\r
+ return 0;\r
+ }\r
+\r
+ virtual int clean(group* sgr, state* st, bool final_clean){\r
+ //cout << "clean_op: clean" << "\n";\r
+ bool sample = false;\r
+\r
+ typename list<group*>::iterator viter;\r
+ superattribute* glist = (*(sp_attribute[curr_attrtable].find(sgr))).second;\r
+ int cd = ((*(sp_attribute[curr_attrtable].find(sgr))).second)->count_distinct;\r
+// glist->l.size();\r
+// group_table[curr_table].size();\r
+ if (!glist->l.empty()){\r
+\r
+ //cout << "clean_op: list of group pointers is not empty" << "\n";\r
+ viter = glist->l.begin();\r
+ for(; viter != glist->l.end();){\r
+ iter1 = group_table[curr_table].find(*viter);\r
+ aggregate* old_aggr = (*iter1).second;\r
+\r
+ //if (((*iter1).first->valid)){\r
+ if (final_clean){\r
+ // HAVING\r
+ sample = func.final_sample_group((*iter1).first, old_aggr, st, cd);\r
+ }\r
+ else\r
+ // CLEANING BY\r
+ sample = func.sample_group((*iter1).first, old_aggr, st, cd);\r
+\r
+ if(!sample){\r
+ //cout << "clean_op: evicting group from the group table" << "\n";\r
+ //update superaggregates\r
+ func.update_minus_superaggr((*iter1).first, old_aggr, st);\r
+ //delete group\r
+ group* g = (*iter1).first;\r
+ aggregate* a = (*iter1).second;\r
+ group_table[curr_table].erase((*iter1).first);\r
+ delete g;\r
+ delete a;\r
+ //update count_distinct\r
+ ((*(sp_attribute[curr_attrtable].find((*iter1).first))).second)->count_distinct--;\r
+ //remove pointer from supergroup\r
+ viter = glist->l.erase(viter);\r
+ }\r
+ else\r
+ ++viter;\r
+ }\r
+ }\r
+\r
+ return 0;\r
+ }\r
+\r
+ virtual int set_param_block(int sz, void* value){\r
+ func.set_param_block(sz, value);\r
+ return 0;\r
+ }\r
+\r
+ virtual int get_temp_status(host_tuple& result){\r
+ result.channel = output_channel;\r
+ return func.create_temp_status_tuple(result, flush_finished);\r
+ }\r
+\r
+ virtual int get_blocked_status(){\r
+ return -1;\r
+ }\r
+\r
+ unsigned int get_mem_footprint() {\r
+ return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint() +\r
+ supergroup_table[0].get_mem_footprint() + supergroup_table[1].get_mem_footprint() +\r
+ sp_attribute[0].get_mem_footprint() + sp_attribute[1].get_mem_footprint();\r
+ }\r
+\r
+};\r
+\r
+#endif\r