1 /* ------------------------------------------------
\r
2 Copyright 2014 AT&T Intellectual Property
\r
3 Licensed under the Apache License, Version 2.0 (the "License");
\r
4 you may not use this file except in compliance with the License.
\r
5 You may obtain a copy of the License at
\r
7 http://www.apache.org/licenses/LICENSE-2.0
\r
9 Unless required by applicable law or agreed to in writing, software
\r
10 distributed under the License is distributed on an "AS IS" BASIS,
\r
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 See the License for the specific language governing permissions and
\r
13 limitations under the License.
\r
14 ------------------------------------------- */
\r
16 #ifndef MERGE_OPERATOR_H
\r
17 #define MERGE_OPERATOR_H
\r
19 #include "host_tuple.h"
\r
20 #include "base_operator.h"
\r
22 using namespace std;
\r
// Two-way order-preserving merge operator, templated on a comparator/functor
// type that knows the tuple schemas of both input channels.
// NOTE(review): this chunk is missing interior lines (original 31, 35-36,
// 40-44, 48-50, 53); the declarations of `wait_channel`, the `merge_functor
// func` member, and the `merge_qsize` counter referenced below are presumably
// on those lines -- confirm against the full file.
29 template <class merge_functor>

30 class merge_operator : public base_operator {

32 // input channel on which we want to receive the next tuple

33 // value -1 indicates that we never received a single tuple

34 // and we are not concerned on which channel tuple will arrive

37 // list of tuples from one of the channel waiting to be compared

38 // against tuple from the other channel

// Buffered tuples from the channel that is currently "ahead".
39 list<host_tuple> merge_queue;

42 // comparator object used to compare the tuples

45 // soft limit on queue size - we consider operator blocked on its input

46 // whenever we reach this soft limit (not used anymore)

47 size_t soft_queue_size_limit;

51 // memory footprint for the merge queue in bytes

52 unsigned int queue_mem;
\r
54 // appends tuple to the end of the merge_queue

55 // if tuple is stack resident, makes it heap resident

// Deep-copies a stack-resident tuple payload onto the heap so the queued
// pointer remains valid after the caller's frame is gone, then enqueues it
// and charges its size to queue_mem.
// NOTE(review): original lines 60, 62 and 64 are missing from this chunk --
// presumably `tup.data = data;`, the closing brace of the if, and a
// queue-size increment (`merge_qsize++`?). Confirm against the full file.
56 void append_tuple(host_tuple& tup) {

57 if (!tup.heap_resident) {

// malloc result is not checked for NULL here -- TODO confirm that is the
// accepted policy elsewhere in this codebase
58 char* data = (char*)malloc(tup.tuple_size);

59 memcpy(data, tup.data, tup.tuple_size);

61 tup.heap_resident = true;

63 merge_queue.push_back(tup);

// account for the queued payload in the operator's memory footprint
65 queue_mem += tup.tuple_size;
\r
// Drains from the front of merge_queue every buffered tuple that is no
// longer blocked by the other channel's progress (its timestamp is <= the
// other channel's temp status), transforming each and appending it to
// `result` on the output channel.
// NOTE(review): original lines 70, 75, 82, 84-85 and 88-94 are missing --
// presumably an early `return;` on the empty check, a `merge_qsize--`
// (which would explain the abort() guard below), memory release of the
// emitted tuple, a loop-exit `return`/`break` when the queue empties, and
// the closing braces. Confirm against the full file.
68 void purge_queue(int channel, list<host_tuple>& result){

69 if (merge_queue.empty())

71 host_tuple top_tuple = merge_queue.front();

72 // remove all the tuple smaller than arrived tuple

// 1-channel flips 0<->1: compare the stored tuple against the OTHER
// channel's last-known temp status
73 while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) {

74 merge_queue.pop_front();

76 queue_mem -= top_tuple.tuple_size;

// sanity check: the queue size counter must never go negative
77 if(merge_qsize<0) abort();

78 func.update_stored_temp_status(top_tuple,channel);

79 func.xform_tuple(top_tuple);

// emitted tuples are retagged with the operator's output channel
80 top_tuple.channel = output_channel;

81 result.push_back(top_tuple);

83 if (merge_queue.empty())

86 top_tuple = merge_queue.front();

87 func.update_stored_temp_status(top_tuple,channel);
\r
// Constructor: forwards the operator name to base_operator and hands the
// first schema handle to the merge functor; records the (now unused) soft
// queue size limit.
// NOTE(review): `schema_handle2` is not consumed in the visible lines, and
// the initialization of `wait_channel`, `queue_mem`, `merge_qsize` and the
// dropped-tuple counter is not visible -- original lines 96 and 98-103 are
// missing from this chunk; confirm against the full file.
95 merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {

97 soft_queue_size_limit = size_limit;
\r
// Accepts one tuple from either input channel (0 or 1) and appends any
// tuples that can now be emitted in timestamp order to `result`.
// High-level flow visible in this chunk:
//   1. validate the channel number (two-way merge only),
//   2. drop tuples that arrive out of order w.r.t. this channel's
//      last-seen temp status (can happen after a forced flush),
//   3. if the tuple is on the channel we were NOT waiting for, either
//      emit it immediately (when the other channel has already advanced
//      past it) or buffer it in merge_queue,
//   4. otherwise update the temp status and purge newly-unblocked tuples
//      from the queue,
//   5. temp-status tuples are re-emitted as fresh status tuples for flow
//      tracing only.
// NOTE(review): many interior lines are missing from this chunk (original
// 105-106, 109-111, 116-122, 124, 127, 131-134, 138, 144, 146, 150,
// 152-153, 156, 158, 162, 168, 170, 174-176, 180-184, 192, 194-202) --
// presumably returns, closing braces, `tup.data = data;` after each
// malloc/memcpy pair, tuple-memory frees, and the declaration/handling of
// `dropped_cnt`. Do not rely on the visible nesting; confirm against the
// full file.
104 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {

// two-way merge: only channels 0 and 1 are legal
107 if (tup.channel > 1) {

108 fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);

112 /* reject "out of order" tuple - we can receive those after forced flush */

113 func.get_timestamp(tup);

// res < 0 means this tuple is older than the channel's current temp status
114 int res = func.compare_with_temp_status(tup.channel);

115 bool is_temp_tuple = func.temp_status_received(tup);

118 // Ignore temp tuples until we can fix their timestamps.

123 if (res < 0){ // out of order tuple

// rate-limited alert: log once per 100000 dropped tuples
125 if (++dropped_cnt % 100000 == 0) {

126 gslog(LOG_ALERT, "%d tuples dropped by %s merge\n", dropped_cnt,get_name());

128 //if(func.print_warnings())

129 // fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel);

130 // free tuple memory

// Case: tuple arrived on a channel other than the one we are waiting on
// (or no tuple has been seen yet, wait_channel < 0).
135 if (wait_channel < 0 || wait_channel != tup.channel) {

136 if (wait_channel < 0)

137 func.update_temp_status(tup);

139 if (!is_temp_tuple){

// other channel already advanced past this tuple: safe to emit now
140 if(func.compare_with_temp_status(1-tup.channel) <= 0){

141 if (!tup.heap_resident) {

142 char* data = (char*)malloc(tup.tuple_size);

143 memcpy(data, tup.data, tup.tuple_size);

145 tup.heap_resident = true;

147 func.xform_tuple(tup);

148 tup.channel = output_channel;

149 result.push_back(tup);

151 append_tuple(tup); // put arrived tuple in the queue

154 // func.update_temp_status_by_slack(tup, 1-tup.channel);

155 // purge_queue(tup.channel, result);

// now wait for the other channel to catch up
157 wait_channel = 1-tup.channel;

159 // If possible, clear tuples from the other queue.

160 func.update_temp_status(tup);

161 purge_queue(1-tup.channel, result);

163 if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger that arrived tuple

164 if (!is_temp_tuple) {

165 if (!tup.heap_resident) {

166 char* data = (char*)malloc(tup.tuple_size);

167 memcpy(data, tup.data, tup.tuple_size);

169 tup.heap_resident = true;

171 func.xform_tuple(tup);

172 tup.channel = output_channel;

173 result.push_back(tup);

177 if (!is_temp_tuple)

178 append_tuple(tup); // put arrived tuple in the queue

179 wait_channel = 1 - tup.channel; // now we want the tuple from other channel

185 // temp status tuples emited by merge don't serve any other purpose

186 // other than tracing the flow of tuples

187 if (is_temp_tuple) {

188 host_tuple temp_tup;

// create_temp_status_tuple() visibly returns 0/falsy on success here
// (the result is emitted only when the call returns false)
189 if (!func.create_temp_status_tuple(temp_tup)) {

190 temp_tup.channel = output_channel;

191 result.push_back(temp_tup);

193 // clear memory of heap-resident temporal tuples
\r
193 // clear memory of heap-resident temporal tuples
\r
// Forced flush: emits every tuple still buffered in merge_queue to
// `result` regardless of the other channel's progress (tuples received
// after this may be dropped as out-of-order by accept_tuple).
// NOTE(review): original line 211 is missing from this chunk -- as written
// the loop never assigns `top_tuple` from `*iter`, so the missing line is
// presumably `top_tuple = *iter;`. Also missing: original 204, 206-207
// (likely an early return on empty) and 216-222 (queue/memory cleanup and
// the return). Confirm against the full file before judging this a bug.
203 int flush(list<host_tuple>& result) {

205 if (merge_queue.empty())

208 host_tuple top_tuple;

209 list<host_tuple>::iterator iter;

210 for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {

212 func.update_stored_temp_status(top_tuple,top_tuple.channel);

213 func.xform_tuple(top_tuple);

214 top_tuple.channel = output_channel;

215 result.push_back(top_tuple);
\r
223 int set_param_block(int sz, void * value) {
\r
// Builds a temp-status (punctuation/heartbeat) tuple tagged with the
// output channel; return value is whatever the functor's
// create_temp_status_tuple reports.
228 int get_temp_status(host_tuple& result) {

229 result.channel = output_channel;

230 return func.create_temp_status_tuple(result);
\r
// Reports which input channel this operator is blocked on: returns
// wait_channel once the buffered queue size exceeds the soft limit.
// NOTE(review): the not-blocked return path (original lines 237-240) is
// missing from this chunk -- presumably `return -1;`. Also note the
// signed/unsigned comparison between merge_qsize and the size_t limit;
// confirm merge_qsize's declared type in the full file.
234 int get_blocked_status () {

235 if (merge_qsize> soft_queue_size_limit)

236 return wait_channel;
\r
241 unsigned int get_mem_footprint() {
\r
247 #endif // MERGE_OPERATOR_H
\r