Added quantiling UDAFs
[com/gs-lite.git] / include / hfta / groupby_operator_oop.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #ifndef GROUPBY_OPERATOR_OOP_H\r
17 #define GROUPBY_OPERATOR_OOP_H\r
18 \r
19 #include "host_tuple.h"\r
20 #include "base_operator.h"\r
21 #include <list>\r
22 #include <vector>\r
23 #include "hash_table.h"\r
24 #include <cassert>\r
25 \r
// Maximum number of output tuples emitted by one partial_flush() call, i.e.
// how many groups of a completed window are pushed out per incoming tuple.
// TED: should be supplied by the groupby_func
// Guarded so a build flag (-D) or an earlier header can supply its own value.
#ifndef _GB_FLUSH_PER_TUPLE_
#define _GB_FLUSH_PER_TUPLE_ 1
#endif

/* max allowed disorder  -- Jin */
// TED: should be supplied by the groupby_func
// NOTE(review): not referenced anywhere in this header.
#ifndef DISORDER_LEVEL
#define DISORDER_LEVEL 2
#endif

//#define NDEBUG

// NOTE(review): a header-scope using-directive leaks into every includer; it
// is kept because code that includes this header may rely on it.
using namespace std;
36 \r
37 // ASSUME temporal_type is one of int, uint, llong, ullong\r
38 \r
39 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func, class temporal_type>\r
40 class groupby_operator_oop : public base_operator {\r
41 private :\r
42         groupby_func func;\r
43 \r
44         /* a list of hash tables, which maintains aggregates for current window and also k previous ones  -- Jin */\r
45         vector<hash_table<group*, aggregate*, hasher_func, equal_func>* > group_tables;\r
46 \r
47         /* the minimum and maximum window id of the hash tables  -- Jin  */\r
48         temporal_type min_wid, max_wid;\r
49 \r
50 \r
51         bool flush_finished;\r
52         temporal_type curr_table;\r
53         typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;\r
54 \r
55         temporal_type last_flushed_temporal_gb;\r
56         temporal_type last_temporal_gb;\r
57 \r
58 int n_slow_flush;\r
59         int n_patterns;\r
60 \r
61 \r
62 public:\r
63         groupby_operator_oop(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {\r
64                 flush_finished = true;\r
65 \r
66                 min_wid = 0;\r
67                 max_wid = 0;\r
68 n_slow_flush = 0;\r
69                 n_patterns = func.n_groupby_patterns();\r
70         }\r
71 \r
72         ~groupby_operator_oop() {\r
73                 hash_table<group*, aggregate*, hasher_func, equal_func>* table;\r
74                 // delete all the elements in the group_tables list;\r
75                 while (!group_tables.empty()) {\r
76                         table = group_tables.back();\r
77                         group_tables.pop_back();\r
78                         table->clear();\r
79 //fprintf(stderr,"Deleting group table (c) at %lx\n",(gs_uint64_t)(table));\r
80                         delete (table);\r
81                 }\r
82 \r
83         }\r
84 \r
85         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
86 \r
87 \r
88                 // Push out completed groups\r
89                 if(!flush_finished) partial_flush(result);\r
90 \r
91                 // create buffer on the stack to store key object\r
92                 char buffer[sizeof(group)];\r
93 \r
94                 // extract the key information from the tuple and\r
95                 // copy it into buffer\r
96                 group* grp = func.create_group(tup, buffer);\r
97 \r
98 \r
99                 if (!grp) {\r
100 //printf("grp==NULL recieved ");\r
101                         if (func.temp_status_received()) {\r
102 //printf("temp status record ");\r
103                                 last_flushed_temporal_gb = func.get_last_flushed_gb ();\r
104                                 last_temporal_gb = func.get_last_gb ();\r
105                         }\r
106 //printf("\n");\r
107 \r
108 //fprintf(stderr,"min_wid=%d, max_wid=%d, last_temporal_gb=%d, last_flushed_temporal_gb=%d, flush_finished=%d\n",min_wid, max_wid, last_temporal_gb, last_flushed_temporal_gb, flush_finished);\r
109 \r
110                         /* no data has arrived, and so we ignore the temp tuples -- Jin */\r
111                         if (group_tables.size()>0) {\r
112 \r
113                                 gs_int64_t index;\r
114                                 if(last_flushed_temporal_gb >= min_wid){\r
115                                         index = last_flushed_temporal_gb - min_wid;\r
116                                 }else{\r
117                                         index = -(min_wid - last_flushed_temporal_gb);  // unsigned arithmetic\r
118                                 }\r
119 \r
120                                 if (func.flush_needed() && index>=0) {\r
121 #ifdef NDEBUG\r
122 //fprintf(stderr, "flush needed: last_flushed_gb  %u , min_wid %u \n", last_flushed_temporal_gb, min_wid);\r
123 #endif\r
124                                         // Init flush on first temp tuple -- Jin\r
125                                         if ( !flush_finished) {\r
126 #ifdef NDEBUG\r
127 //fprintf(stderr, "last_flushed_gb is %u, min_wid is %u \n", last_flushed_temporal_gb, min_wid);\r
128 #endif\r
129                                                 flush_old(result);\r
130                                         }\r
131                                         if (last_temporal_gb > min_wid && group_tables.size()>0) {\r
132                                                 flush_finished = false;\r
133                                         }\r
134 \r
135                                 // we start to flush from the head of the group tables -- Jin\r
136                                         if(group_tables.size()>0){\r
137                                                 flush_pos = group_tables[0]->begin();\r
138                                         }\r
139 \r
140 #ifdef NDEBUG\r
141 //fprintf(stderr, "after flush old \n");\r
142 #endif\r
143                                 }\r
144                         }\r
145 \r
146                         host_tuple temp_tup;\r
147                         if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {\r
148                                 temp_tup.channel = output_channel;\r
149                                 result.push_back(temp_tup);\r
150                         }\r
151 \r
152                         tup.free_tuple();\r
153                         return 0;\r
154                 }\r
155 \r
156 //fprintf (stderr, "after create group grp=%lx, curr_table = %d\n",(gs_uint64_t)grp, grp->get_curr_gb());\r
157 \r
158                 /*  This is a regular tuple -- Jin */\r
159                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
160                 /*  first, decide which hash table we need to work at  */\r
161                 curr_table = grp->get_curr_gb();\r
162                 if (max_wid == 0 && min_wid == 0) {\r
163                         group_tables.push_back((new hash_table<group*, aggregate*, hasher_func, equal_func>()));\r
164 //fprintf(stderr,"Added (1) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());\r
165                         max_wid = min_wid = curr_table;\r
166                 }\r
167                 if (curr_table < min_wid) {\r
168                         for (temporal_type i = curr_table; i < min_wid; i++){\r
169                                 group_tables.insert(group_tables.begin(), new hash_table<group*, aggregate*, hasher_func, equal_func>());\r
170 //fprintf(stderr,"Added (2) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());\r
171                         }\r
172                         min_wid = curr_table;\r
173                 }\r
174                 if (curr_table > max_wid) {\r
175                         hash_table<group*, aggregate*, hasher_func, equal_func>* pt;\r
176                         for (temporal_type i = max_wid; i < curr_table; i++) {\r
177                                 pt =new hash_table<group*, aggregate*, hasher_func, equal_func>();\r
178                                 group_tables.push_back(pt);\r
179 //fprintf(stderr,"Added (3) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());\r
180                         }\r
181 \r
182                         max_wid = curr_table;\r
183                 }\r
184                 gs_int64_t index = curr_table - min_wid;\r
185 \r
186                 if ((iter = group_tables[index]->find(grp)) != group_tables[index]->end()) {\r
187                         aggregate* old_aggr = (*iter).second;\r
188                         func.update_aggregate(tup, grp, old_aggr);\r
189                 }else{\r
190                         /* We only flush when a temp tuple is received, so we only check on temp tuple  -- Jin */\r
191                         // create a copy of the group on the heap\r
192                         if(n_patterns <= 1){\r
193 \r
194                                 group* new_grp = new group(grp);        // need a copy constructor for groups\r
195 \r
196                                 aggregate* aggr = new aggregate();\r
197 \r
198                         // create an aggregate in preallocated buffer\r
199                                 aggr = func.create_aggregate(tup, (char*)aggr);\r
200 \r
201 //                              hash_table<group*, aggregate*, hasher_func, equal_func>* pt;\r
202                                 group_tables[index]->insert(new_grp, aggr);\r
203                         }else{\r
204                                 int p;\r
205                                 for(p=0;p<n_patterns;++p){\r
206                                         group* new_grp = new group(grp, func.get_pattern(p));\r
207                                         aggregate* aggr = new aggregate();\r
208                                         aggr = func.create_aggregate(tup, (char*)aggr);\r
209                                         group_tables[index]->insert(new_grp, aggr);\r
210                                 }\r
211                         }\r
212                 }\r
213                 tup.free_tuple();\r
214                 return 0;\r
215         }\r
216 \r
217         int partial_flush(list<host_tuple>& result) {\r
218                 host_tuple tup;\r
219                 /* the hash table we should flush is func->last_flushed_gb_0  -- Jin */\r
220                 /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */\r
221 \r
222                 gs_int64_t i;\r
223 \r
224 //fprintf(stderr, "partial_flush size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d \n", group_tables.size(), min_wid, max_wid, last_temporal_gb);\r
225         if(group_tables.size()==0){\r
226                 flush_finished = true;\r
227 //fprintf(stderr, "out of partial flush early \n");\r
228                 return 0;\r
229         }\r
230 \r
231 //                              emit up to _GB_FLUSH_PER_TABLE_ output tuples.\r
232                 if (!group_tables[0]->empty()) {\r
233                         for (i=0; flush_pos!=group_tables[0]->end() && i < _GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {\r
234 n_slow_flush++;\r
235                                 bool failed = false;\r
236                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
237                                 if (!failed) {\r
238                                         tup.channel = output_channel;\r
239                                         result.push_back(tup);\r
240                                 }\r
241 //fprintf(stderr,"partial_flush Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));\r
242                                 delete ((*flush_pos).first);\r
243                                 delete ((*flush_pos).second);\r
244                         }\r
245                 }\r
246 \r
247 //                      Finalize processing if empty.\r
248                 if (flush_pos == group_tables[0]->end()) {\r
249                         /* one window is completely flushed, so recycle the hash table  -- Jin */\r
250 \r
251                         hash_table<group*, aggregate*, hasher_func, equal_func>* table =  group_tables[0];\r
252 \r
253 //fprintf(stderr,"partial_flush Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));\r
254                         group_tables[0]->clear();\r
255                         delete (group_tables[0]);\r
256 \r
257                         group_tables.erase(group_tables.begin());\r
258 \r
259                         min_wid++;\r
260 \r
261                         if (last_temporal_gb > min_wid && group_tables.size()>0) {\r
262                                 flush_pos = group_tables[0]->begin();\r
263 \r
264                         } else {\r
265                                 flush_finished = true;\r
266                         }\r
267                 }\r
268 //fprintf(stderr, "out of partial flush \n");\r
269                 return 0;\r
270         }\r
271 \r
272 \r
273         /* Where is this function called ??? */  /* externally */\r
274         int flush(list<host_tuple>& result) {\r
275                 host_tuple tup;\r
276                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
277                 /* the hash table we should flush is func->last_flushed_gb_0  -- Jin */\r
278                 /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */\r
279                 while ( group_tables.size() > 0) {\r
280                         if (!group_tables[0]->empty()) {\r
281                                 if (flush_finished)\r
282                                         flush_pos = group_tables[0]->begin();\r
283                                 for (; flush_pos != group_tables[0]->end(); ++flush_pos) {\r
284                                         bool failed = false;\r
285 \r
286                                         tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
287 \r
288                                         if (!failed) {\r
289 \r
290                                                 tup.channel = output_channel;\r
291                                                 result.push_back(tup);\r
292                                         }\r
293 //fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));\r
294                                         delete ((*flush_pos).first);\r
295                                         delete ((*flush_pos).second);\r
296 \r
297                                 }\r
298                         }\r
299                         min_wid++;\r
300 \r
301                 // remove the hashtable from group_tables\r
302                         hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];\r
303 \r
304                         table->clear();\r
305 //fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));\r
306                         delete (table);\r
307                         group_tables.erase(group_tables.begin());\r
308 \r
309                         if(group_tables.size()>0){\r
310                                 flush_pos = group_tables[0]->begin();\r
311                         }\r
312                 }\r
313 \r
314 \r
315 \r
316                 flush_finished = true;\r
317 \r
318                 return 0;\r
319         }\r
320 \r
321         /* flushes every hash table before last_flush_gb, and get ready to flush the next window  -- Jin */\r
322         int flush_old(list<host_tuple>& result) {\r
323                 host_tuple tup;\r
324                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
325                 gs_int64_t num, i;\r
326 \r
327 //fprintf(stderr, "flush_old size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d, num=%d\n", group_tables.size(), min_wid, max_wid, last_temporal_gb, num);\r
328 \r
329                 num = last_temporal_gb - min_wid;\r
330 \r
331                 //If the old table isn't empty, flush it now.\r
332                 for (i = 0; i < num && group_tables.size() > 0; i++) {\r
333                         if (!group_tables[0]->empty()) {\r
334                                 for (; flush_pos != group_tables[0]->end(); ++flush_pos) {\r
335                                         bool failed = false;\r
336 \r
337                                         tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
338 \r
339                                         if (!failed) {\r
340 \r
341                                                 tup.channel = output_channel;\r
342                                                 result.push_back(tup);\r
343                                         }\r
344 //fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));\r
345                                         delete ((*flush_pos).first);\r
346                                         delete ((*flush_pos).second);\r
347 \r
348                                 }\r
349                         }\r
350                         min_wid++;\r
351 \r
352                 // remove the hashtable from group_tables\r
353                         hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];\r
354 \r
355                         table->clear();\r
356 //fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));\r
357                         delete (table);\r
358                         group_tables.erase(group_tables.begin());\r
359 \r
360                         if(group_tables.size()>0){\r
361                                 flush_pos = group_tables[0]->begin();\r
362                         }\r
363                 }\r
364 \r
365                 flush_finished = true;\r
366 \r
367 //fprintf(stderr, "end of flush_old \n");\r
368 \r
369                 return 0;\r
370         }\r
371 \r
372 \r
373         int set_param_block(int sz, void * value) {\r
374                 func.set_param_block(sz, value);\r
375                 return 0;\r
376         }\r
377 \r
378         int get_temp_status(host_tuple& result) {\r
379                 result.channel = output_channel;\r
380                 return func.create_temp_status_tuple(result, flush_finished);\r
381         }\r
382 \r
383         int get_blocked_status () {\r
384                 return -1;\r
385         }\r
386 \r
387         unsigned int get_mem_footprint() {\r
388                 unsigned int ret;\r
389                 unsigned int i;\r
390 \r
391                 for(i=0;i<group_tables.size();++i)\r
392                         ret += group_tables[i]->get_mem_footprint() ;\r
393 \r
394                 return ret;\r
395         }\r
396 };\r
397 \r
398 #endif  // GROUPBY_OPERATOR_OOP_H\r
399 \r