cc5ab36191e1ddb9847573d33d720d20dfb2938e
[com/gs-lite.git] / include / hfta / groupby_slowflush_operator.h
1 /** ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef GROUPBY_SLOWFLUSH_OPERATOR_H
17 #define GROUPBY_OPERATOR_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 #include "hash_table.h"
23
24 #define _HFTA_SLOW_FLUSH
25
26 using namespace std;
27
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
29 class groupby_slowflush_operator : public base_operator {
30 private :
31         groupby_func func;
32         hash_table<group, aggregate, hasher_func, equal_func> group_table[2];
33         bool flush_finished;
34         unsigned int curr_table;
35         typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
36         int n_patterns;
37         int gb_per_flush;
38
39 public:
40         groupby_slowflush_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
41                 flush_finished = true;
42                 curr_table = 0;
43                 flush_pos = group_table[1-curr_table].end();
44                 n_patterns = func.n_groupby_patterns();
45                 gb_per_flush = func.gb_flush_per_tuple();
46         }
47
48         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
49
50 //                      Push out completed groups
51                 if(!flush_finished) partial_flush(result);
52
53                 // extract the key information from the tuple and
54                 // copy it into buffer
55                 group grp;
56                 if (!func.create_group(tup, (gs_sp_t)&grp)) {
57                         if(func.disordered()){
58                                 fprintf(stderr,"Out of order record in %s\n",op_name);
59                                 return 0;
60                         }
61                         if (func.flush_needed()){
62                                 flush_old(result);
63                         }
64                         if (func.temp_status_received()) {
65                                 host_tuple temp_tup;
66                                 if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
67                                         temp_tup.channel = output_channel;
68                                         result.push_back(temp_tup);
69                                 }
70                         }
71                         tup.free_tuple();
72                         return 0;
73                 }
74                 if(func.disordered()){
75                         fprintf(stderr,"Out of order record in %s\n",op_name);
76                         return 0;
77                 }
78
79                 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
80                 if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {
81 //                              Temporal GBvar is part of the group so no flush is needed.
82                         func.update_aggregate(tup, grp, (*iter).second);
83                 }else{
84                         if (func.flush_needed()) {
85                                 flush_old(result);
86                         }
87                         if(n_patterns <= 1){
88                                 char aggr_buffer[sizeof(aggregate)];
89                                 // create an aggregate in preallocated buffer
90                                 func.create_aggregate(tup, aggr_buffer);
91                                 // neeed operator= doing a deep copy
92                                 group_table[curr_table].insert(grp, (*(aggregate*)aggr_buffer));
93                         }else{
94                                 int p;
95                                 for(p=0;p<n_patterns;++p){
96 // TODO this code is wrong need to check each pattern to see if its in the table
97                                         // need shallow copy constructor for groups
98                                         group new_grp(grp, func.get_pattern(p));
99                                         char aggr_buffer[sizeof(aggregate)];
100                                         func.create_aggregate(tup, aggr_buffer);
101                                         // neeed operator= doing a deep copy
102                                         group_table[curr_table].insert(new_grp, (*(aggregate*)aggr_buffer));
103                                 }
104                         }
105                 }
106                 tup.free_tuple();
107                 return 0;
108         }
109
110         int partial_flush(list<host_tuple>& result) {
111                 host_tuple tup;
112                 unsigned int old_table = 1-curr_table;
113                 unsigned int i;
114
115 //                              emit up to _GB_FLUSH_PER_TABLE_ output tuples.
116                 if (!group_table[old_table].empty()) {
117                         for (i=0; flush_pos != group_table[old_table].end() && i<gb_per_flush; ++flush_pos, ++i) {
118                                 bool failed = false;
119                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
120                                 if (!failed) {
121                                         tup.channel = output_channel;
122                                         result.push_back(tup);
123                                 }
124 //                              free((*flush_pos).second);
125                         }
126                 }
127
128 //                      Finalize processing if empty.
129                 if(flush_pos == group_table[old_table].end()) {
130                         flush_finished = true;
131                         group_table[old_table].clear();
132                         group_table[old_table].resize();
133                 }
134                 return 0;
135         }
136
137         int flush(list<host_tuple>& result) {
138                 host_tuple tup;
139                 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
140                 unsigned int old_table = 1-curr_table;
141
142 //                      If the old table isn't empty, flush it now.
143                 if (!group_table[old_table].empty()) {
144                         for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
145                                 bool failed = false;
146                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
147                                 if (!failed) {
148
149                                         tup.channel = output_channel;
150                                         result.push_back(tup);
151                                 }
152 //                              free((*flush_pos).second);
153                         }
154                         group_table[old_table].clear();
155                         group_table[old_table].resize();
156                 }
157
158                 flush_pos = group_table[curr_table].begin();
159 //                      If the table isn't empty, flush it now.
160                 if (!group_table[curr_table].empty()) {
161                         for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {
162                                 bool failed = false;
163                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
164                                 if (!failed) {
165
166                                         tup.channel = output_channel;
167                                         result.push_back(tup);
168                                 }
169 //                              free((*flush_pos).second);
170                         }
171                         group_table[curr_table].clear();
172                 }
173
174                 flush_finished = true;
175
176                 return 0;
177         }
178
179         int flush_old(list<host_tuple>& result) {
180
181                 host_tuple tup;
182                 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
183                 unsigned int old_table = 1-curr_table;
184
185 //                      If the old table isn't empty, flush it now.
186                 if (!group_table[old_table].empty()) {
187                         for (; flush_pos != group_table[old_table].end(); ++flush_pos) {
188                                 bool failed = false;
189                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
190                                 if (!failed) {
191
192                                         tup.channel = output_channel;
193                                         result.push_back(tup);
194                                 }
195 //                              free((*flush_pos).second);
196                         }
197
198                         //group_table[old_table].clear();
199                         //group_table[old_table].resize();
200                 }
201
202                 group_table[old_table].clear();
203                 group_table[old_table].resize();
204
205 //                      swap tables, enable partial flush processing.
206                 flush_pos = group_table[curr_table].begin();
207                 curr_table = old_table;
208                 flush_finished = false;
209                 return 0;
210         }
211
212         int set_param_block(int sz, void * value) {
213                 func.set_param_block(sz, value);
214                 return 0;
215         }
216
217         int get_temp_status(host_tuple& result) {
218                 result.channel = output_channel;
219                 return func.create_temp_status_tuple(result, flush_finished);
220         }
221
222         int get_blocked_status () {
223                 return -1;
224         }
225
226         unsigned int get_mem_footprint() {
227                 return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();
228         }
229 };
230
#endif  // GROUPBY_SLOWFLUSH_OPERATOR_H