Fixed newline characters throughout the code
[com/gs-lite.git] / include / hfta / groupby_operator_oop.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef GROUPBY_OPERATOR_OOP_H
17 #define GROUPBY_OPERATOR_OOP_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 #include <vector>
23 #include "hash_table.h"
24 #include <cassert>
25
26 //              TED: should be supplied by the groupby_func
27 #define _GB_FLUSH_PER_TUPLE_ 1
28
29 /* max allowed disorder  -- Jin */
30 //              TED: should be supplied by the groupby_func
31 #define DISORDER_LEVEL 2
32
33 //#define NDEBUG
34
35 using namespace std;
36
37 // ASSUME temporal_type is one of int, uint, llong, ullong
38
39 template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func, class temporal_type>
40 class groupby_operator_oop : public base_operator {
41 private :
42         groupby_func func;
43
44         /* a list of hash tables, which maintains aggregates for current window and also k previous ones  -- Jin */
45         vector<hash_table<group*, aggregate*, hasher_func, equal_func>* > group_tables;
46
47         /* the minimum and maximum window id of the hash tables  -- Jin  */
48         temporal_type min_wid, max_wid;
49
50
51         bool flush_finished;
52         temporal_type curr_table;
53         typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
54
55         temporal_type last_flushed_temporal_gb;
56         temporal_type last_temporal_gb;
57
58 int n_slow_flush;
59         int n_patterns;
60
61
62 public:
63         groupby_operator_oop(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
64                 flush_finished = true;
65
66                 min_wid = 0;
67                 max_wid = 0;
68 n_slow_flush = 0;
69                 n_patterns = func.n_groupby_patterns();
70         }
71
72         ~groupby_operator_oop() {
73                 hash_table<group*, aggregate*, hasher_func, equal_func>* table;
74                 // delete all the elements in the group_tables list;
75                 while (!group_tables.empty()) {
76                         table = group_tables.back();
77                         group_tables.pop_back();
78                         table->clear();
79 //fprintf(stderr,"Deleting group table (c) at %lx\n",(gs_uint64_t)(table));
80                         delete (table);
81                 }
82
83         }
84
85         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
86
87
88                 // Push out completed groups
89                 if(!flush_finished) partial_flush(result);
90
91                 // create buffer on the stack to store key object
92                 char buffer[sizeof(group)];
93
94                 // extract the key information from the tuple and
95                 // copy it into buffer
96                 group* grp = func.create_group(tup, buffer);
97
98
99                 if (!grp) {
100 //printf("grp==NULL recieved ");
101                         if (func.temp_status_received()) {
102 //printf("temp status record ");
103                                 last_flushed_temporal_gb = func.get_last_flushed_gb ();
104                                 last_temporal_gb = func.get_last_gb ();
105                         }
106 //printf("\n");
107
108 //fprintf(stderr,"min_wid=%d, max_wid=%d, last_temporal_gb=%d, last_flushed_temporal_gb=%d, flush_finished=%d\n",min_wid, max_wid, last_temporal_gb, last_flushed_temporal_gb, flush_finished);
109
110                         /* no data has arrived, and so we ignore the temp tuples -- Jin */
111                         if (group_tables.size()>0) {
112
113                                 gs_int64_t index;
114                                 if(last_flushed_temporal_gb >= min_wid){
115                                         index = last_flushed_temporal_gb - min_wid;
116                                 }else{
117                                         index = -(min_wid - last_flushed_temporal_gb);  // unsigned arithmetic
118                                 }
119
120                                 if (func.flush_needed() && index>=0) {
121 #ifdef NDEBUG
122 //fprintf(stderr, "flush needed: last_flushed_gb  %u , min_wid %u \n", last_flushed_temporal_gb, min_wid);
123 #endif
124                                         // Init flush on first temp tuple -- Jin
125                                         if ( !flush_finished) {
126 #ifdef NDEBUG
127 //fprintf(stderr, "last_flushed_gb is %u, min_wid is %u \n", last_flushed_temporal_gb, min_wid);
128 #endif
129                                                 flush_old(result);
130                                         }
131                                         if (last_temporal_gb > min_wid && group_tables.size()>0) {
132                                                 flush_finished = false;
133                                         }
134
135                                 // we start to flush from the head of the group tables -- Jin
136                                         if(group_tables.size()>0){
137                                                 flush_pos = group_tables[0]->begin();
138                                         }
139
140 #ifdef NDEBUG
141 //fprintf(stderr, "after flush old \n");
142 #endif
143                                 }
144                         }
145
146                         host_tuple temp_tup;
147                         if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
148                                 temp_tup.channel = output_channel;
149                                 result.push_back(temp_tup);
150                         }
151
152                         tup.free_tuple();
153                         return 0;
154                 }
155
156 //fprintf (stderr, "after create group grp=%lx, curr_table = %d\n",(gs_uint64_t)grp, grp->get_curr_gb());
157
158                 /*  This is a regular tuple -- Jin */
159                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
160                 /*  first, decide which hash table we need to work at  */
161                 curr_table = grp->get_curr_gb();
162                 if (max_wid == 0 && min_wid == 0) {
163                         group_tables.push_back((new hash_table<group*, aggregate*, hasher_func, equal_func>()));
164 //fprintf(stderr,"Added (1) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());
165                         max_wid = min_wid = curr_table;
166                 }
167                 if (curr_table < min_wid) {
168                         for (temporal_type i = curr_table; i < min_wid; i++){
169                                 group_tables.insert(group_tables.begin(), new hash_table<group*, aggregate*, hasher_func, equal_func>());
170 //fprintf(stderr,"Added (2) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());
171                         }
172                         min_wid = curr_table;
173                 }
174                 if (curr_table > max_wid) {
175                         hash_table<group*, aggregate*, hasher_func, equal_func>* pt;
176                         for (temporal_type i = max_wid; i < curr_table; i++) {
177                                 pt =new hash_table<group*, aggregate*, hasher_func, equal_func>();
178                                 group_tables.push_back(pt);
179 //fprintf(stderr,"Added (3) hash tbl at %lx, pos=%d\n",(gs_uint64_t)(group_tables.back()),group_tables.size());
180                         }
181
182                         max_wid = curr_table;
183                 }
184                 gs_int64_t index = curr_table - min_wid;
185
186                 if ((iter = group_tables[index]->find(grp)) != group_tables[index]->end()) {
187                         aggregate* old_aggr = (*iter).second;
188                         func.update_aggregate(tup, grp, old_aggr);
189                 }else{
190                         /* We only flush when a temp tuple is received, so we only check on temp tuple  -- Jin */
191                         // create a copy of the group on the heap
192                         if(n_patterns <= 1){
193
194                                 group* new_grp = new group(grp);        // need a copy constructor for groups
195
196                                 aggregate* aggr = new aggregate();
197
198                         // create an aggregate in preallocated buffer
199                                 aggr = func.create_aggregate(tup, (char*)aggr);
200
201 //                              hash_table<group*, aggregate*, hasher_func, equal_func>* pt;
202                                 group_tables[index]->insert(new_grp, aggr);
203                         }else{
204                                 int p;
205                                 for(p=0;p<n_patterns;++p){
206                                         group* new_grp = new group(grp, func.get_pattern(p));
207                                         aggregate* aggr = new aggregate();
208                                         aggr = func.create_aggregate(tup, (char*)aggr);
209                                         group_tables[index]->insert(new_grp, aggr);
210                                 }
211                         }
212                 }
213                 tup.free_tuple();
214                 return 0;
215         }
216
217         int partial_flush(list<host_tuple>& result) {
218                 host_tuple tup;
219                 /* the hash table we should flush is func->last_flushed_gb_0  -- Jin */
220                 /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */
221
222                 gs_int64_t i;
223
224 //fprintf(stderr, "partial_flush size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d \n", group_tables.size(), min_wid, max_wid, last_temporal_gb);
225         if(group_tables.size()==0){
226                 flush_finished = true;
227 //fprintf(stderr, "out of partial flush early \n");
228                 return 0;
229         }
230
231 //                              emit up to _GB_FLUSH_PER_TABLE_ output tuples.
232                 if (!group_tables[0]->empty()) {
233                         for (i=0; flush_pos!=group_tables[0]->end() && i < _GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {
234 n_slow_flush++;
235                                 bool failed = false;
236                                 tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
237                                 if (!failed) {
238                                         tup.channel = output_channel;
239                                         result.push_back(tup);
240                                 }
241 //fprintf(stderr,"partial_flush Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));
242                                 delete ((*flush_pos).first);
243                                 delete ((*flush_pos).second);
244                         }
245                 }
246
247 //                      Finalize processing if empty.
248                 if (flush_pos == group_tables[0]->end()) {
249                         /* one window is completely flushed, so recycle the hash table  -- Jin */
250
251                         hash_table<group*, aggregate*, hasher_func, equal_func>* table =  group_tables[0];
252
253 //fprintf(stderr,"partial_flush Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));
254                         group_tables[0]->clear();
255                         delete (group_tables[0]);
256
257                         group_tables.erase(group_tables.begin());
258
259                         min_wid++;
260
261                         if (last_temporal_gb > min_wid && group_tables.size()>0) {
262                                 flush_pos = group_tables[0]->begin();
263
264                         } else {
265                                 flush_finished = true;
266                         }
267                 }
268 //fprintf(stderr, "out of partial flush \n");
269                 return 0;
270         }
271
272
273         /* Where is this function called ??? */  /* externally */
274         int flush(list<host_tuple>& result) {
275                 host_tuple tup;
276                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
277                 /* the hash table we should flush is func->last_flushed_gb_0  -- Jin */
278                 /* should we guarantee that everything before func->last_flushed_gb_0 also flushed ??? */
279                 while ( group_tables.size() > 0) {
280                         if (!group_tables[0]->empty()) {
281                                 if (flush_finished)
282                                         flush_pos = group_tables[0]->begin();
283                                 for (; flush_pos != group_tables[0]->end(); ++flush_pos) {
284                                         bool failed = false;
285
286                                         tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
287
288                                         if (!failed) {
289
290                                                 tup.channel = output_channel;
291                                                 result.push_back(tup);
292                                         }
293 //fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));
294                                         delete ((*flush_pos).first);
295                                         delete ((*flush_pos).second);
296
297                                 }
298                         }
299                         min_wid++;
300
301                 // remove the hashtable from group_tables
302                         hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];
303
304                         table->clear();
305 //fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));
306                         delete (table);
307                         group_tables.erase(group_tables.begin());
308
309                         if(group_tables.size()>0){
310                                 flush_pos = group_tables[0]->begin();
311                         }
312                 }
313
314
315
316                 flush_finished = true;
317
318                 return 0;
319         }
320
321         /* flushes every hash table before last_flush_gb, and get ready to flush the next window  -- Jin */
322         int flush_old(list<host_tuple>& result) {
323                 host_tuple tup;
324                 typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
325                 gs_int64_t num, i;
326
327 //fprintf(stderr, "flush_old size of group_tables = %u, min_wid is %u, max_wid is %u last_temporal_gb=%d, num=%d\n", group_tables.size(), min_wid, max_wid, last_temporal_gb, num);
328
329                 num = last_temporal_gb - min_wid;
330
331                 //If the old table isn't empty, flush it now.
332                 for (i = 0; i < num && group_tables.size() > 0; i++) {
333                         if (!group_tables[0]->empty()) {
334                                 for (; flush_pos != group_tables[0]->end(); ++flush_pos) {
335                                         bool failed = false;
336
337                                         tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
338
339                                         if (!failed) {
340
341                                                 tup.channel = output_channel;
342                                                 result.push_back(tup);
343                                         }
344 //fprintf(stderr,"flush_old Deleting at Flush_pos=%lx \n",(gs_uint64_t)((*flush_pos).first));
345                                         delete ((*flush_pos).first);
346                                         delete ((*flush_pos).second);
347
348                                 }
349                         }
350                         min_wid++;
351
352                 // remove the hashtable from group_tables
353                         hash_table<group*, aggregate*, hasher_func, equal_func>* table = group_tables[0];
354
355                         table->clear();
356 //fprintf(stderr,"flush_old Delelting group table %lx\n",(gs_uint64_t)(group_tables[0]));
357                         delete (table);
358                         group_tables.erase(group_tables.begin());
359
360                         if(group_tables.size()>0){
361                                 flush_pos = group_tables[0]->begin();
362                         }
363                 }
364
365                 flush_finished = true;
366
367 //fprintf(stderr, "end of flush_old \n");
368
369                 return 0;
370         }
371
372
373         int set_param_block(int sz, void * value) {
374                 func.set_param_block(sz, value);
375                 return 0;
376         }
377
378         int get_temp_status(host_tuple& result) {
379                 result.channel = output_channel;
380                 return func.create_temp_status_tuple(result, flush_finished);
381         }
382
383         int get_blocked_status () {
384                 return -1;
385         }
386
387         unsigned int get_mem_footprint() {
388                 unsigned int ret;
389                 unsigned int i;
390
391                 for(i=0;i<group_tables.size();++i)
392                         ret += group_tables[i]->get_mem_footprint() ;
393
394                 return ret;
395         }
396 };
397
398 #endif  // GROUPBY_OPERATOR_OOP_H
399