1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef RWGROUPBY_OPERATOR_H
17 #define RWGROUPBY_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
22 #include "hash_table.h"
24 #define _GB_FLUSH_PER_TUPLE_ 1
// Aggregation operator that keeps one `aggregate` per `group` in a hash table.
// accept_tuple() updates (or inserts) the aggregate of the incoming tuple's
// group; flush() emits the stored groups and then erases or re-initializes
// each entry. All tuple/group/aggregate logic is delegated to the `func`
// member (presumably an instance of groupby_func; it is constructed from the
// schema handle in the constructor).
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
29 class running_agg_operator : public base_operator {
// Table mapping each group to its current aggregate.
32 hash_table<group, aggregate, hasher_func, equal_func> group_table;
// Cursor into group_table; set to end() by the constructor and used as the
// loop variable while flushing.
33 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
// Constructs the operator.
//   schema_handle - passed to the constructor of `func`.
//   name          - passed to the base_operator constructor.
39 running_agg_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
// No flush is in progress yet: park the cursor at the end of the table.
40 flush_pos = group_table.end();
// Processes one input tuple.
//   tup    - the incoming tuple.
//   result - output list; a temporal-status tuple may be appended to it.
// The tuple's group is built with func.create_group(), the number of pending
// flushes is read from func.flush_needed(), and the tuple is then folded into
// the existing aggregate of its group or a new aggregate is inserted.
43 virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
45 // Push out completed groups
// Build the group for this tuple directly into grp.
48 ret = func.create_group(tup, (gs_sp_t)&grp);
// Remember how many flushes func says are required; flush() consumes this.
49 nflushes = func.flush_needed();
// Out-of-order input is reported on stderr.
50 if(func.disordered()){
51 fprintf(stderr,"Out of order record in %s\n",op_name);
// A temporal-status tuple arrived: produce one of our own on output_channel.
59 if (func.temp_status_received()) {
// A zero return from create_temp_status_tuple() is treated as success here.
61 if (!func.create_temp_status_tuple(temp_tup, true)) {
62 temp_tup.channel = output_channel;
63 result.push_back(temp_tup);
// Look the group up; update in place if it already has an aggregate.
73 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
74 if ((iter = group_table.find(grp)) != group_table.end()) {
75 func.update_aggregate(tup, grp, (*iter).second);
78 // create an aggregate in preallocated buffer
79 func.create_aggregate(tup, (char*)&aggr);
80 // need operator= doing a deep copy (presumably because insert() stores a copy of aggr)
81 group_table.insert(grp, aggr);
// Emits the contents of group_table into `result`, once per pending flush
// (nflushes, capped at max_flushes).
//   result - output list that receives the generated tuples, each tagged
//            with output_channel.
// For every stored group an output tuple is built with
// func.create_output_tuple(); afterwards the entry is erased when
// func.cleaning_when() is true, otherwise its aggregate is re-initialized
// with func.reinit_aggregates().
87 virtual int flush(list<host_tuple>& result) {
// NOTE(review): iter does not appear to be used in this method - confirm.
89 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
91 // Limit the number of successive flushes - avoid explosive behavior
92 const gs_int32_t max_flushes = 10;
// Too many windows pending: warn on stderr and clamp to the maximum.
93 if(nflushes>max_flushes){
94 fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
95 nflushes = max_flushes;
// One pass over the whole table per pending flush.
98 for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
99 // advance the TB for the reinit
// Every pass except the last one steps the last time bucket forward.
100 if(flush_no < nflushes-1){
101 func.advance_last_tb();
103 func.reset_last_tb(); // Move to current tb in case flush limit reached
105 // If the old table isn't empty, flush it now.
// The for statement has no increment clause; the cursor is moved in the body.
106 for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
// `failed` is passed to create_output_tuple(); presumably an out-flag
// signalling that no tuple could be produced - TODO confirm.
108 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
110 tup.channel = output_channel;
111 result.push_back(tup);
// Decide whether this group is dropped or kept with a reset aggregate.
113 if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
// g is a reference to the key stored in the table entry being removed.
114 group &g = (*flush_pos).first;
115 //aggregate a = (*flush_pos).second;
// NOTE(review): flush_pos must have moved past this entry before the erase,
// otherwise the cursor would refer to a removed element - confirm.
117 group_table.erase(g);
// Group survives: reset its aggregate.
119 func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
// Forwards a parameter block to func.
//   sz    - size argument passed through unchanged.
//   value - pointer to the parameter block, passed through unchanged.
128 virtual int set_param_block(int sz, void * value) {
129 func.set_param_block(sz, value);
// Fills `result` with a temporal-status tuple for this operator.
//   result - tuple to populate; its channel is set to output_channel first.
// Returns whatever func.create_temp_status_tuple() returns.
133 virtual int get_temp_status(host_tuple& result) {
134 result.channel = output_channel;
135 return func.create_temp_status_tuple(result, true);
// Returns the operator's blocked status as an int.
138 virtual int get_blocked_status () {
// Returns the memory footprint reported by group_table.
142 unsigned int get_mem_footprint() {
143 return group_table.get_mem_footprint();
147 #endif // RWGROUPBY_OPERATOR_H