Fixed newline characters throughout the code
[com/gs-lite.git] / include / hfta / running_gb_operator.h
index 0762c63..5bd7388 100644 (file)
-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
-   Licensed under the Apache License, Version 2.0 (the "License");\r
-   you may not use this file except in compliance with the License.\r
-   You may obtain a copy of the License at\r
-\r
-     http://www.apache.org/licenses/LICENSE-2.0\r
-\r
-   Unless required by applicable law or agreed to in writing, software\r
-   distributed under the License is distributed on an "AS IS" BASIS,\r
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-   See the License for the specific language governing permissions and\r
-   limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#ifndef RWGROUPBY_OPERATOR_H\r
-#define RWGROUPBY_OPERATOR_H\r
-\r
-#include "host_tuple.h"\r
-#include "base_operator.h"\r
-#include <list>\r
-#include "hash_table.h"\r
-\r
-#define _GB_FLUSH_PER_TUPLE_ 1\r
-\r
-using namespace std;\r
-\r
-template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>\r
-class running_agg_operator : public base_operator {\r
-private :\r
-       groupby_func func;\r
-       hash_table<group*, aggregate*, hasher_func, equal_func> group_table;\r
-       typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;\r
-\r
-\r
-\r
-public:\r
-       running_agg_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {\r
-               flush_pos = group_table.end();\r
-       }\r
-\r
-       virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
-\r
-//                     Push out completed groups\r
-\r
-               // create buffer on the stack to store key object\r
-               char buffer[sizeof(group)];\r
-\r
-               // extract the key information from the tuple and\r
-               // copy it into buffer\r
-               group* grp = func.create_group(tup, buffer);\r
-/*//                   Ignore temp tuples until we can fix their timestamps.\r
-if (func.temp_status_received()) {\r
- tup.free_tuple();\r
- return 0;\r
-}\r
-*/\r
-\r
-               if (!grp) {\r
-                       if (func.flush_needed()){\r
-                               flush(result);\r
-                       }\r
-                       if (func.temp_status_received()) {\r
-                               host_tuple temp_tup;\r
-                               if (!func.create_temp_status_tuple(temp_tup, true)) {\r
-                                       temp_tup.channel = output_channel;\r
-                                       result.push_back(temp_tup);\r
-                               }\r
-                       }\r
-                       tup.free_tuple();\r
-                       return 0;\r
-               }\r
-\r
-               if (func.flush_needed()) {\r
-                       flush(result);\r
-               }\r
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
-               if ((iter = group_table.find(grp)) != group_table.end()) {\r
-                       aggregate* old_aggr = (*iter).second;\r
-                       func.update_aggregate(tup, grp, old_aggr);\r
-               }else{\r
-                       // create a copy of the group on the heap\r
-                       group* new_grp = new group(grp);        // need a copy constructor for groups\r
-//                     aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));\r
-                       aggregate* aggr = new aggregate();\r
-                       // create an aggregate in preallocated buffer\r
-                       aggr = func.create_aggregate(tup, (char*)aggr);\r
-\r
-                       group_table.insert(new_grp, aggr);\r
-               }\r
-               tup.free_tuple();\r
-               return 0;\r
-       }\r
-\r
-       virtual int flush(list<host_tuple>& result) {\r
-               host_tuple tup;\r
-               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;\r
-//             If the old table isn't empty, flush it now.\r
-               for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {\r
-                       bool failed = false;\r
-                       tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);\r
-                       if (!failed) {\r
-                               tup.channel = output_channel;\r
-                               result.push_back(tup);\r
-                       }\r
-                       if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){\r
-                           group* g = (*flush_pos).first;\r
-                       aggregate* a = (*flush_pos).second;\r
-                               ++flush_pos;\r
-                               group_table.erase(g);\r
-                               delete (g);\r
-                               delete (a);\r
-                       }else{\r
-                               func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);\r
-                               ++flush_pos;\r
-                       }\r
-               }\r
-\r
-               return 0;\r
-       }\r
-\r
-       virtual int set_param_block(int sz, void * value) {\r
-               func.set_param_block(sz, value);\r
-               return 0;\r
-       }\r
-\r
-       virtual int get_temp_status(host_tuple& result) {\r
-               result.channel = output_channel;\r
-               return func.create_temp_status_tuple(result, true);\r
-       }\r
-\r
-       virtual int get_blocked_status () {\r
-               return -1;\r
-       }\r
-\r
-       unsigned int get_mem_footprint() {\r
-               return group_table.get_mem_footprint();\r
-       }\r
-};\r
-\r
-#endif // GROUPBY_OPERATOR_H\r
+/* ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ ------------------------------------------- */
+
+#ifndef RWGROUPBY_OPERATOR_H
+#define RWGROUPBY_OPERATOR_H
+
+#include "host_tuple.h"
+#include "base_operator.h"
+#include <list>
+#include "hash_table.h"
+
+#define _GB_FLUSH_PER_TUPLE_ 1
+
+using namespace std;
+
+template <class groupby_func, class group, class aggregate, class hasher_func, class equal_func>
+class running_agg_operator : public base_operator {
+private :
+       groupby_func func;
+       hash_table<group*, aggregate*, hasher_func, equal_func> group_table;
+       typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator flush_pos;
+
+
+
+public:
+       running_agg_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) {
+               flush_pos = group_table.end();
+       }
+
+       virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
+
+//                     Push out completed groups
+
+               // create buffer on the stack to store key object
+               char buffer[sizeof(group)];
+
+               // extract the key information from the tuple and
+               // copy it into buffer
+               group* grp = func.create_group(tup, buffer);
+/*//                   Ignore temp tuples until we can fix their timestamps.
+if (func.temp_status_received()) {
+ tup.free_tuple();
+ return 0;
+}
+*/
+
+               if (!grp) {
+                       if (func.flush_needed()){
+                               flush(result);
+                       }
+                       if (func.temp_status_received()) {
+                               host_tuple temp_tup;
+                               if (!func.create_temp_status_tuple(temp_tup, true)) {
+                                       temp_tup.channel = output_channel;
+                                       result.push_back(temp_tup);
+                               }
+                       }
+                       tup.free_tuple();
+                       return 0;
+               }
+
+               if (func.flush_needed()) {
+                       flush(result);
+               }
+               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+               if ((iter = group_table.find(grp)) != group_table.end()) {
+                       aggregate* old_aggr = (*iter).second;
+                       func.update_aggregate(tup, grp, old_aggr);
+               }else{
+                       // create a copy of the group on the heap
+                       group* new_grp = new group(grp);        // need a copy constructor for groups
+//                     aggregate* aggr = (aggregate*)malloc(sizeof(aggregate));
+                       aggregate* aggr = new aggregate();
+                       // create an aggregate in preallocated buffer
+                       aggr = func.create_aggregate(tup, (char*)aggr);
+
+                       group_table.insert(new_grp, aggr);
+               }
+               tup.free_tuple();
+               return 0;
+       }
+
+       virtual int flush(list<host_tuple>& result) {
+               host_tuple tup;
+               typename hash_table<group*, aggregate*, hasher_func, equal_func>::iterator iter;
+//             If the old table isn't empty, flush it now.
+               for (flush_pos = group_table.begin(); flush_pos != group_table.end(); ) {
+                       bool failed = false;
+                       tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed);
+                       if (!failed) {
+                               tup.channel = output_channel;
+                               result.push_back(tup);
+                       }
+                       if(func.cleaning_when((*flush_pos).first,(*flush_pos).second)){
+                           group* g = (*flush_pos).first;
+                       aggregate* a = (*flush_pos).second;
+                               ++flush_pos;
+                               group_table.erase(g);
+                               delete (g);
+                               delete (a);
+                       }else{
+                               func.reinit_aggregates((*flush_pos).first, (*flush_pos).second);
+                               ++flush_pos;
+                       }
+               }
+
+               return 0;
+       }
+
+       virtual int set_param_block(int sz, void * value) {
+               func.set_param_block(sz, value);
+               return 0;
+       }
+
+       virtual int get_temp_status(host_tuple& result) {
+               result.channel = output_channel;
+               return func.create_temp_status_tuple(result, true);
+       }
+
+       virtual int get_blocked_status () {
+               return -1;
+       }
+
+       unsigned int get_mem_footprint() {
+               return group_table.get_mem_footprint();
+       }
+};
+
+#endif // GROUPBY_OPERATOR_H