1 /** ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef GROUPBY_OPERATOR_H
17 #define GROUPBY_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
22 #include "hash_table.h"
// groupby_operator: a base_operator that groups incoming tuples and keeps one
// aggregate per group in a hash table (group_table).
//   groupby_func            - object (held in the `func` member) that builds
//                             groups/aggregates/output tuples and answers the
//                             flush / temporal-status / disorder queries used below
//   group                   - hash-table key type
//   aggregate               - hash-table value type
//   hasher_func, equal_func - hash and equality functors handed to hash_table
// NOTE(review): this listing omits many lines of the original header (the
// declarations of func, grp, aggr, p, n_patterns and flush_finished, access
// specifiers, closing braces and return statements). The comments below
// describe only the statements that are visible here.
27 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
28 class groupby_operator : public base_operator {
// Groups seen so far, each mapped to its running aggregate.
31 hash_table<group, aggregate, hasher_func, equal_func> group_table;
// Iterator that flush() uses to walk group_table while emitting output tuples.
33 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
// Passes `name` to base_operator and `schema_handle` to the groupby function
// object. Starts with flush_finished set, and caches the number of group-by
// patterns reported by func (used by the pattern loop in accept_tuple).
36 groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
37 flush_finished = true;
38 n_patterns = func.n_groupby_patterns();
// Processes one input tuple `tup`. Any tuples it produces (temporal-status
// tuples, and possibly flushed groups) are appended to `result`.
// NOTE(review): the return statement(s) are not visible in this listing.
41 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
43 // Push out completed groups
45 // extract the key information from the tuple and
46 // copy it into buffer
// create_group writes the key into grp through a raw pointer. A zero return
// is handled on this separate path; presumably the tuple yields no group
// (e.g. a temporal-only tuple). Confirm against groupby_func.
48 if (!func.create_group(tup, (gs_sp_t)&grp)) {
49 if(func.disordered()){
50 fprintf(stderr,"Out of order record in %s\n",op_name);
// NOTE(review): the body of this flush_needed() branch is not visible here.
53 if (func.flush_needed()){
// When func reports that a temporal-status marker was received, build a
// status tuple (passing the current flush_finished flag). It is tagged with
// output_channel and pushed only when create_temp_status_tuple returns 0.
56 if (func.temp_status_received()) {
58 if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
59 temp_tup.channel = output_channel;
60 result.push_back(temp_tup);
// Same out-of-order diagnostic, on the path that follows the create_group
// check (exact brace nesting is not visible in this listing).
66 if(func.disordered()){
67 fprintf(stderr,"Out of order record in %s\n",op_name);
71 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
// Existing group: fold this tuple into the stored aggregate in place.
72 if ((iter = group_table.find(grp)) != group_table.end()) {
73 // Temporal GBvar is part of the group so no flush is needed.
74 func.update_aggregate(tup, grp, (*iter).second);
// New group (else-branch inferred; its braces are not visible). Check
// whether a flush is needed first (body not visible), then create a fresh
// aggregate for grp and insert it.
76 if (func.flush_needed()) {
81 // create an aggregate in preallocated buffer
82 func.create_aggregate(tup, (char*)&aggr);
83 // need operator= doing a deep copy
84 group_table.insert(grp, aggr);
// For each group-by pattern p, insert a derived group (grp combined with
// func.get_pattern(p)) that gets its own freshly created aggregate.
87 // TODO this code is wrong, must check if each pattern is in the group table.
// NOTE(review): per the TODO, new_grp is inserted without a prior find().
// An existing pattern group is never passed to update_aggregate, and whether
// insert() overwrites or duplicates it depends on hash_table. Confirm.
88 for(p=0;p<n_patterns;++p){
89 // need shallow copy constructor for groups
90 group new_grp(grp, func.get_pattern(p));
92 func.create_aggregate(tup, (char*)&aggr);
93 // need operator= doing a deep copy
94 group_table.insert(new_grp, aggr);
// Emits one output tuple per group in group_table, restarting from begin().
// Each tuple is tagged with output_channel and appended to `result`.
// NOTE(review): the handling of `failed`, any cleanup of group_table, and
// the return value are not visible in this listing.
103 int flush(list<host_tuple>& result) {
106 flush_pos = group_table.begin();
107 // If the table isn't empty, flush it now.
108 if (!group_table.empty()) {
109 for (; flush_pos != group_table.end(); ++flush_pos) {
// `failed` is passed to create_output_tuple, presumably as an out-parameter.
// The lines that test it are not visible here.
111 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
114 tup.channel = output_channel;
115 result.push_back(tup);
117 // free((*flush_pos).second);
// Record that the flush completed. This flag is what
// create_temp_status_tuple receives in accept_tuple and get_temp_status.
122 flush_finished = true;
// Only the group_table.resize() call is visible in this listing. The rest
// of the body, including any use of `result`, is not shown.
127 int flush_old(list<host_tuple>& result) {
131 group_table.resize();
// Forwards a parameter block to the groupby function object. Presumably
// `sz` is the size of the block at `value`; confirm. Return not visible.
135 int set_param_block(int sz, void * value) {
136 func.set_param_block(sz, value);
// Fills `result` with a temporal-status tuple on output_channel, passing the
// current flush_finished flag. Returns whatever create_temp_status_tuple
// returns; accept_tuple pushes such a tuple only when that value is 0.
140 int get_temp_status(host_tuple& result) {
141 result.channel = output_channel;
142 return func.create_temp_status_tuple(result, flush_finished);
// Body not visible in this listing.
145 int get_blocked_status () {
// Reports the memory footprint as computed by group_table.
149 unsigned int get_mem_footprint() {
150 return group_table.get_mem_footprint();
154 #endif // GROUPBY_OPERATOR_H