1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef GROUPBY_OPERATOR_OOP_H
17 #define GROUPBY_OPERATOR_OOP_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
23 #include "hash_table.h"
26 // TED: should be supplied by the groupby_func
// Maximum number of hash-table entries partial_flush() walks in a single call.
27 #define _GB_FLUSH_PER_TUPLE_ 1
29 /* max allowed disorder -- Jin */
30 // TED: should be supplied by the groupby_func
// NOTE(review): DISORDER_LEVEL does not appear to be referenced by the operator code in this header -- confirm it is still needed.
31 #define DISORDER_LEVEL 2
37 // ASSUME temporal_type is one of int, uint, llong, ullong
// Group-by operator that keeps one hash table of (group*, aggregate*) entries per
// temporal window. Tables live in group_tables and are addressed by
// (window id - min_wid).
//
// Template parameters:
//   groupby_func  - functor that builds groups and aggregates, creates output and
//                   temporal-status tuples, and answers flush-related queries
//   group         - key type; stored by pointer in the hash tables
//   aggregate     - aggregate state type; stored by pointer in the hash tables
//   hasher_func,
//   equal_func    - hashing / equality functors handed to hash_table
//   temporal_type - type of the window id (see the ASSUME note above)
39 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func, class temporal_type>
40 class groupby_operator_oop : public base_operator {
44 /* a list of hash tables, which maintains aggregates for current window and also k previous ones -- Jin */
45 vector<hash_table<group*, aggregate*, hasher_func, equal_func>* > group_tables;
47 /* the minimum and maximum window id of the hash tables -- Jin */
48 temporal_type min_wid, max_wid;
// Window id of the tuple being processed; accept_tuple() sets it from grp->get_curr_gb().
52 temporal_type curr_table;
// Resume position of the flush inside group_tables[0].
53 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
// Copies of func.get_last_flushed_gb() and func.get_last_gb(), refreshed in
// accept_tuple() when func.temp_status_received() is true.
55 temporal_type last_flushed_temporal_gb;
56 temporal_type last_temporal_gb;
// Constructs the operator: `name` is passed to base_operator and `schema_handle`
// to the group-by functor `func`.
63 groupby_operator_oop(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
// A new operator has no flush in progress.
64 flush_finished = true;
// Cache the number of group-by patterns reported by the functor; accept_tuple()
// loops over this many patterns when inserting pattern-specific groups.
69 n_patterns = func.n_groupby_patterns();
// Destructor: drains group_tables from the back, one hash table per iteration.
// NOTE(review): confirm that each popped table, and any group/aggregate objects
// still stored in it, is released before the loop moves on.
72 ~groupby_operator_oop() {
73 hash_table<group*, aggregate*, hasher_func, equal_func>* table;
74 // delete all the elements in the group_tables list;
75 while (!group_tables.empty()) {
// Take the last table and detach it from the list.
76 table = group_tables.back();
77 group_tables.pop_back();
79 //fprintf(stderr,"Deleting group table (c) at %lx\n",(gs_uint64_t)(table));
// Processes one input tuple.
//
// Parameters:
//   tup    - incoming tuple; handed to func to build the group key and to create
//            or update the aggregate.
//   result - output list; receives tuples produced by flushing and any
//            temporal-status tuple created here.
//
// Two paths are visible below: a temporal-status path (guarded by
// func.temp_status_received()) that refreshes the flush bookkeeping and may
// forward a status tuple, and a regular-tuple path that selects the hash table
// for the tuple's window and inserts or updates the group's aggregate.
85 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
88 // Push out completed groups
// Continue a flush that was started earlier before handling the new tuple.
89 if(!flush_finished) partial_flush(result);
91 // create buffer on the stack to store key object
92 char buffer[sizeof(group)];
94 // extract the key information from the tuple and
95 // copy it into buffer
96 group* grp = func.create_group(tup, buffer);
100 //printf("grp==NULL recieved ");
101 if (func.temp_status_received()) {
102 //printf("temp status record ");
// Refresh the flush bookkeeping from the functor.
103 last_flushed_temporal_gb = func.get_last_flushed_gb ();
104 last_temporal_gb = func.get_last_gb ();
108 //fprintf(stderr,"min_wid=%d, max_wid=%d, last_temporal_gb=%d, last_flushed_temporal_gb=%d, flush_finished=%d\n",min_wid, max_wid, last_temporal_gb, last_flushed_temporal_gb, flush_finished);
110 /* no data has arrived, and so we ignore the temp tuples -- Jin */
111 if (group_tables.size()>0) {
// index = position of the last flushed window relative to min_wid. The two
// assignments below cover the non-negative and the negative case separately so
// the subtraction is never performed with the larger operand on the right.
114 if(last_flushed_temporal_gb >= min_wid){
115 index = last_flushed_temporal_gb - min_wid;
117 index = -(min_wid - last_flushed_temporal_gb); // unsigned arithmetic
// Flush only when the functor asks for it and the flushed window is not older
// than the oldest table still held.
120 if (func.flush_needed() && index>=0) {
122 //fprintf(stderr, "flush needed: last_flushed_gb %u , min_wid %u \n", last_flushed_temporal_gb, min_wid);
124 // Init flush on first temp tuple -- Jin
125 if ( !flush_finished) {
127 //fprintf(stderr, "last_flushed_gb is %u, min_wid is %u \n", last_flushed_temporal_gb, min_wid);
// A window newer than the oldest table has been seen and tables exist:
// mark a flush as pending.
131 if (last_temporal_gb > min_wid && group_tables.size()>0) {
132 flush_finished = false;
135 // we start to flush from the head of the group tables -- Jin
136 if(group_tables.size()>0){
137 flush_pos = group_tables[0]->begin();
141 //fprintf(stderr, "after flush old \n");
// Forward a temporal-status tuple on this operator's output channel when
// create_temp_status_tuple() returns zero.
147 if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
148 temp_tup.channel = output_channel;
149 result.push_back(temp_tup);
156 //fprintf (stderr, "after create group grp=%lx, curr_table = %d\n",(gs_uint64_t)grp, grp->get_curr_gb());
158 /* This is a regular tuple -- Jin */
159 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
160 /* first, decide which hash table we need to work at */
161 curr_table = grp->get_curr_gb();
// Both bounds equal to zero is treated as "no table created yet": create the
// first table and anchor both bounds at this tuple's window.
// NOTE(review): if window id 0 is a legal value, this test is also true after a
// first tuple in window 0 and would append another table -- confirm.
162 if (max_wid == 0 && min_wid == 0) {
163 group_tables.push_back((new hash_table<group*, aggregate*, hasher_func, equal_func>()));
164 //fprintf(stderr,"Added (1) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());
165 max_wid = min_wid = curr_table;
// Tuple belongs to a window older than any table held: prepend one empty table
// per missing window id, then lower min_wid.
167 if (curr_table < min_wid) {
168 for (temporal_type i = curr_table; i < min_wid; i++){
169 group_tables.insert(group_tables.begin(), new hash_table<group*, aggregate*, hasher_func, equal_func>());
170 //fprintf(stderr,"Added (2) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());
172 min_wid = curr_table;
// Tuple belongs to a window newer than any table held: append one empty table
// per missing window id, then raise max_wid.
174 if (curr_table > max_wid) {
175 hash_table<group*, aggregate*, hasher_func, equal_func>* pt;
176 for (temporal_type i = max_wid; i < curr_table; i++) {
177 pt =new hash_table<group*, aggregate*, hasher_func, equal_func>();
178 group_tables.push_back(pt);
179 //fprintf(stderr,"Added (3) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());
182 max_wid = curr_table;
// Position of this tuple's window inside group_tables.
184 gs_int64_t index = curr_table - min_wid;
// Group already present in that window: fold the tuple into its aggregate.
186 if ((iter = group_tables[index]->find(grp)) != group_tables[index]->end()) {
187 aggregate* old_aggr = (*iter).second;
188 func.update_aggregate(tup, grp, old_aggr);
190 /* We only flush when a temp tuple is received, so we only check on temp tuple -- Jin */
191 // create a copy of the group on the heap
// The key built above lives in the stack buffer, so the table stores a heap copy.
194 group* new_grp = new group(grp); // need a copy constructor for groups
196 aggregate* aggr = new aggregate();
198 // create an aggregate in preallocated buffer
199 aggr = func.create_aggregate(tup, (char*)aggr);
201 // hash_table<group*, aggregate*, hasher_func, equal_func>* pt;
202 group_tables[index]->insert(new_grp, aggr);
// Pattern loop: for each group-by pattern, build a group from grp and that
// pattern, create its own aggregate, and insert the pair into the same table.
205 for(p=0;p<n_patterns;++p){
206 group* new_grp = new group(grp, func.get_pattern(p));
207 aggregate* aggr = new aggregate();
208 aggr = func.create_aggregate(tup, (char*)aggr);
209 group_tables[index]->insert(new_grp, aggr);
// Incremental flush of the oldest hash table (group_tables[0]).
//
// Each call walks at most _GB_FLUSH_PER_TUPLE_ entries starting at flush_pos,
// asks func for an output tuple per entry, appends it to `result` on
// output_channel, and deletes the entry's group and aggregate objects. When
// flush_pos reaches the end of the table, the table is cleared, deleted and
// removed from group_tables.
//
// Parameters:
//   result - output list that receives the emitted tuples.
217 int partial_flush(list<host_tuple>& result) {
219 /* the hash table we should flush is func->last_flushed_gb_0 -- Jin */
220 /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */
224 //fprintf(stderr, "partial_flush size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d \n", group_tables.size(), min_wid, max_wid, last_temporal_gb);
// Nothing to flush: mark the flush as complete.
225 if(group_tables.size()==0){
226 flush_finished = true;
227 //fprintf(stderr, "out of partial flush early \n");
231 // emit up to _GB_FLUSH_PER_TUPLE_ output tuples.
232 if (!group_tables[0]->empty()) {
233 for (i=0; flush_pos!=group_tables[0]->end() && i < _GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
236 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
238 tup.channel = output_channel;
239 result.push_back(tup);
241 //fprintf(stderr,"partial_flush Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));
// The entry's key and value are freed here while the entry itself stays in the
// table until the table is cleared below; flush_pos is advanced afterwards.
242 delete ((*flush_pos).first);
243 delete ((*flush_pos).second);
247 // Finalize processing if empty.
248 if (flush_pos == group_tables[0]->end()) {
249 /* one window is completely flushed, so recycle the hash table -- Jin */
// NOTE(review): `table` is not used by the statements that follow in this
// branch, which go through group_tables[0] directly -- confirm it is needed.
251 hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];
253 //fprintf(stderr,"partial_flush Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));
254 group_tables[0]->clear();
255 delete (group_tables[0]);
257 group_tables.erase(group_tables.begin());
// If a newer window has been seen and tables remain, restart at the new head
// table; flush_finished is set to true once no further table qualifies.
261 if (last_temporal_gb > min_wid && group_tables.size()>0) {
262 flush_pos = group_tables[0]->begin();
265 flush_finished = true;
268 //fprintf(stderr, "out of partial flush \n");
273 /* Where is this function called ??? */ /* externally */
// Full flush: repeatedly processes the head table until group_tables is empty.
// For every entry of the head table an output tuple is requested from func,
// appended to `result` on output_channel, and the entry's group and aggregate
// objects are deleted; the table is then removed from group_tables.
//
// Parameters:
//   result - output list that receives the emitted tuples.
274 int flush(list<host_tuple>& result) {
276 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
277 /* the hash table we should flush is func->last_flushed_gb_0 -- Jin */
278 /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */
279 while ( group_tables.size() > 0) {
280 if (!group_tables[0]->empty()) {
// Always start from the first entry of the head table, regardless of any
// position left behind by an earlier partial flush.
282 flush_pos = group_tables[0]->begin();
283 for (; flush_pos != group_tables[0]->end(); ++flush_pos) {
286 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
290 tup.channel = output_channel;
291 result.push_back(tup);
293 //fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));
294 delete ((*flush_pos).first);
295 delete ((*flush_pos).second);
301 // remove the hashtable from group_tables
302 hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];
305 //fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));
307 group_tables.erase(group_tables.begin());
// Point flush_pos at the new head table, if one is left.
309 if(group_tables.size()>0){
310 flush_pos = group_tables[0]->begin();
// Record that no flush is pending any more.
316 flush_finished = true;
321 /* flushes every hash table before last_flush_gb, and get ready to flush the next window -- Jin */
// Processes up to (last_temporal_gb - min_wid) tables from the head of
// group_tables. Each remaining entry of the head table is turned into an output
// tuple via func, appended to `result` on output_channel, and its group and
// aggregate objects are deleted; the table is then removed from group_tables.
//
// Parameters:
//   result - output list that receives the emitted tuples.
322 int flush_old(list<host_tuple>& result) {
324 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
327 //fprintf(stderr, "flush_old size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d, num=%d\n", group_tables.size(), min_wid, max_wid, last_temporal_gb, num);
// Number of head tables to process in this call.
329 num = last_temporal_gb - min_wid;
331 //If the old table isn't empty, flush it now.
332 for (i = 0; i < num && group_tables.size() > 0; i++) {
333 if (!group_tables[0]->empty()) {
// The loop has no initializer: it resumes from the current flush_pos, so entries
// already handled by an earlier partial flush of this table are not revisited.
334 for (; flush_pos != group_tables[0]->end(); ++flush_pos) {
337 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
341 tup.channel = output_channel;
342 result.push_back(tup);
344 //fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));
345 delete ((*flush_pos).first);
346 delete ((*flush_pos).second);
352 // remove the hashtable from group_tables
353 hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];
356 //fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));
358 group_tables.erase(group_tables.begin());
// Start the next table (if any) from its first entry.
360 if(group_tables.size()>0){
361 flush_pos = group_tables[0]->begin();
// Record that no flush is pending any more.
365 flush_finished = true;
367 //fprintf(stderr, "end of flush_old \n");
// Hands a parameter block to the group-by functor.
//   sz    - size argument forwarded unchanged to func.set_param_block
//   value - pointer argument forwarded unchanged to func.set_param_block
373 int set_param_block(int sz, void * value) {
374 func.set_param_block(sz, value);
// Fills `result` with a temporal-status tuple for this operator.
// The tuple is tagged with this operator's output channel, then populated by
// func.create_temp_status_tuple(), which also receives the current
// flush_finished flag. Returns whatever that call returns.
378 int get_temp_status(host_tuple& result) {
379 result.channel = output_channel;
380 return func.create_temp_status_tuple(result, flush_finished);
// Blocked-status query for this operator; takes no arguments and returns an int.
383 int get_blocked_status () {
// Memory footprint of the operator's hash tables: adds up get_mem_footprint()
// of every table currently held in group_tables.
387 unsigned int get_mem_footprint() {
391 for(i=0;i<group_tables.size();++i)
392 ret += group_tables[i]->get_mem_footprint() ;
398 #endif // GROUPBY_OPERATOR_OOP_H