Fixed newline characters throughout the code
[com/gs-lite.git] / include / hfta / running_gb_operator.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef RWGROUPBY_OPERATOR_H
17 #define RWGROUPBY_OPERATOR_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 #include "hash_table.h"
23
24 #define _GB_FLUSH_PER_TUPLE_ 1
25
26 using namespace std;
27
28 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
29 class running_agg_operator : public base_operator {
30 private :
31         groupby_func func;
32         hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
33         typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
34
35
36
37 public:
38         running_agg_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
39                 flush_pos = group_table.end();
40         }
41
42         virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
43
44 //                      Push out completed groups
45
46                 // create buffer on the stack to store key object
47                 char buffer[sizeof(group)];
48
49                 // extract the key information from the tuple and
50                 // copy it into buffer
51                 group* grp = func.create_group(tup, buffer);
52 /*//                    Ignore temp tuples until we can fix their timestamps.
53 if (func.temp_status_received()) {
54  tup.free_tuple();
55  return 0;
56 }
57 */
58
59                 if (!grp) {
60                         if (func.flush_needed()){
61                                 flush(result);
62                         }
63                         if (func.temp_status_received()) {
64                                 host_tuple temp_tup;
65                                 if (!func.create_temp_status_tuple(temp_tup, true)) {
66                                         temp_tup.channel = output_channel;
67                                         result.push_back(temp_tup);
68                                 }
69                         }
70                         tup.free_tuple();
71                         return 0;
72                 }
73
74                 if (func.flush_needed()) {
75                         flush(result);
76                 }
77                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
78                 if ((iter = group_table.find(grp)) != group_table.end()) {
79                         aggregate* old_aggr = (*iter).second;
80                         func.update_aggregate(tup, grp, old_aggr);
81                 }else{
82                         // create a copy of the group on the heap
83                         group* new_grp = new group(grp);        // need a copy constructor for groups
84 //                      aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
85                         aggregate* aggr = new aggregate();
86                         // create an aggregate in preallocated buffer
87                         aggr = func.create_aggregate(tup, (char*)aggr);
88
89                         group_table.insert(new_grp, aggr);
90                 }
91                 tup.free_tuple();
92                 return 0;
93         }
94
95         virtual int flush(list<host_tuple>& result) {
96                 host_tuple tup;
97                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
98 //              If the old table isn't empty, flush it now.
99                 for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
100                         bool failed = false;
101                         tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
102                         if (!failed) {
103                                 tup.channel = output_channel;
104                                 result.push_back(tup);
105                         }
106                         if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
107                             group* g = (*flush_pos).first;
108                         aggregate* a = (*flush_pos).second;
109                                 ++flush_pos;
110                                 group_table.erase(g);
111                                 delete (g);
112                                 delete (a);
113                         }else{
114                                 func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
115                                 ++flush_pos;
116                         }
117                 }
118
119                 return 0;
120         }
121
122         virtual int set_param_block(int sz, void * value) {
123                 func.set_param_block(sz, value);
124                 return 0;
125         }
126
127         virtual int get_temp_status(host_tuple& result) {
128                 result.channel = output_channel;
129                 return func.create_temp_status_tuple(result, true);
130         }
131
132         virtual int get_blocked_status () {
133                 return -1;
134         }
135
136         unsigned int get_mem_footprint() {
137                 return group_table.get_mem_footprint();
138         }
139 };
140
141 #endif  // RWGROUPBY_OPERATOR_H