/** ------------------------------------------------
Copyright 2014 AT&T Intellectual Property
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 ------------------------------------------- */
16 #ifndef GROUPBY_SLOWFLUSH_OPERATOR_H
17 #define GROUPBY_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
22 #include "hash_table.h"
24 #define _HFTA_SLOW_FLUSH
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
29 class groupby_slowflush_operator : public base_operator {
32 hash_table<group, aggregate, hasher_func, equal_func> group_table[2];
34 unsigned int curr_table;
35 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
40 groupby_slowflush_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
41 flush_finished = true;
43 flush_pos = group_table[1-curr_table].end();
44 n_patterns = func.n_groupby_patterns();
45 gb_per_flush = func.gb_flush_per_tuple();
48 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
50 // Push out completed groups
51 if(!flush_finished) partial_flush(result);
53 // extract the key information from the tuple and
54 // copy it into buffer
56 if (!func.create_group(tup, (gs_sp_t)&grp)) {
57 if(func.disordered()){
58 // fprintf(stderr,"Out of order record in %s\n",op_name);
61 if (func.flush_needed()){
64 if (func.temp_status_received()) {
66 if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
67 temp_tup.channel = output_channel;
68 result.push_back(temp_tup);
74 if(func.disordered()){
75 // fprintf(stderr,"Out of order record in %s\n",op_name);
79 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
80 if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
81 // Temporal GBvar is part of the group so no flush is needed.
82 func.update_aggregate(tup, grp, (*iter).second);
84 if (func.flush_needed()) {
88 char aggr_buffer[sizeof(aggregate)];
89 // create an aggregate in preallocated buffer
90 func.create_aggregate(tup, aggr_buffer);
91 // neeed operator= doing a deep copy
92 group_table[curr_table].insert(grp, (*(aggregate*)aggr_buffer));
95 for(p=0;p<n_patterns;++p){
96 // TODO this code is wrong need to check each pattern to see if its in the table
97 // need shallow copy constructor for groups
98 group new_grp(grp, func.get_pattern(p));
99 char aggr_buffer[sizeof(aggregate)];
100 func.create_aggregate(tup, aggr_buffer);
101 // neeed operator= doing a deep copy
102 group_table[curr_table].insert(new_grp, (*(aggregate*)aggr_buffer));
110 int partial_flush(list<host_tuple>& result) {
112 unsigned int old_table = 1-curr_table;
115 // emit up to _GB_FLUSH_PER_TABLE_ output tuples.
116 if (!group_table[old_table].empty()) {
117 for (i=0; flush_pos != group_table[old_table].end() && i<gb_per_flush; ++flush_pos, ++i) {
119 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
121 tup.channel = output_channel;
122 result.push_back(tup);
124 // free((*flush_pos).second);
128 // Finalize processing if empty.
129 if(flush_pos == group_table[old_table].end()) {
130 flush_finished = true;
131 group_table[old_table].clear();
132 group_table[old_table].resize();
137 int flush(list<host_tuple>& result) {
139 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
140 unsigned int old_table = 1-curr_table;
142 // If the old table isn't empty, flush it now.
143 if (!group_table[old_table].empty()) {
144 for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
146 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
149 tup.channel = output_channel;
150 result.push_back(tup);
152 // free((*flush_pos).second);
154 group_table[old_table].clear();
155 group_table[old_table].resize();
158 flush_pos = group_table[curr_table].begin();
159 // If the table isn't empty, flush it now.
160 if (!group_table[curr_table].empty()) {
161 for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {
163 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
166 tup.channel = output_channel;
167 result.push_back(tup);
169 // free((*flush_pos).second);
171 group_table[curr_table].clear();
174 flush_finished = true;
179 int flush_old(list<host_tuple>& result) {
182 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
183 unsigned int old_table = 1-curr_table;
185 // If the old table isn't empty, flush it now.
186 if (!group_table[old_table].empty()) {
187 for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
189 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
192 tup.channel = output_channel;
193 result.push_back(tup);
195 // free((*flush_pos).second);
198 //group_table[old_table].clear();
199 //group_table[old_table].resize();
202 group_table[old_table].clear();
203 group_table[old_table].resize();
205 // swap tables, enable partial flush processing.
206 flush_pos = group_table[curr_table].begin();
207 curr_table = old_table;
208 flush_finished = false;
212 int set_param_block(int sz, void * value) {
213 func.set_param_block(sz, value);
217 int get_temp_status(host_tuple& result) {
218 result.channel = output_channel;
219 return func.create_temp_status_tuple(result, flush_finished);
222 int get_blocked_status () {
226 unsigned int get_mem_footprint() {
227 return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();
231 #endif // GROUPBY_OPERATOR_H