1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef RWGROUPBY_OPERATOR_H
17 #define RWGROUPBY_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
22 #include "hash_table.h"
// Compile-time flag defined as 1. It is not referenced anywhere in the visible
// part of this header; presumably it is consumed by other code - TODO confirm.
// NOTE(review): identifiers that begin with an underscore followed by an
// uppercase letter are reserved to the implementation in C++.
24 #define _GB_FLUSH_PER_TUPLE_ 1
// Aggregation operator derived from base_operator. It keeps one aggregate
// object per group key in a hash table and delegates all tuple-specific work
// to a `func` member (constructed from the schema handle in the constructor).
//
// Template parameters:
//   groupby_func - presumably the type of the `func` member the methods call
//                  into (create_group, update_aggregate, create_output_tuple,
//                  ...); its declaration is not visible here - TODO confirm.
//   group        - key type; the table stores pointers to it.
//   aggregate    - per-group state type; the table stores pointers to it.
//   hasher_func,
//   equal_func   - forwarded to hash_table as its third and fourth template
//                  arguments.
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
29 class running_agg_operator : public base_operator {
// Maps each group key (group*) to its aggregate state (aggregate*).
32 hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
// Iterator into group_table. The constructor sets it to end(), and flush()
// uses it as its loop cursor.
33 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
// Constructs the operator.
//   schema_handle - passed unchanged to the constructor of the `func` member.
//   name          - passed unchanged to the base_operator constructor.
38 running_agg_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
// Start with the flush cursor at end(), i.e. not pointing at any entry.
39 flush_pos = group_table.end();
// Handles one input tuple: builds its group key with func.create_group(),
// looks that key up in group_table, updates the stored aggregate on a hit, and
// contains code that heap-allocates a new (group, aggregate) pair and inserts
// it into the table.
//   tup    - incoming tuple; handed to func for key extraction and aggregation.
//   result - output list; a temp-status tuple is appended to it on the status
//            path below.
// NOTE(review): the return statement of this function is not visible here.
42 virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
44 // Push out completed groups
46 // create buffer on the stack to store key object
47 char buffer[sizeof(group)];
49 // extract the key information from the tuple and
50 // copy it into buffer
51 group* grp = func.create_group(tup, buffer);
// NOTE(review): the block comment opened on the next line has no visible
// terminator in this text; confirm where the disabled code actually ends.
52 /*// Ignore temp tuples until we can fix their timestamps.
53 if (func.temp_status_received()) {
60 if (func.flush_needed()){
63 if (func.temp_status_received()) {
// A zero (false) return from create_temp_status_tuple is the case in which the
// status tuple is tagged with this operator's output channel and queued.
65 if (!func.create_temp_status_tuple(temp_tup, true)) {
66 temp_tup.channel = output_channel;
67 result.push_back(temp_tup);
74 if (func.flush_needed()) {
77 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
// Key already present in the table: fold this tuple into its existing aggregate.
78 if ((iter = group_table.find(grp)) != group_table.end()) {
79 aggregate* old_aggr = (*iter).second;
80 func.update_aggregate(tup, grp, old_aggr);
// NOTE(review): the line separating the two branches is not visible here;
// presumably everything below runs only when the lookup above fails.
82 // create a copy of the group on the heap
// The group is constructed from the pointer grp, so group must provide a
// constructor that accepts a group*.
83 group* new_grp = new group(grp); // need a copy constructor for groups
84 // aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
85 aggregate* aggr = new aggregate();
86 // create an aggregate in preallocated buffer
// The freshly allocated object is handed to func as raw storage, and the
// pointer func returns replaces aggr.
87 aggr = func.create_aggregate(tup, (char*)aggr);
// The table now holds the heap-allocated key and aggregate pointers.
89 group_table.insert(new_grp, aggr);
// Walks group_table and, for each entry, asks func to build an output tuple
// which is tagged with output_channel and appended to result. Entries for
// which func.cleaning_when() returns true are erased from the table; the code
// also calls func.reinit_aggregates() on entries.
//   result - list that receives the produced output tuples.
// NOTE(review): the declarations of `tup` and `failed` and the return
// statement are not visible here.
95 virtual int flush(list<host_tuple>& result) {
// NOTE(review): iter is not used in any line of this function visible here.
97 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
// NOTE(review): group_table is the only table visible in this text;
// presumably "old table" below refers to it.
98 // If the old table isn't empty, flush it now.
// The loop header has no increment expression, so flush_pos has to be advanced
// inside the body; the statements doing so are not visible here.
99 for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
// Build the output tuple from the entry's key and aggregate. `failed` is
// passed to the call; any check of it afterwards is not visible here.
101 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
103 tup.channel = output_channel;
104 result.push_back(tup);
// cleaning_when() returning true selects removal of this entry.
106 if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
// Save both pointers before the entry is removed from the table.
107 group* g = (*flush_pos).first;
108 aggregate* a = (*flush_pos).second;
// Erase by key. NOTE(review): confirm that flush_pos is moved off this entry
// before the erase and that g and a are released afterwards; neither step is
// visible here.
110 group_table.erase(g);
// Presumably the branch for entries that are kept (the else is not visible
// here): func resets the aggregate for this group.
114 func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
// Forwards a parameter block to func.
//   sz    - passed through to func.set_param_block as its first argument
//           (presumably the size of the block - TODO confirm).
//   value - pointer passed through as the second argument.
// NOTE(review): the return statement is not visible here.
122 virtual int set_param_block(int sz, void * value) {
123 func.set_param_block(sz, value);
// Fills `result` with a temp-status tuple.
//   result - tuple to populate; its channel is set to output_channel first.
// Returns whatever func.create_temp_status_tuple(result, true) returns.
127 virtual int get_temp_status(host_tuple& result) {
128 result.channel = output_channel;
129 return func.create_temp_status_tuple(result, true);
// Blocked-status query taking no arguments and returning int.
// NOTE(review): no body or closing brace follows this line in the reviewed
// text, so the returned value cannot be determined from here.
132 virtual int get_blocked_status () {
// Returns the value of group_table.get_mem_footprint(). Only the hash table's
// own figure is returned; nothing else is added to it here.
136 unsigned int get_mem_footprint() {
137 return group_table.get_mem_footprint();
141 #endif // RWGROUPBY_OPERATOR_H