Initial commit
[com/gs-lite.git] / include / hfta / clean_operator.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef CLEAN_OPERATOR_H
17 #define CLEAN_OPERATOR_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 #include "hash_table.h"
23 #include <iostream>
24
25 #define _GB_FLUSH_PER_TUPLE_ 1
26
27 // #define _C_O_DEBUG 1
28
29 using namespace std;
30
31 template <class clean_func, class group, class aggregate, class state, class hasher_func, class equal_func, class superhasher_func, class superequal_func> class clean_operator: public base_operator{
32
33  private:
34
35   class superattribute{
36   public:
37     unsigned int count_distinct;
38     list<group*> l;
39
40     superattribute(){
41       count_distinct = 0;
42     };
43     ~superattribute(){};
44   };
45
46   clean_func func;
47   hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];
48   hash_table<group*, state*, superhasher_func, superequal_func> supergroup_table[2];
49   // maintains count_distinct for every supergroup
50   // also maintains list of groups of this supergroup
51   hash_table<group*, superattribute*, superhasher_func, superequal_func> sp_attribute[2];
52   bool flush_finished;
53   unsigned int curr_table;
54   unsigned int curr_supertable;
55   unsigned int curr_attrtable;
56   unsigned int packet_count;
57   unsigned int ccc;
58   typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter1; //find
59   typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
60   typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter2; //find
61   typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator super_flush_pos;
62
63  public:
64
65 //  clean_operator(int schema_hadle): func(1){
66   clean_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle){
67     flush_finished = true;
68     curr_table = 0;
69     curr_supertable = 0;
70     curr_attrtable = 0;
71     packet_count = 0;
72     ccc = 0;
73     flush_pos = group_table[1-curr_table].end();
74     super_flush_pos = supergroup_table[1-curr_supertable].end();
75   }
76
77   virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result){
78     packet_count++;
79     // evict tuple from the old table
80     if(!flush_finished){
81       partial_flush(result);
82     }
83
84     //buffers to store keys
85     char buffer[sizeof(group)];
86
87     // key of the supergroup is all group-by attributes not including the once that define time window
88     // key of the supergroup is a subset of a group key
89     //cout << "clean_op: creating group" << "\n";
90     group* grp = func.create_group(tup,buffer);
91 /*//                    Ignore temp tuples until we can fix their timestamps.
92 if(func.temp_status_received()){
93   tup.free_tuple();
94  return 0;
95 }*/
96     state* curr_state;
97     int cd = 0; //count_distinct
98
99     // do final clean at the border of the time window
100     if(func.flush_needed()){
101       //cout << "number of records: " << packet_count << endl;
102       //cout << "number of EVAL records: " << ccc << endl;
103       packet_count = 0;
104       ccc = 0;
105       // for every supergroup - clean group table
106       //cout << "FINAL CLEANING PHASE: " << "\n";
107       iter2 = supergroup_table[curr_supertable].begin();
108       while (iter2 != supergroup_table[curr_supertable].end()) {
109         cd =  ((*(sp_attribute[curr_attrtable].find((*iter2).first))).second)->count_distinct;
110         func.finalize_state((*iter2).second, cd);
111         clean((*iter2).first,(*iter2).second, true);
112         ++iter2;
113       }
114
115     }
116
117     if(!grp){
118       //cout << "clean_op: failed to create group" << "\n";
119       if(func.flush_needed()){
120         flush(result);
121         superflush();
122       }
123       if(func.temp_status_received()){
124         host_tuple temp_tup;
125         if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
126           temp_tup.channel = output_channel;
127           result.push_back(temp_tup);
128         }
129       }
130       tup.free_tuple();
131       return 0;
132     }
133
134     // first flush everything from the old table if needed
135     // need it before anything else because of the definition of the key for supergroup
136     if(func.flush_needed()){
137       //do flush of the old group table using state from the old supergroup table
138       flush(result);
139       //flush everything from the old supertable, swap tables;
140       superflush();
141     }
142
143     state* old_state;
144
145      //supergroup exists in the new table
146     if ((iter2 = supergroup_table[curr_supertable].find(grp)) != supergroup_table[curr_supertable].end()){
147       old_state = (*iter2).second;
148
149        superattribute *temp = (*(sp_attribute[curr_attrtable].find(grp))).second;
150        cd = temp->count_distinct;
151
152        if(!func.evaluate_predicate(tup,grp,old_state, cd)){
153          ccc++;
154          tup.free_tuple();
155          return 0;
156        }
157        // update superaggregates
158        func.update_plus_superaggr(tup, grp, old_state);
159        //((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;
160        temp->count_distinct++;
161        cd = temp->count_distinct;
162        curr_state = old_state;
163     }
164     //new supergroup
165     else{
166
167       //look up the group in the old table,
168       if((iter2 = supergroup_table[1-curr_supertable].find(grp)) != supergroup_table[1-curr_supertable].end()){
169         cd  = ((*(sp_attribute[1-curr_attrtable].find(grp))).second)->count_distinct;
170         //curr_state = new state((*iter2).second);
171         curr_state = new state();
172         old_state = (*iter2).second;
173
174         //if there is one - do reinitialization
175         func.reinitialize_state(tup, grp, curr_state,old_state, cd);
176       }
177       else{
178         curr_state = new state();
179         //if there isn't - do initialization
180         func.initialize_state(tup, grp, curr_state);
181       }
182
183       // have to create new object for superkey
184       group* new_sgrp = new group(grp);
185
186       // need to insert the supergroup into the hash table even if the predicate
187       // evaluates to false, since the state is initialized with the first tuple of the supergroup
188
189       //insert supergroup into the hash table
190       supergroup_table[curr_supertable].insert(new_sgrp, curr_state);
191       // create superattribute object
192       superattribute* sp_attr = new superattribute();
193       sp_attribute[curr_attrtable].insert(new_sgrp,sp_attr);
194
195
196       if(!func.evaluate_predicate(tup, grp, curr_state, cd)){
197         ccc++;
198         tup.free_tuple();
199         return 0;
200       }
201
202       // update superaggregates
203       func.update_plus_superaggr(tup, grp, curr_state);
204       ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;
205     }
206
207     aggregate* ag;
208     cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;
209
210     if ((iter1 = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
211       //cout << "clean_op: group already exists" << "\n";
212       aggregate* old_aggr = (*iter1).second;
213
214       //adjust count_distinct due to aggregation
215       ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct--;
216
217       //update group aggregates
218       func.update_aggregate(tup, grp, old_aggr, curr_state, cd);
219       ag = old_aggr;
220     }
221     else{
222       //cout << "clean_op: creating a new group" << "\n";
223       // create a copy of the group on the heap
224       group *new_grp = new group(grp);  // need a copy constructor for groups
225       aggregate* aggr = new aggregate();
226       // create an aggregate in preallocated buffer
227       aggr = func.create_aggregate(tup, grp, (char*)aggr, curr_state, cd);
228       //cout << "clean_op: inserting group into hash" << "\n";
229       group_table[curr_table].insert(new_grp, aggr);
230       ag = aggr;
231
232       // remember group in the list of supergroup
233       ((*(sp_attribute[curr_attrtable].find(new_grp))).second)->l.push_back(new_grp);
234
235     }
236
237
238     //used just for print
239     bool do_print = false;
240     cd =  ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;
241
242     //CLEANING WHEN
243     if(func.need_to_clean(grp, curr_state, cd)){
244       clean(grp, curr_state, false);
245       do_print = true;
246     }
247
248     tup.free_tuple();
249     return 0;
250   }
251
252   virtual int flush(list<host_tuple>& result){
253
254     //cout << "clean_op: flush" << "\n";
255     host_tuple tup;
256     unsigned int old_table = 1-curr_table;
257     unsigned int old_supertable = 1-curr_supertable;
258     unsigned int old_attr = 1-curr_attrtable;
259     typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;
260     iter = supergroup_table[old_supertable].begin();
261     unsigned int cd  = 0;
262
263     //      if the old table isn't empty, flush it now.
264     if (!group_table[old_table].empty()) {
265       //cout << "clean_op: old table is not empty, flushing everything immediately" << "\n";
266       for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
267
268         bool failed = false;
269         if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){
270
271           cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;
272
273           tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd ,failed);
274           if (!failed) {
275             //cout << "sampled\n";
276             tup.channel = output_channel;
277             result.push_back(tup);
278           }
279
280           // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.
281         }
282
283         delete ((*flush_pos).first);
284         delete ((*flush_pos).second);
285       }
286       group_table[old_table].clear();
287       group_table[old_table].rehash();
288     }
289
290     //     swap tables, enable partial flush processing.
291     flush_pos = group_table[curr_table].begin();
292     curr_table = old_table;
293     flush_finished = false;
294
295     return 0;
296   }
297
298   virtual int partial_flush(list<host_tuple>& result){
299
300     //cout << "clean_op: partial flush" << "\n";
301     host_tuple tup;
302     unsigned int old_table = 1-curr_table;
303     unsigned int old_supertable = 1-curr_supertable;
304     unsigned int old_attr = 1-curr_attrtable;
305     unsigned int i;
306     unsigned int cd = 0;
307     typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;
308     iter = supergroup_table[old_supertable].begin();
309
310     //  emit up to _GB_FLUSH_PER_TABLE_ output tuples.
311     if (!group_table[old_table].empty()) {
312       for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
313
314         bool failed = false;
315         // find supergroup of the group to be deleted
316         if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){
317
318           cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;
319
320           tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd, failed);
321           if (!failed) {
322
323             //cout << "sampled\n";
324             tup.channel = output_channel;
325             result.push_back(tup);
326           }
327
328           // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.
329         }
330
331         delete ((*flush_pos).first);
332         delete ((*flush_pos).second);
333       }
334     }
335
336     //    finalize processing if empty.
337     if(flush_pos == group_table[old_table].end()) {
338       flush_finished = true;
339       group_table[old_table].clear();
340       group_table[old_table].rehash();
341     }
342
343     return 0;
344   }
345
346   virtual int superflush(){
347
348     // cout << "clean_op: superflush" << "\n";
349     typename hash_table<group*, superattribute*, superhasher_func, superequal_func>::iterator attr_flush_pos;
350     unsigned int old = 1-curr_supertable;
351     unsigned int attr_old = 1-curr_attrtable;
352
353     // if the old supergroup table isn't empty, flush it now.
354     if (!supergroup_table[old].empty()) {
355       //cout << "clean_op: flush supertable" << "\n";
356       for (; super_flush_pos != supergroup_table[old].end(); ++super_flush_pos) {
357         //find that supergroup in the attributes table
358         attr_flush_pos = sp_attribute[attr_old].find((*super_flush_pos).first);
359
360         delete ((*super_flush_pos).first);
361         delete ((*super_flush_pos).second);
362         //flush superattribute table too
363         //delete ((*attr_flush_pos).first);
364         delete ((*attr_flush_pos).second);
365       }
366       supergroup_table[old].clear();
367       supergroup_table[old].rehash();
368       sp_attribute[attr_old].clear();
369       sp_attribute[attr_old].rehash();
370     }
371
372     // swap supertables
373     super_flush_pos = supergroup_table[curr_supertable].begin();
374     curr_supertable = old;
375     // swap attribute tables
376     curr_attrtable = attr_old;
377
378     return 0;
379   }
380
381   virtual int clean(group* sgr, state* st, bool final_clean){
382     //cout << "clean_op: clean" << "\n";
383     bool sample = false;
384
385     typename list<group*>::iterator viter;
386     superattribute* glist = (*(sp_attribute[curr_attrtable].find(sgr))).second;
387     int cd = ((*(sp_attribute[curr_attrtable].find(sgr))).second)->count_distinct;
388 //    glist->l.size();
389 //    group_table[curr_table].size();
390     if (!glist->l.empty()){
391
392       //cout << "clean_op: list of group pointers is not empty" << "\n";
393       viter = glist->l.begin();
394       for(; viter != glist->l.end();){
395         iter1 = group_table[curr_table].find(*viter);
396         aggregate* old_aggr = (*iter1).second;
397
398         //if (((*iter1).first->valid)){
399         if (final_clean){
400             // HAVING
401           sample = func.final_sample_group((*iter1).first, old_aggr, st, cd);
402         }
403           else
404             // CLEANING BY
405             sample = func.sample_group((*iter1).first, old_aggr, st, cd);
406
407           if(!sample){
408             //cout << "clean_op: evicting group from the group table" << "\n";
409             //update superaggregates
410             func.update_minus_superaggr((*iter1).first, old_aggr, st);
411             //delete group
412             group* g = (*iter1).first;
413             aggregate* a = (*iter1).second;
414             group_table[curr_table].erase((*iter1).first);
415             delete g;
416             delete a;
417             //update count_distinct
418             ((*(sp_attribute[curr_attrtable].find((*iter1).first))).second)->count_distinct--;
419             //remove pointer from supergroup
420             viter = glist->l.erase(viter);
421           }
422           else
423             ++viter;
424       }
425     }
426
427     return 0;
428   }
429
430   virtual int set_param_block(int sz, void* value){
431     func.set_param_block(sz, value);
432     return 0;
433   }
434
435   virtual int get_temp_status(host_tuple& result){
436     result.channel = output_channel;
437     return func.create_temp_status_tuple(result, flush_finished);
438   }
439
440   virtual int get_blocked_status(){
441     return -1;
442   }
443
444   unsigned int get_mem_footprint() {
445                 return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint() +
446                 supergroup_table[0].get_mem_footprint() + supergroup_table[1].get_mem_footprint() +
447                 sp_attribute[0].get_mem_footprint() + sp_attribute[1].get_mem_footprint();
448   }
449
450 };
451
452 #endif