/* ------------------------------------------------
Copyright 2014 AT&T Intellectual Property
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 ------------------------------------------- */
16 #ifndef RWGROUPBY_OPERATOR_H
17 #define RWGROUPBY_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
22 #include "hash_table.h"
24 #define _GB_FLUSH_PER_TUPLE_ 1
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
29 class running_agg_operator : public base_operator {
32 hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
33 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
39 running_agg_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
40 flush_pos = group_table.end();
43 virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
45 // Push out completed groups
47 // create buffer on the stack to store key object
48 char buffer[sizeof(group)];
49 // Number of flushes required
51 // extract the key information from the tuple and
52 // copy it into buffer
53 group* grp = func.create_group(tup, buffer);
54 nflushes = func.flush_needed();
60 if (func.temp_status_received()) {
62 if (!func.create_temp_status_tuple(temp_tup, true)) {
63 temp_tup.channel = output_channel;
64 result.push_back(temp_tup);
74 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
75 if ((iter = group_table.find(grp)) != group_table.end()) {
76 aggregate* old_aggr = (*iter).second;
77 func.update_aggregate(tup, grp, old_aggr);
79 // create a copy of the group on the heap
80 group* new_grp = new group(grp); // need a copy constructor for groups
81 // aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
82 aggregate* aggr = new aggregate();
83 // create an aggregate in preallocated buffer
84 aggr = func.create_aggregate(tup, (char*)aggr);
86 group_table.insert(new_grp, aggr);
92 virtual int flush(list<host_tuple>& result) {
94 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
96 // Limit the number of successive flushes - avoid explosive behavior
97 const gs_int32_t max_flushes = 10;
98 if(nflushes>max_flushes){
99 fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
100 nflushes = max_flushes;
103 for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
104 // advance the TB for the reinit
105 if(flush_no < nflushes-1){
106 func.advance_last_tb();
108 func.reset_last_tb(); // Move to current tb in case flush limit reached
110 // If the old table isn't empty, flush it now.
111 for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
113 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
115 tup.channel = output_channel;
116 result.push_back(tup);
118 if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
119 group* g = (*flush_pos).first;
120 aggregate* a = (*flush_pos).second;
122 group_table.erase(g);
126 func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
135 virtual int set_param_block(int sz, void * value) {
136 func.set_param_block(sz, value);
140 virtual int get_temp_status(host_tuple& result) {
141 result.channel = output_channel;
142 return func.create_temp_status_tuple(result, true);
145 virtual int get_blocked_status () {
149 unsigned int get_mem_footprint() {
150 return group_table.get_mem_footprint();
154 #endif // GROUPBY_OPERATOR_H