Added quantiling UDAFs
[com/gs-lite.git] / include / hfta / groupby_operator.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #ifndef GROUPBY_OPERATOR_H\r
17 #define GROUPBY_OPERATOR_H\r
18 \r
19 #include "host_tuple.h"\r
20 #include "base_operator.h"\r
21 #include <list>\r
22 #include "hash_table.h"\r
23 \r
24 #define _GB_FLUSH_PER_TUPLE_ 1\r
25 \r
26 using namespace std;\r
27 \r
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>\r
29 class groupby_operator : public base_operator {\r
30 private :\r
31         groupby_func func;\r
32         hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];\r
33         bool flush_finished;\r
34         unsigned int curr_table;\r
35         typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;\r
36         int n_patterns;\r
37 \r
38 \r
39 \r
40 public:\r
41         groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {\r
42                 flush_finished = true;\r
43                 curr_table = 0;\r
44                 flush_pos = group_table[1-curr_table].end();\r
45                 n_patterns = func.n_groupby_patterns();\r
46         }\r
47 \r
48         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
49 \r
50 //                      Push out completed groups\r
51 \r
52                 if(!flush_finished) partial_flush(result);\r
53 \r
54                 // create buffer on the stack to store key object\r
55                 char buffer[sizeof(group)];\r
56 \r
57                 // extract the key information from the tuple and\r
58                 // copy it into buffer\r
59                 group* grp = func.create_group(tup, buffer);\r
60                 if (!grp) {\r
61 /*\r
62 //                      Ignore temp tuples until we can fix their timestamps.\r
63 if (func.temp_status_received()) {\r
64  tup.free_tuple();\r
65  return 0;\r
66 }*/\r
67                         if (func.flush_needed()){\r
68                                 flush_old(result);\r
69                 }\r
70                         if (func.temp_status_received()) {\r
71                                 host_tuple temp_tup;\r
72                                 if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {\r
73                                         temp_tup.channel = output_channel;\r
74                                         result.push_back(temp_tup);\r
75                                 }\r
76                         }\r
77                         tup.free_tuple();\r
78                         return 0;\r
79                 }\r
80 \r
81                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
82                 if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {\r
83 //                              Temporal GBvar is part of the group so no flush is needed.\r
84                         aggregate* old_aggr = (*iter).second;\r
85                         func.update_aggregate(tup, grp, old_aggr);\r
86                 }else{\r
87                         if (func.flush_needed()) {\r
88                                 flush_old(result);\r
89                         }\r
90                         if(n_patterns <= 1){\r
91                         // create a copy of the group on the heap\r
92                                 group* new_grp = new group(grp);        // need a copy constructor for groups\r
93 //                      aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));\r
94                                 aggregate* aggr = new aggregate();\r
95                         // create an aggregate in preallocated buffer\r
96                                 aggr = func.create_aggregate(tup, (char*)aggr);\r
97 \r
98                                 group_table[curr_table].insert(new_grp, aggr);\r
99                         }else{\r
100                                 int p;\r
101                                 for(p=0;p<n_patterns;++p){\r
102                                         group* new_grp = new group(grp, func.get_pattern(p));\r
103                                         aggregate* aggr = new aggregate();\r
104                                         aggr = func.create_aggregate(tup, (char*)aggr);\r
105                                         group_table[curr_table].insert(new_grp, aggr);\r
106                                 }\r
107                         }\r
108                 }\r
109                 tup.free_tuple();\r
110                 return 0;\r
111         }\r
112 \r
113         int partial_flush(list<host_tuple>& result) {\r
114                 host_tuple tup;\r
115                 unsigned int old_table = 1-curr_table;\r
116                 unsigned int i;\r
117 \r
118 //                              emit up to _GB_FLUSH_PER_TABLE_ output tuples.\r
119                 if (!group_table[old_table].empty()) {\r
120                         for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {\r
121                                 bool failed = false;\r
122                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
123                                 if (!failed) {\r
124                                         tup.channel = output_channel;\r
125                                         result.push_back(tup);\r
126                                 }\r
127                                 delete ((*flush_pos).first);\r
128                                 delete ((*flush_pos).second);\r
129 //                              free((*flush_pos).second);\r
130                         }\r
131                 }\r
132 \r
133 //                      Finalize processing if empty.\r
134                 if(flush_pos == group_table[old_table].end()) {\r
135                         flush_finished = true;\r
136                         group_table[old_table].clear();\r
137                         group_table[old_table].rehash();\r
138                 }\r
139                 return 0;\r
140         }\r
141 \r
142         int flush(list<host_tuple>& result) {\r
143                 host_tuple tup;\r
144                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
145                 unsigned int old_table = 1-curr_table;\r
146 \r
147 //                      If the old table isn't empty, flush it now.\r
148                 if (!group_table[old_table].empty()) {\r
149                         for (; flush_pos != group_table[old_table].end(); ++flush_pos) {\r
150                                 bool failed = false;\r
151                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
152                                 if (!failed) {\r
153 \r
154                                         tup.channel = output_channel;\r
155                                         result.push_back(tup);\r
156                                 }\r
157                                 delete ((*flush_pos).first);\r
158                                 delete ((*flush_pos).second);\r
159 //                              free((*flush_pos).second);\r
160                         }\r
161                         group_table[old_table].clear();\r
162                         group_table[old_table].rehash();\r
163                 }\r
164 \r
165                 flush_pos = group_table[curr_table].begin();\r
166 //                      If the old table isn't empty, flush it now.\r
167                 if (!group_table[curr_table].empty()) {\r
168                         for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {\r
169                                 bool failed = false;\r
170                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
171                                 if (!failed) {\r
172 \r
173                                         tup.channel = output_channel;\r
174                                         result.push_back(tup);\r
175                                 }\r
176                                 delete ((*flush_pos).first);\r
177                                 delete ((*flush_pos).second);\r
178 //                              free((*flush_pos).second);\r
179                         }\r
180                         group_table[curr_table].clear();\r
181                 }\r
182 \r
183                 flush_finished = true;\r
184 \r
185                 return 0;\r
186         }\r
187 \r
188         int flush_old(list<host_tuple>& result) {\r
189                 host_tuple tup;\r
190                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
191                 unsigned int old_table = 1-curr_table;\r
192 \r
193 //                      If the old table isn't empty, flush it now.\r
194                 if (!group_table[old_table].empty()) {\r
195                         for (; flush_pos != group_table[old_table].end(); ++flush_pos) {\r
196                                 bool failed = false;\r
197                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
198                                 if (!failed) {\r
199 \r
200                                         tup.channel = output_channel;\r
201                                         result.push_back(tup);\r
202                                 }\r
203                                 delete ((*flush_pos).first);\r
204                                 delete ((*flush_pos).second);\r
205 //                              free((*flush_pos).second);\r
206                         }\r
207                         group_table[old_table].clear();\r
208                         group_table[old_table].rehash();\r
209                 }\r
210 \r
211 //                      swap tables, enable partial flush processing.\r
212                 flush_pos = group_table[curr_table].begin();\r
213                 curr_table = old_table;\r
214                 flush_finished = false;\r
215 \r
216                 return 0;\r
217         }\r
218 \r
219         int set_param_block(int sz, void * value) {\r
220                 func.set_param_block(sz, value);\r
221                 return 0;\r
222         }\r
223 \r
224         int get_temp_status(host_tuple& result) {\r
225                 result.channel = output_channel;\r
226                 return func.create_temp_status_tuple(result, flush_finished);\r
227         }\r
228 \r
229         int get_blocked_status () {\r
230                 return -1;\r
231         }\r
232 \r
233         unsigned int get_mem_footprint() {\r
234                 return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();\r
235         }\r
236 };\r
237 \r
238 #endif  // GROUPBY_OPERATOR_H\r