1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef GROUPBY_OPERATOR_H
17 #define GROUPBY_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
22 #include "hash_table.h"
// Maximum number of old-table entries emitted by each partial_flush() call.
// partial_flush() runs at the start of accept_tuple() while a flush is still
// pending, so this bounds the extra output produced per input tuple.
// NOTE(review): identifiers beginning with an underscore followed by an
// uppercase letter are reserved for the implementation in C++.
18 #define _GB_FLUSH_PER_TUPLE_ 1
// groupby_operator: hash-table based group-by / aggregation operator.
//
// Template parameters (as used by the visible code):
//   groupby_func - per-query helper; this class calls its create_group,
//                  temp_status_received, flush_needed, create_temp_status_tuple,
//                  update_aggregate, create_aggregate, n_groupby_patterns,
//                  get_pattern, create_output_tuple and set_param_block members.
//   group        - group key type; table keys are heap-allocated group objects
//                  (created with new, released with delete).
//   aggregate    - aggregate state type; table values are heap-allocated
//                  aggregate objects (created with new, released with delete).
//   hasher_func, equal_func - hashing / equality functors for the group tables.
//
// Two tables are kept. group_table[curr_table] receives new groups.
// group_table[1-curr_table] (the "old" table) is drained into output tuples.
// Draining happens either all at once (flush, flush_old) or a few entries per
// incoming tuple (partial_flush).
//
// NOTE(review): this copy of the header is missing many lines (see the gaps in
// the leading line numbers). Among them are the declarations of func,
// flush_finished and n_patterns and most closing braces. The comments here
// describe only what the visible lines show.
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
29 class groupby_operator : public base_operator {
// The two group tables: [curr_table] accumulates, [1-curr_table] is drained.
32 hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];
// Index (0 or 1) of the table currently accepting new groups.
34 unsigned int curr_table;
// Drain cursor. It walks the old table in partial_flush/flush_old and the
// current table in the second half of flush().
35 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
// Construct the operator; func is built from schema_handle.
// Starts with no flush pending and flush_pos at the end of the old table.
// NOTE(review): the next lines read curr_table; confirm it is initialised
// before that (its assignment is not visible here).
41 groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
42 flush_finished = true;
44 flush_pos = group_table[1-curr_table].end();
45 n_patterns = func.n_groupby_patterns();
// Process one input tuple, appending any output tuples to `result`.
// Visible steps:
//   1. If an earlier flush is pending, emit a few old-table groups
//      (partial_flush).
//   2. Build the group key for `tup` in a stack buffer.
//   3. On a hit in the current table, update that group's aggregate.
//      Otherwise insert new heap-allocated group/aggregate entries.
48 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
50 // Push out completed groups
52 if(!flush_finished) partial_flush(result);
54 // create buffer on the stack to store key object
// NOTE(review): a plain char array is not guaranteed to be suitably aligned
// for `group`. If create_group constructs a group in place here, consider
// aligned storage (e.g. alignas(group)). Confirm how create_group uses it.
55 char buffer[sizeof(group)];
57 // extract the key information from the tuple and
58 // copy it into buffer
59 group* grp = func.create_group(tup, buffer);
62 // Ignore temp tuples until we can fix their timestamps.
// NOTE(review): the body of this check is not visible here. Confirm whether
// it is active code or part of a commented-out section.
63 if (func.temp_status_received()) {
67 if (func.flush_needed()){
// When a temporal-status tuple was received, forward one downstream on
// output_channel. The tuple is pushed only when create_temp_status_tuple
// returns 0/false, presumably its success code (verify against groupby_func).
70 if (func.temp_status_received()) {
72 if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
73 temp_tup.channel = output_channel;
74 result.push_back(temp_tup);
// Look the key up in the current table. On a hit, fold tup into the
// existing aggregate.
81 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
82 if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
83 // Temporal GBvar is part of the group so no flush is needed.
84 aggregate* old_aggr = (*iter).second;
85 func.update_aggregate(tup, grp, old_aggr);
// Insertion path: a func.flush_needed() check precedes creating new entries.
87 if (func.flush_needed()) {
// Single-entry insert: heap copies of the key plus a fresh aggregate.
// Presumably this is the single-pattern case; the selecting condition is not
// visible here.
91 // create a copy of the group on the heap
92 group* new_grp = new group(grp); // need a copy constructor for groups
93 // aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
94 aggregate* aggr = new aggregate();
95 // create an aggregate in preallocated buffer
// NOTE(review): `aggr` is overwritten with create_aggregate's return value.
// If that ever differs from the buffer passed in, the object from
// `new aggregate()` leaks. The later `delete` of the stored value would then
// free memory not obtained from new. Confirm create_aggregate returns its
// buffer argument.
96 aggr = func.create_aggregate(tup, (char*)aggr);
98 group_table[curr_table].insert(new_grp, aggr);
// Multi-pattern insert: one (group, aggregate) entry per group-by pattern
// returned by func.get_pattern(p), for p in [0, n_patterns).
101 for(p=0;p<n_patterns;++p){
102 group* new_grp = new group(grp, func.get_pattern(p));
103 aggregate* aggr = new aggregate();
104 aggr = func.create_aggregate(tup, (char*)aggr);
105 group_table[curr_table].insert(new_grp, aggr);
// Incrementally drain the old table.
// - Emits at most _GB_FLUSH_PER_TUPLE_ entries starting at flush_pos into
//   `result`, tagged with output_channel.
// - Deletes each visited group and aggregate.
// - Once flush_pos reaches the end, marks the flush finished and clears and
//   rehashes the old table so it can be reused.
// NOTE(review): visited entries are deleted but not erased, so those slots
// hold dangling pointers until clear(). The declarations of tup, i and
// failed, and any check of `failed` before emitting, are not visible here.
113 int partial_flush(list<host_tuple>& result) {
115 unsigned int old_table = 1-curr_table;
118 // emit up to _GB_FLUSH_PER_TUPLE_ output tuples.
119 if (!group_table[old_table].empty()) {
120 for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
122 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
124 tup.channel = output_channel;
125 result.push_back(tup);
127 delete ((*flush_pos).first);
128 delete ((*flush_pos).second);
129 // free((*flush_pos).second);
133 // Finalize processing if empty.
134 if(flush_pos == group_table[old_table].end()) {
135 flush_finished = true;
136 group_table[old_table].clear();
137 group_table[old_table].rehash();
// Emit every remaining group.
// - First the rest of the old table, from flush_pos.
// - Then the whole current table.
// Each emitted group and aggregate is deleted, both tables are cleared, and
// flush_finished is set.
// NOTE(review): `iter` is unused. The current table is cleared but not
// rehashed, unlike the old table. flush_pos is left pointing into the
// cleared current table.
142 int flush(list<host_tuple>& result) {
144 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
145 unsigned int old_table = 1-curr_table;
147 // If the old table isn't empty, flush it now.
148 if (!group_table[old_table].empty()) {
149 for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
151 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
154 tup.channel = output_channel;
155 result.push_back(tup);
157 delete ((*flush_pos).first);
158 delete ((*flush_pos).second);
159 // free((*flush_pos).second);
161 group_table[old_table].clear();
162 group_table[old_table].rehash();
165 flush_pos = group_table[curr_table].begin();
166 // If the current table isn't empty, flush it now.
167 if (!group_table[curr_table].empty()) {
168 for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {
170 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
173 tup.channel = output_channel;
174 result.push_back(tup);
176 delete ((*flush_pos).first);
177 delete ((*flush_pos).second);
178 // free((*flush_pos).second);
180 group_table[curr_table].clear();
183 flush_finished = true;
// Finish draining the old table synchronously, then swap roles.
// - The current table becomes the old table, to be drained by later
//   partial_flush() calls (flush_pos = its begin(), flush_finished = false).
// - The just-emptied table becomes current.
// NOTE(review): `iter` is unused.
188 int flush_old(list<host_tuple>& result) {
190 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
191 unsigned int old_table = 1-curr_table;
193 // If the old table isn't empty, flush it now.
194 if (!group_table[old_table].empty()) {
195 for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
197 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
200 tup.channel = output_channel;
201 result.push_back(tup);
203 delete ((*flush_pos).first);
204 delete ((*flush_pos).second);
205 // free((*flush_pos).second);
207 group_table[old_table].clear();
208 group_table[old_table].rehash();
211 // swap tables, enable partial flush processing.
212 flush_pos = group_table[curr_table].begin();
213 curr_table = old_table;
214 flush_finished = false;
// Forward the parameter block (sz, value) to func. The return statement is
// not visible here.
219 int set_param_block(int sz, void * value) {
220 func.set_param_block(sz, value);
// Build a temporal-status tuple into `result`, tagged with output_channel.
// Passes the current flush_finished state and returns
// func.create_temp_status_tuple's result.
224 int get_temp_status(host_tuple& result) {
225 result.channel = output_channel;
226 return func.create_temp_status_tuple(result, flush_finished);
// NOTE(review): body not visible here.
229 int get_blocked_status () {
// Combined memory footprint reported by both group tables.
233 unsigned int get_mem_footprint() {
234 return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();
238 #endif // GROUPBY_OPERATOR_H