-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
-\r
- http://www.apache.org/licenses/LICENSE-2.0\r
-\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#ifndef GROUPBY_OPERATOR_H\r
-#define GROUPBY_OPERATOR_H\r
-\r
-#include "host_tuple.h"\r
-#include "base_operator.h"\r
-#include <list>\r
-#include "hash_table.h"\r
-\r
-#define _GB_FLUSH_PER_TUPLE_ 1\r
-\r
-using namespace std;\r
-\r
-template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>\r
-class groupby_operator : public base_operator {\r
-private :\r
- groupby_func func;\r
- hash_table<group*, aggregate*, hasher_func, equal_func> group_table[2];\r
- bool flush_finished;\r
- unsigned int curr_table;\r
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;\r
- int n_patterns;\r
-\r
-\r
-\r
-public:\r
- groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {\r
- flush_finished = true;\r
- curr_table = 0;\r
- flush_pos = group_table[1-curr_table].end();\r
- n_patterns = func.n_groupby_patterns();\r
- }\r
-\r
- int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
-\r
-// Push out completed groups\r
-\r
- if(!flush_finished) partial_flush(result);\r
-\r
- // create buffer on the stack to store key object\r
- char buffer[sizeof(group)];\r
-\r
- // extract the key information from the tuple and\r
- // copy it into buffer\r
- group* grp = func.create_group(tup, buffer);\r
- if (!grp) {\r
-/*\r
-// Ignore temp tuples until we can fix their timestamps.\r
-if (func.temp_status_received()) {\r
- tup.free_tuple();\r
- return 0;\r
-}*/\r
- if (func.flush_needed()){\r
- flush_old(result);\r
- }\r
- if (func.temp_status_received()) {\r
- host_tuple temp_tup;\r
- if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {\r
- temp_tup.channel = output_channel;\r
- result.push_back(temp_tup);\r
- }\r
- }\r
- tup.free_tuple();\r
- return 0;\r
- }\r
-\r
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
- if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) {\r
-// Temporal GBvar is part of the group so no flush is needed.\r
- aggregate* old_aggr = (*iter).second;\r
- func.update_aggregate(tup, grp, old_aggr);\r
- }else{\r
- if (func.flush_needed()) {\r
- flush_old(result);\r
- }\r
- if(n_patterns <= 1){\r
- // create a copy of the group on the heap\r
- group* new_grp = new group(grp); // need a copy constructor for groups\r
-// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));\r
- aggregate* aggr = new aggregate();\r
- // create an aggregate in preallocated buffer\r
- aggr = func.create_aggregate(tup, (char*)aggr);\r
-\r
- group_table[curr_table].insert(new_grp, aggr);\r
- }else{\r
- int p;\r
- for(p=0;p<n_patterns;++p){\r
- group* new_grp = new group(grp, func.get_pattern(p));\r
- aggregate* aggr = new aggregate();\r
- aggr = func.create_aggregate(tup, (char*)aggr);\r
- group_table[curr_table].insert(new_grp, aggr);\r
- }\r
- }\r
- }\r
- tup.free_tuple();\r
- return 0;\r
- }\r
-\r
- int partial_flush(list<host_tuple>& result) {\r
- host_tuple tup;\r
- unsigned int old_table = 1-curr_table;\r
- unsigned int i;\r
-\r
-// emit up to _GB_FLUSH_PER_TABLE_ output tuples.\r
- if (!group_table[old_table].empty()) {\r
- for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) {\r
- bool failed = false;\r
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
- if (!failed) {\r
- tup.channel = output_channel;\r
- result.push_back(tup);\r
- }\r
- delete ((*flush_pos).first);\r
- delete ((*flush_pos).second);\r
-// free((*flush_pos).second);\r
- }\r
- }\r
-\r
-// Finalize processing if empty.\r
- if(flush_pos == group_table[old_table].end()) {\r
- flush_finished = true;\r
- group_table[old_table].clear();\r
- group_table[old_table].rehash();\r
- }\r
- return 0;\r
- }\r
-\r
- int flush(list<host_tuple>& result) {\r
- host_tuple tup;\r
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
- unsigned int old_table = 1-curr_table;\r
-\r
-// If the old table isn't empty, flush it now.\r
- if (!group_table[old_table].empty()) {\r
- for (; flush_pos != group_table[old_table].end(); ++flush_pos) {\r
- bool failed = false;\r
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
- if (!failed) {\r
-\r
- tup.channel = output_channel;\r
- result.push_back(tup);\r
- }\r
- delete ((*flush_pos).first);\r
- delete ((*flush_pos).second);\r
-// free((*flush_pos).second);\r
- }\r
- group_table[old_table].clear();\r
- group_table[old_table].rehash();\r
- }\r
-\r
- flush_pos = group_table[curr_table].begin();\r
-// If the old table isn't empty, flush it now.\r
- if (!group_table[curr_table].empty()) {\r
- for (; flush_pos != group_table[curr_table].end(); ++flush_pos) {\r
- bool failed = false;\r
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
- if (!failed) {\r
-\r
- tup.channel = output_channel;\r
- result.push_back(tup);\r
- }\r
- delete ((*flush_pos).first);\r
- delete ((*flush_pos).second);\r
-// free((*flush_pos).second);\r
- }\r
- group_table[curr_table].clear();\r
- }\r
-\r
- flush_finished = true;\r
-\r
- return 0;\r
- }\r
-\r
- int flush_old(list<host_tuple>& result) {\r
- host_tuple tup;\r
- typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
- unsigned int old_table = 1-curr_table;\r
-\r
-// If the old table isn't empty, flush it now.\r
- if (!group_table[old_table].empty()) {\r
- for (; flush_pos != group_table[old_table].end(); ++flush_pos) {\r
- bool failed = false;\r
- tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
- if (!failed) {\r
-\r
- tup.channel = output_channel;\r
- result.push_back(tup);\r
- }\r
- delete ((*flush_pos).first);\r
- delete ((*flush_pos).second);\r
-// free((*flush_pos).second);\r
- }\r
- group_table[old_table].clear();\r
- group_table[old_table].rehash();\r
- }\r
-\r
-// swap tables, enable partial flush processing.\r
- flush_pos = group_table[curr_table].begin();\r
- curr_table = old_table;\r
- flush_finished = false;\r
-\r
- return 0;\r
- }\r
-\r
- int set_param_block(int sz, void * value) {\r
- func.set_param_block(sz, value);\r
- return 0;\r
- }\r
-\r
- int get_temp_status(host_tuple& result) {\r
- result.channel = output_channel;\r
- return func.create_temp_status_tuple(result, flush_finished);\r
- }\r
-\r
- int get_blocked_status () {\r
- return -1;\r
- }\r
-\r
- unsigned int get_mem_footprint() {\r
- return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint();\r
- }\r
-};\r
-\r
-#endif // GROUPBY_OPERATOR_H\r
+/** ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+
+#ifndef GROUPBY_OPERATOR_H
+#define GROUPBY_OPERATOR_H
+
+#include "host_tuple.h"
+#include "base_operator.h"
+#include <list>
+#include "hash_table.h"
+
+
+using namespace std;
+
+// Group-by / aggregation operator built on a single hash table.
+//
+// Template parameters:
+//   groupby_func - functor that builds groups and aggregates from tuples,
+//                  reports when a flush is needed, and creates output and
+//                  temporal-status tuples.
+//   group        - key type stored (by value) in the hash table.
+//   aggregate    - value type stored (by value) in the hash table.
+//   hasher_func  - hash functor passed through to hash_table.
+//   equal_func   - equality functor passed through to hash_table.
+template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
+class groupby_operator : public base_operator {
+private:
+    groupby_func func;
+    hash_table<group, aggregate, hasher_func, equal_func> group_table;
+    // Passed to create_temp_status_tuple(); set to true here and by flush().
+    bool flush_finished;
+    // Cursor used by flush() while walking group_table.
+    typename hash_table<group, aggregate, hasher_func, equal_func>::iterator flush_pos;
+    // Number of group-by patterns reported by func at construction time.
+    int n_patterns;
+
+public:
+    // Builds the operator: schema_handle is forwarded to the groupby_func
+    // constructor, name to base_operator.
+    groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
+        flush_finished = true;
+        n_patterns = func.n_groupby_patterns();
+    }
+
+    // Processes one input tuple.
+    //
+    // The tuple is always released with free_tuple() before returning,
+    // including on the out-of-order paths. Output tuples (flushed groups and
+    // temporal-status tuples) are appended to result. Always returns 0.
+    int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
+
+        // extract the key information from the tuple into a stack-allocated group
+        group grp;
+        if (!func.create_group(tup, (gs_sp_t)&grp)) {
+            if(func.disordered()){
+                fprintf(stderr,"Out of order record in %s\n",op_name);
+                // Every other exit path frees the input tuple; do it here
+                // too so that out-of-order records do not leak it.
+                tup.free_tuple();
+                return 0;
+            }
+            if (func.flush_needed()){
+                flush_old(result);
+            }
+            if (func.temp_status_received()) {
+                host_tuple temp_tup;
+                // A zero return is treated as success: the status tuple is forwarded.
+                if (!func.create_temp_status_tuple(temp_tup, flush_finished)) {
+                    temp_tup.channel = output_channel;
+                    result.push_back(temp_tup);
+                }
+            }
+            tup.free_tuple();
+            return 0;
+        }
+        if(func.disordered()){
+            fprintf(stderr,"Out of order record in %s\n",op_name);
+            // Same as above: release the tuple before dropping the record.
+            tup.free_tuple();
+            return 0;
+        }
+
+        typename hash_table<group, aggregate, hasher_func, equal_func>::iterator iter;
+        if ((iter = group_table.find(grp)) != group_table.end()) {
+            // Temporal GBvar is part of the group so no flush is needed.
+            func.update_aggregate(tup, grp, (*iter).second);
+        }else{
+            if (func.flush_needed()) {
+                flush_old(result);
+            }
+            if(n_patterns <= 1){
+                aggregate aggr;
+                // create an aggregate in preallocated buffer
+                func.create_aggregate(tup, (char*)&aggr);
+                // need operator= doing a deep copy
+                group_table.insert(grp, aggr);
+            }else{
+                int p;
+                // TODO this code is wrong, must check if each pattern is in the group table.
+                for(p=0;p<n_patterns;++p){
+                    // need shallow copy constructor for groups
+                    group new_grp(grp, func.get_pattern(p));
+                    aggregate aggr;
+                    func.create_aggregate(tup, (char*)&aggr);
+                    // need operator= doing a deep copy
+                    group_table.insert(new_grp, aggr);
+                }
+            }
+        }
+        tup.free_tuple();
+        return 0;
+    }
+
+
+    // Emits an output tuple for every group in the table (skipping those for
+    // which create_output_tuple reports failure), then empties the table and
+    // marks the flush as finished. Always returns 0.
+    int flush(list<host_tuple>& result) {
+        host_tuple tup;
+
+        flush_pos = group_table.begin();
+        // If the table isn't empty, flush it now.
+        if (!group_table.empty()) {
+            for (; flush_pos != group_table.end(); ++flush_pos) {
+                bool failed = false;
+                tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+                if (!failed) {
+                    tup.channel = output_channel;
+                    result.push_back(tup);
+                }
+            }
+            group_table.clear();
+        }
+
+        flush_finished = true;
+
+        return 0;
+    }
+
+    // Flushes all groups via flush(), then clears the table again and calls
+    // resize() on it. Always returns 0.
+    int flush_old(list<host_tuple>& result) {
+
+        flush(result);
+        group_table.clear();
+        group_table.resize();
+        return 0;
+    }
+
+    // Forwards the parameter block to the group-by functor. Always returns 0.
+    int set_param_block(int sz, void * value) {
+        func.set_param_block(sz, value);
+        return 0;
+    }
+
+    // Fills result with a temporal-status tuple on this operator's output
+    // channel; returns whatever create_temp_status_tuple returns.
+    int get_temp_status(host_tuple& result) {
+        result.channel = output_channel;
+        return func.create_temp_status_tuple(result, flush_finished);
+    }
+
+    // Always returns -1.
+    int get_blocked_status () {
+        return -1;
+    }
+
+    // Returns the memory footprint reported by the group hash table.
+    unsigned int get_mem_footprint() {
+        return group_table.get_mem_footprint();
+    }
+};
+
+#endif // GROUPBY_OPERATOR_H