Update running groupby operator
[com/gs-lite.git] / include / hfta / running_gb_operator.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef RWGROUPBY_OPERATOR_H
17 #define RWGROUPBY_OPERATOR_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 #include "hash_table.h"
23
24 #define _GB_FLUSH_PER_TUPLE_ 1
25
26 using namespace std;
27
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
29 class running_agg_operator : public base_operator {
30 private :
31         groupby_func func;
32         hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
33         typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
34         gs_int32_t nflushes;
35
36
37
38 public:
39         running_agg_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
40                 flush_pos = group_table.end();
41         }
42
43         virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
44
45 //                      Push out completed groups
46
47 // create buffer on the stack to store key object
48                 char buffer[sizeof(group)];
49 //      Number of flushes required
50
51 // extract the key information from the tuple and
52 // copy it into buffer
53                 group* grp = func.create_group(tup, buffer);
54                 nflushes = func.flush_needed();
55                 
56                 if (!grp) {
57                         if (nflushes>0){
58                                 flush(result);
59                         }
60                         if (func.temp_status_received()) {
61                                 host_tuple temp_tup;
62                                 if (!func.create_temp_status_tuple(temp_tup, true)) {
63                                         temp_tup.channel = output_channel;
64                                         result.push_back(temp_tup);
65                                 }
66                         }
67                         tup.free_tuple();
68                         return 0;
69                 }
70
71                 if (nflushes>0) {
72                         flush(result);
73                 }
74                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
75                 if ((iter = group_table.find(grp)) != group_table.end()) {
76                         aggregate* old_aggr = (*iter).second;
77                         func.update_aggregate(tup, grp, old_aggr);
78                 }else{
79                         // create a copy of the group on the heap
80                         group* new_grp = new group(grp);        // need a copy constructor for groups
81 //                      aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
82                         aggregate* aggr = new aggregate();
83                         // create an aggregate in preallocated buffer
84                         aggr = func.create_aggregate(tup, (char*)aggr);
85
86                         group_table.insert(new_grp, aggr);
87                 }
88                 tup.free_tuple();
89                 return 0;
90         }
91
92         virtual int flush(list<host_tuple>& result) {
93                 host_tuple tup;
94                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
95
96 //      Limit the number of successive flushes - avoid explosive behavior
97                 const gs_int32_t max_flushes = 10;
98                 if(nflushes>max_flushes){
99                         fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
100                         nflushes = max_flushes;
101                 }
102
103                 for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
104 //      advance the TB for the reinit
105                         if(flush_no < nflushes-1){
106                                 func.advance_last_tb();
107                         }else{
108                                 func.reset_last_tb();   // Move to current tb in case flush limit reached
109                         }
110 //              If the old table isn't empty, flush it now.
111                         for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
112                                 bool failed = false;
113                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
114                                 if (!failed) {
115                                         tup.channel = output_channel;
116                                         result.push_back(tup);
117                                 }
118                                 if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
119                                 group* g = (*flush_pos).first;
120                                 aggregate* a = (*flush_pos).second;
121                                         ++flush_pos;
122                                         group_table.erase(g);
123                                         delete (g);
124                                         delete (a);
125                                 }else{
126                                         func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
127                                         ++flush_pos;
128                                 }
129                         }
130                 }
131
132                 return 0;
133         }
134
135         virtual int set_param_block(int sz, void * value) {
136                 func.set_param_block(sz, value);
137                 return 0;
138         }
139
140         virtual int get_temp_status(host_tuple& result) {
141                 result.channel = output_channel;
142                 return func.create_temp_status_tuple(result, true);
143         }
144
145         virtual int get_blocked_status () {
146                 return -1;
147         }
148
149         unsigned int get_mem_footprint() {
150                 return group_table.get_mem_footprint();
151         }
152 };
153
#endif  // RWGROUPBY_OPERATOR_H