a7e7862128f2d8648efe17c6583ba61419855d28
[com/gs-lite.git] / include / hfta / clean_operator.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #ifndef CLEAN_OPERATOR_H\r
17 #define CLEAN_OPERATOR_H\r
18 \r
19 #include "host_tuple.h"\r
20 #include "base_operator.h"\r
21 #include <list>\r
22 #include "hash_table.h"\r
23 #include <iostream>\r
24 \r
25 #define _GB_FLUSH_PER_TUPLE_ 1\r
26 \r
27 // #define _C_O_DEBUG 1\r
28 \r
29 using namespace std;\r
30 \r
31 template <class clean_func, class group, class aggregate, class state, class hasher_func, class equal_func, class superhasher_func, class superequal_func> class clean_operator: public base_operator{\r
32 \r
33  private:\r
34 \r
35   class superattribute{\r
36   public:\r
37     unsigned int count_distinct;\r
38     list<group*> l;\r
39 \r
40     superattribute(){\r
41       count_distinct = 0;\r
42     };\r
43     ~superattribute(){};\r
44   };\r
45 \r
46   clean_func func;\r
47   hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];\r
48   hash_table<group*, state*, superhasher_func, superequal_func> supergroup_table[2];\r
49   // maintains count_distinct for every supergroup\r
50   // also maintains list of groups of this supergroup\r
51   hash_table<group*, superattribute*, superhasher_func, superequal_func> sp_attribute[2];\r
52   bool flush_finished;\r
53   unsigned int curr_table;\r
54   unsigned int curr_supertable;\r
55   unsigned int curr_attrtable;\r
56   unsigned int packet_count;\r
57   unsigned int ccc;\r
58   typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter1; //find\r
59   typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;\r
60   typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter2; //find\r
61   typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator super_flush_pos;\r
62 \r
63  public:\r
64 \r
65 //  clean_operator(int schema_hadle): func(1){\r
66   clean_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle){\r
67     flush_finished = true;\r
68     curr_table = 0;\r
69     curr_supertable = 0;\r
70     curr_attrtable = 0;\r
71     packet_count = 0;\r
72     ccc = 0;\r
73     flush_pos = group_table[1-curr_table].end();\r
74     super_flush_pos = supergroup_table[1-curr_supertable].end();\r
75   }\r
76 \r
77   virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result){\r
78     packet_count++;\r
79     // evict tuple from the old table\r
80     if(!flush_finished){\r
81       partial_flush(result);\r
82     }\r
83 \r
84     //buffers to store keys\r
85     char buffer[sizeof(group)];\r
86 \r
87     // key of the supergroup is all group-by attributes not including the once that define time window\r
88     // key of the supergroup is a subset of a group key\r
89     //cout << "clean_op: creating group" << "\n";\r
90     group* grp = func.create_group(tup,buffer);\r
91 /*//                    Ignore temp tuples until we can fix their timestamps.\r
92 if(func.temp_status_received()){\r
93   tup.free_tuple();\r
94  return 0;\r
95 }*/\r
96     state* curr_state;\r
97     int cd = 0; //count_distinct\r
98 \r
99     // do final clean at the border of the time window\r
100     if(func.flush_needed()){\r
101       //cout << "number of records: " << packet_count << endl;\r
102       //cout << "number of EVAL records: " << ccc << endl;\r
103       packet_count = 0;\r
104       ccc = 0;\r
105       // for every supergroup - clean group table\r
106       //cout << "FINAL CLEANING PHASE: " << "\n";\r
107       iter2 = supergroup_table[curr_supertable].begin();\r
108       while (iter2 != supergroup_table[curr_supertable].end()) {\r
109         cd =  ((*(sp_attribute[curr_attrtable].find((*iter2).first))).second)->count_distinct;\r
110         func.finalize_state((*iter2).second, cd);\r
111         clean((*iter2).first,(*iter2).second, true);\r
112         ++iter2;\r
113       }\r
114 \r
115     }\r
116 \r
117     if(!grp){\r
118       //cout << "clean_op: failed to create group" << "\n";\r
119       if(func.flush_needed()){\r
120         flush(result);\r
121         superflush();\r
122       }\r
123       if(func.temp_status_received()){\r
124         host_tuple temp_tup;\r
125         if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {\r
126           temp_tup.channel = output_channel;\r
127           result.push_back(temp_tup);\r
128         }\r
129       }\r
130       tup.free_tuple();\r
131       return 0;\r
132     }\r
133 \r
134     // first flush everything from the old table if needed\r
135     // need it before anything else because of the definition of the key for supergroup\r
136     if(func.flush_needed()){\r
137       //do flush of the old group table using state from the old supergroup table\r
138       flush(result);\r
139       //flush everything from the old supertable, swap tables;\r
140       superflush();\r
141     }\r
142 \r
143     state* old_state;\r
144 \r
145      //supergroup exists in the new table\r
146     if ((iter2 = supergroup_table[curr_supertable].find(grp)) != supergroup_table[curr_supertable].end()){\r
147       old_state = (*iter2).second;\r
148 \r
149        superattribute *temp = (*(sp_attribute[curr_attrtable].find(grp))).second;\r
150        cd = temp->count_distinct;\r
151 \r
152        if(!func.evaluate_predicate(tup,grp,old_state, cd)){\r
153          ccc++;\r
154          tup.free_tuple();\r
155          return 0;\r
156        }\r
157        // update superaggregates\r
158        func.update_plus_superaggr(tup, grp, old_state);\r
159        //((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;\r
160        temp->count_distinct++;\r
161        cd = temp->count_distinct;\r
162        curr_state = old_state;\r
163     }\r
164     //new supergroup\r
165     else{\r
166 \r
167       //look up the group in the old table,\r
168       if((iter2 = supergroup_table[1-curr_supertable].find(grp)) != supergroup_table[1-curr_supertable].end()){\r
169         cd  = ((*(sp_attribute[1-curr_attrtable].find(grp))).second)->count_distinct;\r
170         //curr_state = new state((*iter2).second);\r
171         curr_state = new state();\r
172         old_state = (*iter2).second;\r
173 \r
174         //if there is one - do reinitialization\r
175         func.reinitialize_state(tup, grp, curr_state,old_state, cd);\r
176       }\r
177       else{\r
178         curr_state = new state();\r
179         //if there isn't - do initialization\r
180         func.initialize_state(tup, grp, curr_state);\r
181       }\r
182 \r
183       // have to create new object for superkey\r
184       group* new_sgrp = new group(grp);\r
185 \r
186       // need to insert the supergroup into the hash table even if the predicate\r
187       // evaluates to false, since the state is initialized with the first tuple of the supergroup\r
188 \r
189       //insert supergroup into the hash table\r
190       supergroup_table[curr_supertable].insert(new_sgrp, curr_state);\r
191       // create superattribute object\r
192       superattribute* sp_attr = new superattribute();\r
193       sp_attribute[curr_attrtable].insert(new_sgrp,sp_attr);\r
194 \r
195 \r
196       if(!func.evaluate_predicate(tup, grp, curr_state, cd)){\r
197         ccc++;\r
198         tup.free_tuple();\r
199         return 0;\r
200       }\r
201 \r
202       // update superaggregates\r
203       func.update_plus_superaggr(tup, grp, curr_state);\r
204       ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct++;\r
205     }\r
206 \r
207     aggregate* ag;\r
208     cd = ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;\r
209 \r
210     if ((iter1 = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {\r
211       //cout << "clean_op: group already exists" << "\n";\r
212       aggregate* old_aggr = (*iter1).second;\r
213 \r
214       //adjust count_distinct due to aggregation\r
215       ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct--;\r
216 \r
217       //update group aggregates\r
218       func.update_aggregate(tup, grp, old_aggr, curr_state, cd);\r
219       ag = old_aggr;\r
220     }\r
221     else{\r
222       //cout << "clean_op: creating a new group" << "\n";\r
223       // create a copy of the group on the heap\r
224       group *new_grp = new group(grp);  // need a copy constructor for groups\r
225       aggregate* aggr = new aggregate();\r
226       // create an aggregate in preallocated buffer\r
227       aggr = func.create_aggregate(tup, grp, (char*)aggr, curr_state, cd);\r
228       //cout << "clean_op: inserting group into hash" << "\n";\r
229       group_table[curr_table].insert(new_grp, aggr);\r
230       ag = aggr;\r
231 \r
232       // remember group in the list of supergroup\r
233       ((*(sp_attribute[curr_attrtable].find(new_grp))).second)->l.push_back(new_grp);\r
234 \r
235     }\r
236 \r
237 \r
238     //used just for print\r
239     bool do_print = false;\r
240     cd =  ((*(sp_attribute[curr_attrtable].find(grp))).second)->count_distinct;\r
241 \r
242     //CLEANING WHEN\r
243     if(func.need_to_clean(grp, curr_state, cd)){\r
244       clean(grp, curr_state, false);\r
245       do_print = true;\r
246     }\r
247 \r
248     tup.free_tuple();\r
249     return 0;\r
250   }\r
251 \r
252   virtual int flush(list<host_tuple>& result){\r
253 \r
254     //cout << "clean_op: flush" << "\n";\r
255     host_tuple tup;\r
256     unsigned int old_table = 1-curr_table;\r
257     unsigned int old_supertable = 1-curr_supertable;\r
258     unsigned int old_attr = 1-curr_attrtable;\r
259     typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;\r
260     iter = supergroup_table[old_supertable].begin();\r
261     unsigned int cd  = 0;\r
262 \r
263     //      if the old table isn't empty, flush it now.\r
264     if (!group_table[old_table].empty()) {\r
265       //cout << "clean_op: old table is not empty, flushing everything immediately" << "\n";\r
266       for (; flush_pos != group_table[old_table].end(); ++flush_pos) {\r
267 \r
268         bool failed = false;\r
269         if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){\r
270 \r
271           cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;\r
272 \r
273           tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd ,failed);\r
274           if (!failed) {\r
275             //cout << "sampled\n";\r
276             tup.channel = output_channel;\r
277             result.push_back(tup);\r
278           }\r
279 \r
280           // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.\r
281         }\r
282 \r
283         delete ((*flush_pos).first);\r
284         delete ((*flush_pos).second);\r
285       }\r
286       group_table[old_table].clear();\r
287       group_table[old_table].rehash();\r
288     }\r
289 \r
290     //     swap tables, enable partial flush processing.\r
291     flush_pos = group_table[curr_table].begin();\r
292     curr_table = old_table;\r
293     flush_finished = false;\r
294 \r
295     return 0;\r
296   }\r
297 \r
298   virtual int partial_flush(list<host_tuple>& result){\r
299 \r
300     //cout << "clean_op: partial flush" << "\n";\r
301     host_tuple tup;\r
302     unsigned int old_table = 1-curr_table;\r
303     unsigned int old_supertable = 1-curr_supertable;\r
304     unsigned int old_attr = 1-curr_attrtable;\r
305     unsigned int i;\r
306     unsigned int cd = 0;\r
307     typename hash_table<group*, state*, superhasher_func, superequal_func>::iterator iter;\r
308     iter = supergroup_table[old_supertable].begin();\r
309 \r
310     //  emit up to _GB_FLUSH_PER_TABLE_ output tuples.\r
311     if (!group_table[old_table].empty()) {\r
312       for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {\r
313 \r
314         bool failed = false;\r
315         // find supergroup of the group to be deleted\r
316         if((iter = supergroup_table[old_supertable].find((*flush_pos).first)) != supergroup_table[old_supertable].end()){\r
317 \r
318           cd = ((*(sp_attribute[old_attr].find((*flush_pos).first))).second)->count_distinct;\r
319 \r
320           tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, (*iter).second, cd, failed);\r
321           if (!failed) {\r
322 \r
323             //cout << "sampled\n";\r
324             tup.channel = output_channel;\r
325             result.push_back(tup);\r
326           }\r
327 \r
328           // may not need deletion of the list pointer, since the supergroup will be flushed soon anyway.\r
329         }\r
330 \r
331         delete ((*flush_pos).first);\r
332         delete ((*flush_pos).second);\r
333       }\r
334     }\r
335 \r
336     //    finalize processing if empty.\r
337     if(flush_pos == group_table[old_table].end()) {\r
338       flush_finished = true;\r
339       group_table[old_table].clear();\r
340       group_table[old_table].rehash();\r
341     }\r
342 \r
343     return 0;\r
344   }\r
345 \r
346   virtual int superflush(){\r
347 \r
348     // cout << "clean_op: superflush" << "\n";\r
349     typename hash_table<group*, superattribute*, superhasher_func, superequal_func>::iterator attr_flush_pos;\r
350     unsigned int old = 1-curr_supertable;\r
351     unsigned int attr_old = 1-curr_attrtable;\r
352 \r
353     // if the old supergroup table isn't empty, flush it now.\r
354     if (!supergroup_table[old].empty()) {\r
355       //cout << "clean_op: flush supertable" << "\n";\r
356       for (; super_flush_pos != supergroup_table[old].end(); ++super_flush_pos) {\r
357         //find that supergroup in the attributes table\r
358         attr_flush_pos = sp_attribute[attr_old].find((*super_flush_pos).first);\r
359 \r
360         delete ((*super_flush_pos).first);\r
361         delete ((*super_flush_pos).second);\r
362         //flush superattribute table too\r
363         //delete ((*attr_flush_pos).first);\r
364         delete ((*attr_flush_pos).second);\r
365       }\r
366       supergroup_table[old].clear();\r
367       supergroup_table[old].rehash();\r
368       sp_attribute[attr_old].clear();\r
369       sp_attribute[attr_old].rehash();\r
370     }\r
371 \r
372     // swap supertables\r
373     super_flush_pos = supergroup_table[curr_supertable].begin();\r
374     curr_supertable = old;\r
375     // swap attribute tables\r
376     curr_attrtable = attr_old;\r
377 \r
378     return 0;\r
379   }\r
380 \r
381   virtual int clean(group* sgr, state* st, bool final_clean){\r
382     //cout << "clean_op: clean" << "\n";\r
383     bool sample = false;\r
384 \r
385     typename list<group*>::iterator viter;\r
386     superattribute* glist = (*(sp_attribute[curr_attrtable].find(sgr))).second;\r
387     int cd = ((*(sp_attribute[curr_attrtable].find(sgr))).second)->count_distinct;\r
388 //    glist->l.size();\r
389 //    group_table[curr_table].size();\r
390     if (!glist->l.empty()){\r
391 \r
392       //cout << "clean_op: list of group pointers is not empty" << "\n";\r
393       viter = glist->l.begin();\r
394       for(; viter != glist->l.end();){\r
395         iter1 = group_table[curr_table].find(*viter);\r
396         aggregate* old_aggr = (*iter1).second;\r
397 \r
398         //if (((*iter1).first->valid)){\r
399         if (final_clean){\r
400             // HAVING\r
401           sample = func.final_sample_group((*iter1).first, old_aggr, st, cd);\r
402         }\r
403           else\r
404             // CLEANING BY\r
405             sample = func.sample_group((*iter1).first, old_aggr, st, cd);\r
406 \r
407           if(!sample){\r
408             //cout << "clean_op: evicting group from the group table" << "\n";\r
409             //update superaggregates\r
410             func.update_minus_superaggr((*iter1).first, old_aggr, st);\r
411             //delete group\r
412             group* g = (*iter1).first;\r
413             aggregate* a = (*iter1).second;\r
414             group_table[curr_table].erase((*iter1).first);\r
415             delete g;\r
416             delete a;\r
417             //update count_distinct\r
418             ((*(sp_attribute[curr_attrtable].find((*iter1).first))).second)->count_distinct--;\r
419             //remove pointer from supergroup\r
420             viter = glist->l.erase(viter);\r
421           }\r
422           else\r
423             ++viter;\r
424       }\r
425     }\r
426 \r
427     return 0;\r
428   }\r
429 \r
430   virtual int set_param_block(int sz, void* value){\r
431     func.set_param_block(sz, value);\r
432     return 0;\r
433   }\r
434 \r
435   virtual int get_temp_status(host_tuple& result){\r
436     result.channel = output_channel;\r
437     return func.create_temp_status_tuple(result, flush_finished);\r
438   }\r
439 \r
440   virtual int get_blocked_status(){\r
441     return -1;\r
442   }\r
443 \r
444   unsigned int get_mem_footprint() {\r
445                 return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint() +\r
446                 supergroup_table[0].get_mem_footprint() + supergroup_table[1].get_mem_footprint() +\r
447                 sp_attribute[0].get_mem_footprint() + sp_attribute[1].get_mem_footprint();\r
448   }\r
449 \r
450 };\r
451 \r
452 #endif\r