Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / include / hfta / running_gb_operator.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef RWGROUPBY_OPERATOR_H
17 #define RWGROUPBY_OPERATOR_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 #include "hash_table.h"
23
// NOTE(review): this macro is not referenced anywhere in this header; presumably it is
// consumed by code that includes this file -- confirm before changing or removing it.
24 #define _GB_FLUSH_PER_TUPLE_ 1
25
// The operator below names `list` without the std:: qualifier, so it relies on this directive.
26 using namespace std;
27
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
29 class running_agg_operator : public base_operator {
30 private :
31         groupby_func func;
32         hash_table<group, aggregate, hasher_func, equal_func> group_table;
33         typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
34         gs_int32_t nflushes;
35
36
37
38 public:
39         running_agg_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
40                 flush_pos = group_table.end();
41         }
42
43         virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
44
45 //                      Push out completed groups
46
47                 group grp, *ret;
48                 ret = func.create_group(tup, (gs_sp_t)&grp);
49                 nflushes = func.flush_needed();
50                 if(func.disordered()){
51                         // fprintf(stderr,"Out of order record in %s\n",op_name);
52                         return 0;
53                 }
54
55                 if (! ret) {
56                         if (nflushes>0){
57                                 flush(result);
58                         }
59                         if (func.temp_status_received()) {
60                                 host_tuple temp_tup;
61                                 if (!func.create_temp_status_tuple(temp_tup, true)) {
62                                         temp_tup.channel = output_channel;
63                                         result.push_back(temp_tup);
64                                 }
65                         }
66                         tup.free_tuple();
67                         return 0;
68                 }
69
70                 if (nflushes>0) {
71                         flush(result);
72                 }
73                 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
74                 if ((iter = group_table.find(grp)) != group_table.end()) {
75                         func.update_aggregate(tup, grp, (*iter).second);
76                 }else{
77                                 char aggr_buffer[sizeof(aggregate)];
78                                 // create an aggregate in preallocated buffer
79                                 func.create_aggregate(tup, aggr_buffer);
80                                 // neeed operator= doing a deep copy
81                                 group_table.insert(grp, (*(aggregate*)aggr_buffer));
82                 }
83                 tup.free_tuple();
84                 return 0;
85         }
86
87         virtual int flush(list<host_tuple>& result) {
88                 host_tuple tup;
89                 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
90
91 //      Limit the number of successive flushes - avoid explosive behavior
92                 const gs_int32_t max_flushes = 25;
93                 if(nflushes>max_flushes){
94                         fprintf(stderr,"Warning in operator %s, temporal advance of %d windows needed, max number of windows that can be reported at once is %d\n",op_name, nflushes, max_flushes);
95                         nflushes = max_flushes;
96                 }
97
98                 for(gs_int32_t flush_no = 0; flush_no < nflushes; ++flush_no){
99 //      advance the TB for the reinit
100                         if(flush_no < nflushes-1){
101                                 func.advance_last_tb();
102                         }else{
103                                 func.reset_last_tb();   // Move to current tb in case flush limit reached
104                         }
105 //              If the old table isn't empty, flush it now.
106                         for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
107                                 bool failed = false;
108                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
109                                 if (!failed) {
110                                         tup.channel = output_channel;
111                                         result.push_back(tup);
112                                 }
113                                 if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
114                                         group &g = (*flush_pos).first;
115                                         //aggregate a = (*flush_pos).second;
116                                         ++flush_pos;
117                                         group_table.erase(g);
118                                 }else{
119                                         func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
120                                         ++flush_pos;
121                                 }
122                         }
123                 }
124
125                 return 0;
126         }
127
128         virtual int set_param_block(int sz, void * value) {
129                 func.set_param_block(sz, value);
130                 return 0;
131         }
132
133         virtual int get_temp_status(host_tuple& result) {
134                 result.channel = output_channel;
135                 return func.create_temp_status_tuple(result, true);
136         }
137
138         virtual int get_blocked_status () {
139                 return -1;
140         }
141
142         unsigned int get_mem_footprint() {
143                 return group_table.get_mem_footprint();
144         }
145 };
146
147 #endif  // RWGROUPBY_OPERATOR_H