1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef CLEAN_OPERATOR_H
17 #define CLEAN_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
22 #include "hash_table.h"
// Upper bound on how many groups of the previous window partial_flush()
// emits per call (it is the loop bound in partial_flush()).
// NOTE(review): identifiers that start with an underscore followed by an
// uppercase letter are reserved for the implementation in C++; consider
// renaming this macro (and the one below).
25 #define _GB_FLUSH_PER_TUPLE_ 1
// Presumably a debug switch; where it is consumed is not established here —
// confirm before enabling.
27 // #define _C_O_DEBUG 1
// clean_operator: a group-by operator that also tracks "supergroups".
// A supergroup key is a subset of the group key: all group-by attributes
// except the ones that define the time window. The operator periodically
// "cleans" a supergroup by sampling its groups and evicting them from the
// group table.
//
// Template parameters:
//   clean_func         - operator-specific logic: group/aggregate/state
//                        creation, predicates, sampling, output tuples.
//   group, aggregate   - group key type and per-group aggregate type.
//   state              - per-supergroup state type.
//   hasher_func, equal_func           - hashing/equality on group keys.
//   superhasher_func, superequal_func - hashing/equality on supergroup keys
//                        (also applied to group* values).
31 template <class clean_func, class group, class aggregate, class state, class hasher_func, class equal_func, class superhasher_func, class superequal_func> class clean_operator: public base_operator{
// NOTE(review): count_distinct (and a list member l) are accessed elsewhere
// through superattribute* (->count_distinct, ->l), so this declaration
// presumably belongs to the superattribute helper type — confirm.
37 unsigned int count_distinct;
// The three tables below are double-buffered. Index curr_* is the current
// time window; index 1-curr_* is the previous window that is being flushed.
// group_table: group key -> aggregate. Keys and aggregates are heap objects
// that flush()/partial_flush() delete.
47 hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];
// supergroup_table: supergroup key -> state. superflush() deletes both.
48 hash_table<group*, state*, superhasher_func, superequal_func> supergroup_table[2];
49 // maintains count_distinct for every supergroup
50 // also maintains list of groups of this supergroup
// accept_tuple() inserts the same key pointer (new_sgrp) into both
// sp_attribute and supergroup_table, so superflush() deletes each key only
// once, via supergroup_table.
51 hash_table<group*, superattribute*, superhasher_func, superequal_func> sp_attribute[2];
// Index (0 or 1) of the current buffer of each double-buffered table.
// NOTE(review): no in-class initializer; the constructor reads curr_table
// and curr_supertable, so they must be assigned before that point.
53 unsigned int curr_table;
54 unsigned int curr_supertable;
55 unsigned int curr_attrtable;
// Only referenced from a commented-out debug print in accept_tuple(), as far
// as this file shows.
56 unsigned int packet_count;
// Scratch iterators shared between member functions.
// iter1 and iter2 hold lookup results.
// flush_pos is where the incremental drain of the old group_table resumes.
// super_flush_pos is where superflush() starts walking the old
// supergroup_table.
58 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter1; //find
59 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
60 typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter2; //find
61 typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator super_flush_pos;
// Older signature, kept commented out for reference:
65 // clean_operator(int schema_hadle): func(1){
// Constructs the operator. The operator name goes to base_operator and
// schema_handle goes to the clean_func instance.
// Initially no flush is in progress: flush_finished is true, and both flush
// cursors point at end() of the previous-window tables.
// NOTE(review): reads curr_table and curr_supertable, which have no in-class
// initializer. Confirm they are assigned before the two cursor assignments.
66 clean_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle){
67 flush_finished = true;
73 flush_pos = group_table[1-curr_table].end();
74 super_flush_pos = supergroup_table[1-curr_supertable].end();
// accept_tuple: processes one input tuple.
//   tup    - the input tuple; func derives its group key (and thereby its
//            supergroup key) from it.
//   result - output list. Tuples emitted while draining the previous window
//            (e.g. by partial_flush) and temp-status tuples are appended
//            here with channel = output_channel.
// Steps:
//   1. Drain part of the previous window.
//   2. Build the group key.
//   3. When func.flush_needed(), finalize and clean every current
//      supergroup, then flush the old tables.
//   4. Find, reinitialize, or initialize the supergroup state and update
//      its superaggregates.
//   5. Update the group's aggregate, or create it.
//   6. Clean the supergroup if func.need_to_clean() asks for it.
77 virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result){
79 // evict groups from the old (previous-window) table, a bounded number per incoming tuple
81 partial_flush(result);
84 //buffers to store keys
// Scratch storage handed to func.create_group(). grp is treated as a
// temporary key: every table insertion below stores a heap copy
// (new group(grp)) instead.
85 char buffer[sizeof(group)];
87 // key of the supergroup is all group-by attributes not including the ones that define time window
88 // key of the supergroup is a subset of a group key
89 //cout << "clean_op: creating group" << "\n";
90 group* grp = func.create_group(tup,buffer);
91 /*// Ignore temp tuples until we can fix their timestamps.
92 if(func.temp_status_received()){
97 int cd = 0; //count_distinct
99 // do final clean at the border of the time window
// Every supergroup of the current window gets its state finalized with its
// count_distinct, then a final clean of its groups.
// NOTE(review): here and below, sp_attribute find() results are dereferenced
// without an end() check. This relies on every supergroup_table entry having
// a matching sp_attribute entry; both are inserted together further down.
100 if(func.flush_needed()){
101 //cout << "number of records: " << packet_count << endl;
102 //cout << "number of EVAL records: " << ccc << endl;
105 // for every supergroup - clean group table
106 //cout << "FINAL CLEANING PHASE: " << "\n";
107 iter2 = supergroup_table[curr_supertable].begin();
108 while (iter2 != supergroup_table[curr_supertable].end()) {
109 cd = ((*(sp_attribute[curr_attrtable].find((*iter2).first))).second)->count_distinct;
110 func.finalize_state((*iter2).second, cd);
111 clean((*iter2).first,(*iter2).second, true);
118 //cout << "clean_op: failed to create group" << "\n";
119 if(func.flush_needed()){
// When a temp status was received, a temp-status tuple goes to result on
// output_channel, but only if create_temp_status_tuple() returns 0/false.
123 if(func.temp_status_received()){
125 if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
126 temp_tup.channel = output_channel;
127 result.push_back(temp_tup);
134 // first flush everything from the old table if needed
135 // need it before anything else because of the definition of the key for supergroup
136 if(func.flush_needed()){
137 //do flush of the old group table using state from the old supergroup table
139 //flush everything from the old supertable, swap tables;
// Case 1: the supergroup already exists in the current window. Evaluate the
// predicate against its state, then update the superaggregates and
// count_distinct.
145 //supergroup exists in the new table
146 if ((iter2 = supergroup_table[curr_supertable].find(grp)) != supergroup_table[curr_supertable].end()){
147 old_state = (*iter2).second;
149 superattribute *temp = (*(sp_attribute[curr_attrtable].find(grp))).second;
150 cd = temp->count_distinct;
152 if(!func.evaluate_predicate(tup,grp,old_state, cd)){
157 // update superaggregates
158 func.update_plus_superaggr(tup, grp, old_state);
159 //((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;
160 temp->count_distinct++;
161 cd = temp->count_distinct;
162 curr_state = old_state;
// Case 2: the supergroup is first seen in this window. Build a fresh state.
// If the supergroup existed in the previous window, seed the state from that
// window's state and count_distinct.
167 //look up the group in the old table,
168 if((iter2 = supergroup_table[1-curr_supertable].find(grp)) != supergroup_table[1-curr_supertable].end()){
169 cd = ((*(sp_attribute[1-curr_attrtable].find(grp))).second)->count_distinct;
170 //curr_state = new state((*iter2).second);
171 curr_state = new state();
172 old_state = (*iter2).second;
174 //if there is one - do reinitialization
175 func.reinitialize_state(tup, grp, curr_state,old_state, cd);
178 curr_state = new state();
179 //if there isn't - do initialization
180 func.initialize_state(tup, grp, curr_state);
183 // have to create new object for superkey
184 group* new_sgrp = new group(grp);
186 // need to insert the supergroup into the hash table even if the predicate
187 // evaluates to false, since the state is initialized with the first tuple of the supergroup
189 //insert supergroup into the hash table
// new_sgrp becomes the key in both supergroup_table and sp_attribute.
// superflush() deletes it exactly once, via supergroup_table.
190 supergroup_table[curr_supertable].insert(new_sgrp, curr_state);
191 // create superattribute object
192 superattribute* sp_attr = new superattribute();
193 sp_attribute[curr_attrtable].insert(new_sgrp,sp_attr);
196 if(!func.evaluate_predicate(tup, grp, curr_state, cd)){
202 // update superaggregates
203 func.update_plus_superaggr(tup, grp, curr_state);
204 ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;
// Both supergroup branches above incremented count_distinct for this tuple.
// If the group already exists, that increment is undone below (presumably so
// count_distinct counts distinct groups, not tuples).
// NOTE(review): cd is read here, before the decrement, so update_aggregate()
// receives the pre-decrement value. Confirm this is intended.
208 cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;
210 if ((iter1 = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
211 //cout << "clean_op: group already exists" << "\n";
212 aggregate* old_aggr = (*iter1).second;
214 //adjust count_distinct due to aggregation
215 ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct--;
217 //update group aggregates
218 func.update_aggregate(tup, grp, old_aggr, curr_state, cd);
222 //cout << "clean_op: creating a new group" << "\n";
223 // create a copy of the group on the heap
224 group *new_grp = new group(grp); // need a copy constructor for groups
// NOTE(review): a fully constructed aggregate is handed to create_aggregate()
// as raw storage. The returned pointer is stored, and flush()/partial_flush()
// later release it with delete. This assumes create_aggregate() returns the
// same address. If it placement-constructs over the existing object, that
// object's destructor never runs. Confirm.
225 aggregate* aggr = new aggregate();
226 // create an aggregate in preallocated buffer
227 aggr = func.create_aggregate(tup, grp, (char*)aggr, curr_state, cd);
228 //cout << "clean_op: inserting group into hash" << "\n";
229 group_table[curr_table].insert(new_grp, aggr);
232 // remember group in the list of supergroup
// Looked up by new_grp: the supergroup hasher/equality operate on group keys,
// presumably using only the supergroup attributes.
233 ((*(sp_attribute[curr_attrtable].find(new_grp))).second)->l.push_back(new_grp);
238 //used just for print
// NOTE(review): do_print appears to be unused; verify before removing.
239 bool do_print = false;
240 cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;
// func decides whether this supergroup's groups should be sampled and
// evicted now (a non-final clean).
243 if(func.need_to_clean(grp, curr_state, cd)){
244 clean(grp, curr_state, false);
// flush: ends the current window of the group table.
// First, groups still in the previous window's table (from flush_pos on) are
// emitted into result at once. Each uses the matching previous-window
// supergroup state and count_distinct, and its key and aggregate are then
// deleted.
// Second, the tables are swapped: the current group table becomes the "old"
// table, which partial_flush() drains incrementally.
//   result - output list; emitted tuples get channel = output_channel.
252 virtual int flush(list<host_tuple>& result){
254 //cout << "clean_op: flush" << "\n";
256 unsigned int old_table = 1-curr_table;
257 unsigned int old_supertable = 1-curr_supertable;
258 unsigned int old_attr = 1-curr_attrtable;
// NOTE(review): this begin() value appears to be overwritten by find() in the
// loop before it is ever dereferenced.
259 typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;
260 iter = supergroup_table[old_supertable].begin();
263 // if the old table isn't empty, flush it now.
264 if (!group_table[old_table].empty()) {
265 //cout << "clean_op: old table is not empty, flushing everything immediately" << "\n";
266 for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
// Only groups whose supergroup is still in the old supertable produce an
// output tuple. The sp_attribute lookup assumes a matching entry exists (no
// end() check).
269 if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){
271 cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;
273 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd ,failed);
275 //cout << "sampled\n";
276 tup.channel = output_channel;
277 result.push_back(tup);
280 // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.
// The old superattribute's list l still holds this group pointer after the
// delete below, until superflush() deletes that superattribute. It must not
// be dereferenced in between.
283 delete ((*flush_pos).first);
284 delete ((*flush_pos).second);
286 group_table[old_table].clear();
287 group_table[old_table].rehash();
290 // swap tables, enable partial flush processing.
291 flush_pos = group_table[curr_table].begin();
292 curr_table = old_table;
293 flush_finished = false;
// partial_flush: incrementally drains the previous window's group table.
// Each call emits at most _GB_FLUSH_PER_TUPLE_ groups, starting from
// flush_pos, using the previous-window supergroup state and count_distinct.
// The emitted groups' keys and aggregates are deleted. When flush_pos
// reaches the end, the flush is marked finished and the table is cleared.
//   result - output list; emitted tuples get channel = output_channel.
298 virtual int partial_flush(list<host_tuple>& result){
300 //cout << "clean_op: partial flush" << "\n";
302 unsigned int old_table = 1-curr_table;
303 unsigned int old_supertable = 1-curr_supertable;
304 unsigned int old_attr = 1-curr_attrtable;
307 typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;
308 iter = supergroup_table[old_supertable].begin();
310 // emit up to _GB_FLUSH_PER_TUPLE_ output tuples.
311 if (!group_table[old_table].empty()) {
312 for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
315 // find supergroup of the group to be deleted
316 if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){
318 cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;
320 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd, failed);
323 //cout << "sampled\n";
324 tup.channel = output_channel;
325 result.push_back(tup);
328 // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.
331 delete ((*flush_pos).first);
332 delete ((*flush_pos).second);
336 // finalize processing once the cursor has reached the end of the old table.
// NOTE(review): after clear()/rehash(), flush_pos keeps the end() obtained
// before clearing. Later calls compare it against a fresh end(). Confirm that
// hash_table keeps end() stable across clear()/rehash().
337 if(flush_pos == group_table[old_table].end()) {
338 flush_finished = true;
339 group_table[old_table].clear();
340 group_table[old_table].rehash();
// superflush: ends the current window of the supergroup tables.
// First, every previous-window supergroup (from super_flush_pos on) is
// destroyed: its key, its state, and its superattribute. The old
// supergroup_table and sp_attribute tables are then cleared.
// Second, the current tables become the "old" ones, and super_flush_pos is
// positioned at the start of the table that is about to become old.
346 virtual int superflush(){
348 // cout << "clean_op: superflush" << "\n";
349 typename hash_table<group*, superattribute*, superhasher_func, superequal_func>::iterator attr_flush_pos;
350 unsigned int old = 1-curr_supertable;
351 unsigned int attr_old = 1-curr_attrtable;
353 // if the old supergroup table isn't empty, flush it now.
354 if (!supergroup_table[old].empty()) {
355 //cout << "clean_op: flush supertable" << "\n";
356 for (; super_flush_pos != supergroup_table[old].end(); ++super_flush_pos) {
357 //find that supergroup in the attributes table
// The lookup must happen before the key is deleted below.
// NOTE(review): attr_flush_pos is dereferenced without an end() check.
358 attr_flush_pos = sp_attribute[attr_old].find((*super_flush_pos).first);
360 delete ((*super_flush_pos).first);
361 delete ((*super_flush_pos).second);
362 //flush superattribute table too
// The key is shared with supergroup_table and was just deleted above;
// deleting it here too would be a double delete.
363 //delete ((*attr_flush_pos).first);
364 delete ((*attr_flush_pos).second);
366 supergroup_table[old].clear();
367 supergroup_table[old].rehash();
368 sp_attribute[attr_old].clear();
369 sp_attribute[attr_old].rehash();
373 super_flush_pos = supergroup_table[curr_supertable].begin();
374 curr_supertable = old;
375 // swap attribute tables
376 curr_attrtable = attr_old;
// clean: samples the groups of one supergroup in the current window.
// For each group on the supergroup's list, func is asked to sample it, via
// final_sample_group or sample_group (presumably chosen by final_clean). On
// the eviction path, the group's contribution is removed from the
// superaggregates and the group is erased from the group table. Its
// supergroup's count_distinct is then decremented and the group is removed
// from the list.
//   sgr         - supergroup key (looked up in the current sp_attribute).
//   st          - the supergroup's state.
//   final_clean - true for the end-of-window clean from accept_tuple().
381 virtual int clean(group* sgr, state* st, bool final_clean){
382 //cout << "clean_op: clean" << "\n";
385 typename list<group*>::iterator viter;
386 superattribute* glist = (*(sp_attribute[curr_attrtable].find(sgr))).second;
// NOTE(review): cd is read once and not refreshed when count_distinct is
// decremented inside the loop. Later sample calls see the value from before
// this clean started.
387 int cd = ((*(sp_attribute[curr_attrtable].find(sgr))).second)->count_distinct;
389 // group_table[curr_table].size();
390 if (!glist->l.empty()){
392 //cout << "clean_op: list of group pointers is not empty" << "\n";
393 viter = glist->l.begin();
394 for(; viter != glist->l.end();){
// Assumes every pointer on the list is present in the current group table;
// the find() result is not checked against end().
395 iter1 = group_table[curr_table].find(*viter);
396 aggregate* old_aggr = (*iter1).second;
398 //if (((*iter1).first->valid)){
401 sample = func.final_sample_group((*iter1).first, old_aggr, st, cd);
405 sample = func.sample_group((*iter1).first, old_aggr, st, cd);
408 //cout << "clean_op: evicting group from the group table" << "\n";
409 //update superaggregates
410 func.update_minus_superaggr((*iter1).first, old_aggr, st);
412 group* g = (*iter1).first;
413 aggregate* a = (*iter1).second;
414 group_table[curr_table].erase((*iter1).first);
417 //update count_distinct
// NOTE(review): (*iter1).first is read after its entry was erased, through
// an iterator that may be invalidated and whose key (g) may already have
// been freed. glist already points at this group's superattribute: every
// group on glist->l was pushed onto the entry found for its own key in
// accept_tuple(). So glist->count_distinct-- looks equivalent and avoids the
// stale iterator. Confirm and fix.
418 ((*(sp_attribute[curr_attrtable].find((*iter1).first))).second)->count_distinct--;
419 //remove pointer from supergroup
420 viter = glist->l.erase(viter);
// set_param_block: forwards a parameter block unchanged to the clean_func
// instance.
//   sz    - size of the block (presumably in bytes; confirm against clean_func).
//   value - pointer to the block.
430 virtual int set_param_block(int sz, void* value){
431 func.set_param_block(sz, value);
// get_temp_status: builds a temp-status tuple into result, tagged with
// output_channel. The tuple is built by func and is told whether the
// incremental flush of the previous window has finished (flush_finished).
// Returns func.create_temp_status_tuple()'s result unchanged.
435 virtual int get_temp_status(host_tuple& result){
436 result.channel = output_channel;
437 return func.create_temp_status_tuple(result, flush_finished);
// get_blocked_status: virtual status hook, presumably declared by
// base_operator to report whether this operator is blocked — confirm.
440 virtual int get_blocked_status(){
// get_mem_footprint: sums the footprint reported by both buffers of each of
// the three hash tables.
// NOTE(review): the tables store pointers. Whether the heap-allocated keys,
// aggregates, states and superattributes are counted depends on
// hash_table::get_mem_footprint() — confirm.
444 unsigned int get_mem_footprint() {
445 return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint() +
446 supergroup_table[0].get_mem_footprint() + supergroup_table[1].get_mem_footprint() +
447 sp_attribute[0].get_mem_footprint() + sp_attribute[1].get_mem_footprint();