X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=include%2Fhfta%2Fmerge_operator.h;h=68d2f5f0790b8f7086684969e862591c97b0fbeb;hb=07495effe193ca3f73c3bf0ce417068f9ac9dcdd;hp=662fd32ebcb2964b33ba94cf4a14b119eaeb6663;hpb=93d248304a68de7a8f9daf4aa74f9ee4cd27410c;p=com%2Fgs-lite.git diff --git a/include/hfta/merge_operator.h b/include/hfta/merge_operator.h index 662fd32..68d2f5f 100644 --- a/include/hfta/merge_operator.h +++ b/include/hfta/merge_operator.h @@ -1,247 +1,247 @@ -/* ------------------------------------------------ -Copyright 2014 AT&T Intellectual Property - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------- */ - -#ifndef MERGE_OPERATOR_H -#define MERGE_OPERATOR_H - -#include "host_tuple.h" -#include "base_operator.h" -#include -using namespace std; - -#include - - -//int last_tb = 0; - -template -class merge_operator : public base_operator { -private : - // input channel on which we want to receive the next tuple - // value -1 indicates that we never received a single tuple - // and we are not concerned on which channel tuple will arrive - int wait_channel; - - // list of tuples from one of the channel waiting to be compared - // against tuple from the other channel - list merge_queue; - int merge_qsize; - - // comparator object used to compare the tuples - merge_functor func; - - // soft limit on queue size - we consider operator blocked on its input - // whenever we reach this soft limit (not used anymore) - size_t soft_queue_size_limit; - - int dropped_cnt; - - // memory footprint for the merge queue in bytes - unsigned int queue_mem; - - // appends tuple to the end of the merge_queue - // if tuple is stack resident, makes it heap resident - void append_tuple(host_tuple& tup) { - if (!tup.heap_resident) { - char* data = (char*)malloc(tup.tuple_size); - memcpy(data, tup.data, tup.tuple_size); - tup.data = data; - tup.heap_resident = true; - } - merge_queue.push_back(tup); - merge_qsize++; - queue_mem += tup.tuple_size; - } - - void purge_queue(int channel, list& result){ - if (merge_queue.empty()) - return; - host_tuple top_tuple = merge_queue.front(); - // remove all the tuple smaller than arrived tuple - while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) { - merge_queue.pop_front(); - merge_qsize--; - queue_mem -= top_tuple.tuple_size; - if(merge_qsize<0) abort(); - func.update_stored_temp_status(top_tuple,channel); - func.xform_tuple(top_tuple); - top_tuple.channel = output_channel; - result.push_back(top_tuple); - - if (merge_queue.empty()) - break; - - top_tuple = merge_queue.front(); - func.update_stored_temp_status(top_tuple,channel); - } - } - - - -public: - - merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) { - wait_channel = -1; - soft_queue_size_limit = size_limit; - merge_qsize = 0; - dropped_cnt = 0; - queue_mem = 0; - } - - - int accept_tuple(host_tuple& tup, list& result) { - int last_channel; - - if (tup.channel > 1) { - fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel); - return 0; - } - - /* reject "out of order" tuple - we can receive those after forced flush */ - func.get_timestamp(tup); - int res = func.compare_with_temp_status(tup.channel); - bool is_temp_tuple = func.temp_status_received(tup); - -/* -// Ignore temp tuples until we can fix their timestamps. -if(is_temp_tuple){ - tup.free_tuple(); - return 0; -}*/ - if (res < 0){ // out of order tuple - - if (++dropped_cnt % 100000 == 0) { - gslog(LOG_ALERT, "%d tuples dropped by %s merge\n", dropped_cnt,get_name()); - } - //if(func.print_warnings()) - // fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel); - // free tuple memory - tup.free_tuple(); - return 0; - } else { - - if (wait_channel < 0 || wait_channel != tup.channel) { - if (wait_channel < 0) - func.update_temp_status(tup); - - if (!is_temp_tuple){ - if(func.compare_with_temp_status(1-tup.channel) <= 0){ - if (!tup.heap_resident) { - char* data = (char*)malloc(tup.tuple_size); - memcpy(data, tup.data, tup.tuple_size); - tup.data = data; - tup.heap_resident = true; - } - func.xform_tuple(tup); - tup.channel = output_channel; - result.push_back(tup); - }else{ - append_tuple(tup); // put arrived tuple in the queue - } - } -// func.update_temp_status_by_slack(tup, 1-tup.channel); -// purge_queue(tup.channel, result); - - wait_channel = 1-tup.channel; - }else{ - // If possible, clear tuples from the other queue. - func.update_temp_status(tup); - purge_queue(1-tup.channel, result); - - if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger that arrived tuple - if (!is_temp_tuple) { - if (!tup.heap_resident) { - char* data = (char*)malloc(tup.tuple_size); - memcpy(data, tup.data, tup.tuple_size); - tup.data = data; - tup.heap_resident = true; - } - func.xform_tuple(tup); - tup.channel = output_channel; - result.push_back(tup); - } - } - else { - if (!is_temp_tuple) - append_tuple(tup); // put arrived tuple in the queue - wait_channel = 1 - tup.channel; // now we want the tuple from other channel - } - } - - } - - // temp status tuples emited by merge don't serve any other purpose - // other than tracing the flow of tuples - if (is_temp_tuple) { - host_tuple temp_tup; - if (!func.create_temp_status_tuple(temp_tup)) { - temp_tup.channel = output_channel; - result.push_back(temp_tup); - } - // clear memory of heap-resident temporal tuples - tup.free_tuple(); - } - - if (!merge_qsize) - wait_channel = -1; - - return 0; - } - - int flush(list& result) { - - if (merge_queue.empty()) - return 0; - - host_tuple top_tuple; - list::iterator iter; - for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) { - top_tuple = *iter; - func.update_stored_temp_status(top_tuple,top_tuple.channel); - func.xform_tuple(top_tuple); - top_tuple.channel = output_channel; - result.push_back(top_tuple); - } - - queue_mem = 0; - - return 0; - } - - int set_param_block(int sz, void * value) { - return 0; - } - - - int get_temp_status(host_tuple& result) { - result.channel = output_channel; - return func.create_temp_status_tuple(result); - } - - - int get_blocked_status () { - if (merge_qsize> soft_queue_size_limit) - return wait_channel; - else - return -1; - } - - unsigned int get_mem_footprint() { - return queue_mem; - } - -}; - -#endif // MERGE_OPERATOR_H +/* ------------------------------------------------ +Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ + +#ifndef MERGE_OPERATOR_H +#define MERGE_OPERATOR_H + +#include "host_tuple.h" +#include "base_operator.h" +#include +using namespace std; + +#include + + +//int last_tb = 0; + +template +class merge_operator : public base_operator { +private : + // input channel on which we want to receive the next tuple + // value -1 indicates that we never received a single tuple + // and we are not concerned on which channel tuple will arrive + int wait_channel; + + // list of tuples from one of the channel waiting to be compared + // against tuple from the other channel + list merge_queue; + int merge_qsize; + + // comparator object used to compare the tuples + merge_functor func; + + // soft limit on queue size - we consider operator blocked on its input + // whenever we reach this soft limit (not used anymore) + size_t soft_queue_size_limit; + + int dropped_cnt; + + // memory footprint for the merge queue in bytes + unsigned int queue_mem; + + // appends tuple to the end of the merge_queue + // if tuple is stack resident, makes it heap resident + void append_tuple(host_tuple& tup) { + if (!tup.heap_resident) { + char* data = (char*)malloc(tup.tuple_size); + memcpy(data, tup.data, tup.tuple_size); + tup.data = data; + tup.heap_resident = true; + } + merge_queue.push_back(tup); + merge_qsize++; + queue_mem += tup.tuple_size; + } + + void purge_queue(int channel, list& result){ + if (merge_queue.empty()) + return; + host_tuple top_tuple = merge_queue.front(); + // remove all the tuple smaller than arrived tuple + while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) { + merge_queue.pop_front(); + merge_qsize--; + queue_mem -= top_tuple.tuple_size; + if(merge_qsize<0) abort(); + func.update_stored_temp_status(top_tuple,channel); + func.xform_tuple(top_tuple); + top_tuple.channel = output_channel; + result.push_back(top_tuple); + + if (merge_queue.empty()) + break; + + top_tuple = merge_queue.front(); + func.update_stored_temp_status(top_tuple,channel); + } + } + + + +public: + + merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) { + wait_channel = -1; + soft_queue_size_limit = size_limit; + merge_qsize = 0; + dropped_cnt = 0; + queue_mem = 0; + } + + + int accept_tuple(host_tuple& tup, list& result) { + int last_channel; + + if (tup.channel > 1) { + fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel); + return 0; + } + + /* reject "out of order" tuple - we can receive those after forced flush */ + func.get_timestamp(tup); + int res = func.compare_with_temp_status(tup.channel); + bool is_temp_tuple = func.temp_status_received(tup); + +/* +// Ignore temp tuples until we can fix their timestamps. +if(is_temp_tuple){ + tup.free_tuple(); + return 0; +}*/ + if (res < 0){ // out of order tuple + + if (++dropped_cnt % 100000 == 0) { + gslog(LOG_ALERT, "%d tuples dropped by %s merge\n", dropped_cnt,get_name()); + } + //if(func.print_warnings()) + // fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel); + // free tuple memory + tup.free_tuple(); + return 0; + } else { + + if (wait_channel < 0 || wait_channel != tup.channel) { + if (wait_channel < 0) + func.update_temp_status(tup); + + if (!is_temp_tuple){ + if(func.compare_with_temp_status(1-tup.channel) <= 0){ + if (!tup.heap_resident) { + char* data = (char*)malloc(tup.tuple_size); + memcpy(data, tup.data, tup.tuple_size); + tup.data = data; + tup.heap_resident = true; + } + func.xform_tuple(tup); + tup.channel = output_channel; + result.push_back(tup); + }else{ + append_tuple(tup); // put arrived tuple in the queue + } + } +// func.update_temp_status_by_slack(tup, 1-tup.channel); +// purge_queue(tup.channel, result); + + wait_channel = 1-tup.channel; + }else{ + // If possible, clear tuples from the other queue. + func.update_temp_status(tup); + purge_queue(1-tup.channel, result); + + if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger that arrived tuple + if (!is_temp_tuple) { + if (!tup.heap_resident) { + char* data = (char*)malloc(tup.tuple_size); + memcpy(data, tup.data, tup.tuple_size); + tup.data = data; + tup.heap_resident = true; + } + func.xform_tuple(tup); + tup.channel = output_channel; + result.push_back(tup); + } + } + else { + if (!is_temp_tuple) + append_tuple(tup); // put arrived tuple in the queue + wait_channel = 1 - tup.channel; // now we want the tuple from other channel + } + } + + } + + // temp status tuples emited by merge don't serve any other purpose + // other than tracing the flow of tuples + if (is_temp_tuple) { + host_tuple temp_tup; + if (!func.create_temp_status_tuple(temp_tup)) { + temp_tup.channel = output_channel; + result.push_back(temp_tup); + } + // clear memory of heap-resident temporal tuples + tup.free_tuple(); + } + + if (!merge_qsize) + wait_channel = -1; + + return 0; + } + + int flush(list& result) { + + if (merge_queue.empty()) + return 0; + + host_tuple top_tuple; + list::iterator iter; + for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) { + top_tuple = *iter; + func.update_stored_temp_status(top_tuple,top_tuple.channel); + func.xform_tuple(top_tuple); + top_tuple.channel = output_channel; + result.push_back(top_tuple); + } + + queue_mem = 0; + + return 0; + } + + int set_param_block(int sz, void * value) { + return 0; + } + + + int get_temp_status(host_tuple& result) { + result.channel = output_channel; + return func.create_temp_status_tuple(result); + } + + + int get_blocked_status () { + if (merge_qsize> soft_queue_size_limit) + return wait_channel; + else + return -1; + } + + unsigned int get_mem_footprint() { + return queue_mem; + } + +}; + +#endif // MERGE_OPERATOR_H