6085907590bbbb10e85edf9f6263f8aeda096a09
[com/gs-lite.git] / include / hfta / groupby_operator.h
1 /** ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef GROUPBY_OPERATOR_H
17 #define GROUPBY_OPERATOR_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 #include "hash_table.h"
23
24
25 using namespace std;
26
27 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
28 class groupby_operator : public base_operator {
29 private :
30         groupby_func func;
31         hash_table<group, aggregate, hasher_func, equal_func> group_table;
32         bool flush_finished;
33         typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
34         int n_patterns;
35 public:
36         groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
37                 flush_finished = true;
38                 n_patterns = func.n_groupby_patterns();
39         }
40
41         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
42
43 //                      Push out completed groups
44
45                 // extract the key information from the tuple and
46                 // copy it into buffer
47                 group grp;
48                 if (!func.create_group(tup, (gs_sp_t)&grp)) {
49                         if(func.disordered()){
50                                 fprintf(stderr,"Out of order record in %s\n",op_name);
51                                 return 0;
52                         }
53                         if (func.flush_needed()){
54                                 flush_old(result);
55                         }
56                         if (func.temp_status_received()) {
57                                 host_tuple temp_tup;
58                                 if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
59                                         temp_tup.channel = output_channel;
60                                         result.push_back(temp_tup);
61                                 }
62                         }
63                         tup.free_tuple();
64                         return 0;
65                 }
66                 if(func.disordered()){
67                         fprintf(stderr,"Out of order record in %s\n",op_name);
68                         return 0;
69                 }
70
71                 typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
72                 if ((iter = group_table.find(grp)) != group_table.end()) {
73 //                              Temporal GBvar is part of the group so no flush is needed.
74                         func.update_aggregate(tup, grp, (*iter).second);
75                 }else{
76                         if (func.flush_needed()) {
77                                 flush_old(result);
78                         }
79                         if(n_patterns <= 1){
80                                 char aggr_buffer[sizeof(aggregate)];
81                                 // create an aggregate in preallocated buffer
82                                 func.create_aggregate(tup, aggr_buffer);
83                                 // neeed operator= doing a deep copy
84                                 group_table.insert(grp, (*(aggregate*)aggr_buffer));
85                         }else{
86                                 int p;
87 // TODO this code is wrong, must check if each pattern is in the group table.
88                                 for(p=0;p<n_patterns;++p){
89                                         // need shallow copy constructor for groups
90                                         group new_grp(grp, func.get_pattern(p));
91                                         char aggr_buffer[sizeof(aggregate)];
92                                         func.create_aggregate(tup, aggr_buffer);
93                                         // neeed operator= doing a deep copy
94                                         group_table.insert(new_grp, (*(aggregate*)aggr_buffer));
95                                 }
96                         }
97                 }
98                 tup.free_tuple();
99                 return 0;
100         }
101
102
103         int flush(list<host_tuple>& result) {
104                 host_tuple tup;
105
106                 flush_pos = group_table.begin();
107 //                      If the table isn't empty, flush it now.
108                 if (!group_table.empty()) {
109                         for (; flush_pos != group_table.end(); ++flush_pos) {
110                                 bool failed = false;
111                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
112                                 if (!failed) {
113
114                                         tup.channel = output_channel;
115                                         result.push_back(tup);
116                                 }
117 //                              free((*flush_pos).second);
118                         }
119                         group_table.clear();
120                 }
121
122                 flush_finished = true;
123
124                 return 0;
125         }
126
127         int flush_old(list<host_tuple>& result) {
128
129                 flush(result);
130                 group_table.clear();
131                 group_table.resize();
132                 return 0;
133         }
134
135         int set_param_block(int sz, void * value) {
136                 func.set_param_block(sz, value);
137                 return 0;
138         }
139
140         int get_temp_status(host_tuple& result) {
141                 result.channel = output_channel;
142                 return func.create_temp_status_tuple(result, flush_finished);
143         }
144
145         int get_blocked_status () {
146                 return -1;
147         }
148
149         unsigned int get_mem_footprint() {
150                 return group_table.get_mem_footprint();
151         }
152 };
153
154 #endif  // GROUPBY_OPERATOR_H