X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=include%2Fhfta%2Fgroupby_operator.h;fp=include%2Fhfta%2Fgroupby_operator.h;h=e831bc0503e8ed89f08b5cc31915afbc9a3494ca;hb=dec9c93423775db0f4783a93145f016d5d780ffd;hp=26645c34d97438addfab334e5f97dae8b440b9fe;hpb=a3f38475d1e9340f916140f4cf70221908cdff72;p=com%2Fgs-lite.git diff --git a/include/hfta/groupby_operator.h b/include/hfta/groupby_operator.h index 26645c3..e831bc0 100644 --- a/include/hfta/groupby_operator.h +++ b/include/hfta/groupby_operator.h @@ -1,4 +1,4 @@ -/* ------------------------------------------------ +/** ------------------------------------------------ Copyright 2014 AT&T Intellectual Property Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ Copyright 2014 AT&T Intellectual Property #include #include "hash_table.h" -#define _GB_FLUSH_PER_TUPLE_ 1 using namespace std; @@ -29,19 +28,13 @@ template group_table[2]; + hash_table group_table; bool flush_finished; - unsigned int curr_table; - typename hash_table::iterator flush_pos; + typename hash_table::iterator flush_pos; int n_patterns; - - - public: groupby_operator(int schema_handle, const char* name) : base_operator(name), func(schema_handle) { flush_finished = true; - curr_table = 0; - flush_pos = group_table[1-curr_table].end(); n_patterns = func.n_groupby_patterns(); } @@ -49,24 +42,17 @@ public: // Push out completed groups - if(!flush_finished) partial_flush(result); - - // create buffer on the stack to store key object - char buffer[sizeof(group)]; - // extract the key information from the tuple and // copy it into buffer - group* grp = func.create_group(tup, buffer); - if (!grp) { -/* -// Ignore temp tuples until we can fix their timestamps. -if (func.temp_status_received()) { - tup.free_tuple(); - return 0; -}*/ + group grp; + if (!func.create_group(tup, (gs_sp_t)&grp)) { + if(func.disordered()){ + fprintf(stderr,"Out of order record in %s\n",op_name); + return 0; + } if (func.flush_needed()){ flush_old(result); - } + } if (func.temp_status_received()) { host_tuple temp_tup; if (!func.create_temp_status_tuple(temp_tup, flush_finished)) { @@ -77,32 +63,35 @@ if (func.temp_status_received()) { tup.free_tuple(); return 0; } + if(func.disordered()){ + fprintf(stderr,"Out of order record in %s\n",op_name); + return 0; + } - typename hash_table::iterator iter; - if ((iter = group_table[curr_table].find(grp)) != group_table[curr_table].end()) { + typename hash_table::iterator iter; + if ((iter = group_table.find(grp)) != group_table.end()) { // Temporal GBvar is part of the group so no flush is needed. - aggregate* old_aggr = (*iter).second; - func.update_aggregate(tup, grp, old_aggr); + func.update_aggregate(tup, grp, (*iter).second); }else{ if (func.flush_needed()) { flush_old(result); } if(n_patterns <= 1){ - // create a copy of the group on the heap - group* new_grp = new group(grp); // need a copy constructor for groups -// aggregate* aggr = (aggregate*)malloc(sizeof(aggregate)); - aggregate* aggr = new aggregate(); - // create an aggregate in preallocated buffer - aggr = func.create_aggregate(tup, (char*)aggr); - - group_table[curr_table].insert(new_grp, aggr); + aggregate aggr; + // create an aggregate in preallocated buffer + func.create_aggregate(tup, (char*)&aggr); + // neeed operator= doing a deep copy + group_table.insert(grp, aggr); }else{ int p; +// TODO this code is wrong, must check if each pattern is in the group table. for(p=0;p& result) { - host_tuple tup; - unsigned int old_table = 1-curr_table; - unsigned int i; - -// emit up to _GB_FLUSH_PER_TABLE_ output tuples. - if (!group_table[old_table].empty()) { - for (i=0; flush_pos != group_table[old_table].end() && i<_GB_FLUSH_PER_TUPLE_; ++flush_pos, ++i) { - bool failed = false; - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); - if (!failed) { - tup.channel = output_channel; - result.push_back(tup); - } - delete ((*flush_pos).first); - delete ((*flush_pos).second); -// free((*flush_pos).second); - } - } - -// Finalize processing if empty. - if(flush_pos == group_table[old_table].end()) { - flush_finished = true; - group_table[old_table].clear(); - group_table[old_table].rehash(); - } - return 0; - } int flush(list& result) { host_tuple tup; - typename hash_table::iterator iter; - unsigned int old_table = 1-curr_table; -// If the old table isn't empty, flush it now. - if (!group_table[old_table].empty()) { - for (; flush_pos != group_table[old_table].end(); ++flush_pos) { + flush_pos = group_table.begin(); +// If the table isn't empty, flush it now. + if (!group_table.empty()) { + for (; flush_pos != group_table.end(); ++flush_pos) { bool failed = false; tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); if (!failed) { @@ -154,30 +114,9 @@ if (func.temp_status_received()) { tup.channel = output_channel; result.push_back(tup); } - delete ((*flush_pos).first); - delete ((*flush_pos).second); // free((*flush_pos).second); } - group_table[old_table].clear(); - group_table[old_table].rehash(); - } - - flush_pos = group_table[curr_table].begin(); -// If the old table isn't empty, flush it now. - if (!group_table[curr_table].empty()) { - for (; flush_pos != group_table[curr_table].end(); ++flush_pos) { - bool failed = false; - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); - if (!failed) { - - tup.channel = output_channel; - result.push_back(tup); - } - delete ((*flush_pos).first); - delete ((*flush_pos).second); -// free((*flush_pos).second); - } - group_table[curr_table].clear(); + group_table.clear(); } flush_finished = true; @@ -186,33 +125,10 @@ if (func.temp_status_received()) { } int flush_old(list& result) { - host_tuple tup; - typename hash_table::iterator iter; - unsigned int old_table = 1-curr_table; - -// If the old table isn't empty, flush it now. - if (!group_table[old_table].empty()) { - for (; flush_pos != group_table[old_table].end(); ++flush_pos) { - bool failed = false; - tup = func.create_output_tuple((*flush_pos).first,(*flush_pos).second, failed); - if (!failed) { - - tup.channel = output_channel; - result.push_back(tup); - } - delete ((*flush_pos).first); - delete ((*flush_pos).second); -// free((*flush_pos).second); - } - group_table[old_table].clear(); - group_table[old_table].rehash(); - } - -// swap tables, enable partial flush processing. - flush_pos = group_table[curr_table].begin(); - curr_table = old_table; - flush_finished = false; + flush(result); + group_table.clear(); + group_table.resize(); return 0; } @@ -231,7 +147,7 @@ if (func.temp_status_received()) { } unsigned int get_mem_footprint() { - return group_table[0].get_mem_footprint() + group_table[1].get_mem_footprint(); + return group_table.get_mem_footprint(); } };