1 /* ------------------------------------------------
\r
2 Copyright 2014 AT&T Intellectual Property
\r
3 Licensed under the Apache License, Version 2.0 (the "License");
\r
4 you may not use this file except in compliance with the License.
\r
5 You may obtain a copy of the License at
\r
7 http://www.apache.org/licenses/LICENSE-2.0
\r
9 Unless required by applicable law or agreed to in writing, software
\r
10 distributed under the License is distributed on an "AS IS" BASIS,
\r
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 See the License for the specific language governing permissions and
\r
13 limitations under the License.
\r
14 ------------------------------------------- */
\r
16 #ifndef MERGE_OPERATOR_OOP_H
\r
17 #define MERGE_OPERATOR_OOP_H
\r
19 #include "host_tuple.h"
\r
20 #include "base_operator.h"
\r
22 using namespace std;
\r
29 template <class merge_functor>
\r
30 class merge_operator_oop : public base_operator {
\r
32 // input channel on which we want to receive the next tuple
\r
33 // value -1 indicates that we never received a single tuple
\r
34 // and we are not concerned on which channel tuple will arrive
\r
37 // list of tuples from one of the channel waiting to be compared
\r
38 // against tuple from the other channel
\r
39 list<host_tuple> merge_queue;
\r
42 // comparator object used to compare the tuples
\r
45 // soft limit on queue size - we consider operator blocked on its input
\r
46 // whenever we reach this soft limit (not used anymore)
\r
47 size_t soft_queue_size_limit;
\r
51 // memory footprint for the merge queue in bytes
\r
52 unsigned int queue_mem;
\r
54 // appends tuple to the end of the merge_queue
\r
55 // tuple is stack resident, makes it heap resident
\r
56 void append_tuple(host_tuple& tup) {
\r
57 if (!tup.heap_resident) {
\r
58 char* data = (char*)malloc(tup.tuple_size);
\r
59 memcpy(data, tup.data, tup.tuple_size);
\r
61 tup.heap_resident = true;
\r
63 merge_queue.push_back(tup);
\r
65 queue_mem += tup.tuple_size;
\r
69 // purge tuples in the queue, which are from stream channel -- Jin
\r
70 void purge_queue(int channel, list<host_tuple>& result){
\r
71 if (merge_queue.empty())
\r
73 host_tuple top_tuple = merge_queue.front();
\r
74 // remove all the tuple smaller than arrived tuple
\r
75 while(func.compare_with_temp_status(top_tuple, 1-channel) <= 0) {
\r
76 merge_queue.pop_front();
\r
78 queue_mem -= top_tuple.tuple_size;
\r
79 if(merge_qsize<0) abort();
\r
80 func.update_temp_status(top_tuple,channel);
\r
81 func.xform_tuple(top_tuple);
\r
82 top_tuple.channel = output_channel;
\r
83 result.push_back(top_tuple);
\r
85 if (merge_queue.empty())
\r
88 top_tuple = merge_queue.front();
\r
89 func.update_temp_status(top_tuple,channel);
\r
97 merge_operator_oop(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {
\r
99 soft_queue_size_limit = size_limit;
\r
105 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
\r
108 if (tup.channel > 1) {
\r
109 fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);
\r
113 func.get_timestamp(tup);
\r
114 func.compare_with_temp_status(tup.channel);
\r
115 bool is_temp_tuple = func.temp_status_received(tup);
\r
117 if (!is_temp_tuple) {
\r
118 result.push_back(tup);
\r
120 func.update_temp_status(tup);
\r
122 host_tuple temp_tup;
\r
123 if (!func.create_temp_status_tuple(temp_tup)) {
\r
124 temp_tup.channel = output_channel;
\r
125 result.push_back(temp_tup);
\r
127 // clear memory of heap-resident temporal tuples
\r
136 /* where is this called? when using the blocking mode -- Jin */
\r
137 int flush(list<host_tuple>& result) {
\r
139 if (merge_queue.empty())
\r
142 host_tuple top_tuple;
\r
143 list<host_tuple>::iterator iter;
\r
144 for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {
\r
146 func.update_stored_temp_status(top_tuple,top_tuple.channel);
\r
147 func.xform_tuple(top_tuple);
\r
148 top_tuple.channel = output_channel;
\r
149 result.push_back(top_tuple);
\r
157 int set_param_block(int sz, void * value) {
\r
162 int get_temp_status(host_tuple& result) {
\r
163 result.channel = output_channel;
\r
164 return func.create_temp_status_tuple(result);
\r
168 int get_blocked_status () {
\r
169 if (merge_qsize> soft_queue_size_limit)
\r
170 return wait_channel;
\r
175 unsigned int get_mem_footprint() {
\r
181 #endif // MERGE_OPERATOR_OOP_H
\r