1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef MERGE_OPERATOR_H
17 #define MERGE_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
// merge_operator: merges tuple streams arriving on input channels 0 and 1
// (accept_tuple reports a channel > 1 as illegal for a "two-way merge").
// Each arriving tuple is compared, through the merge_functor, with the
// temporal status of the opposite channel. Tuples that may be released are
// passed through func.xform_tuple(), tagged with output_channel and appended
// to the caller's result list. Other tuples are held in merge_queue until a
// tuple from the other channel (purge_queue) or flush() releases them.
//
// NOTE(review): this copy of the header carries line-number prefixes and is
// missing lines (see the jumps in the numbering). The missing lines include
// closing braces, return statements, else branches and the declarations of
// wait_channel, func and merge_qsize that the code below uses. It does not
// compile as-is; restore it from the upstream source before building it or
// changing any logic.
29 template <class merge_functor>
30 class merge_operator : public base_operator {
32 // input channel on which we want to receive the next tuple
33 // value -1 indicates that we never received a single tuple
34 // and we are not concerned on which channel tuple will arrive
// NOTE(review): the wait_channel declaration described above is missing
// from this copy.
37 // list of tuples from one of the channels waiting to be compared
38 // against tuples from the other channel
39 list<host_tuple> merge_queue;
42 // comparator object used to compare the tuples
// (declaration of `func` is missing from this copy; the constructor
// initializes it with schema_handle1)
45 // soft limit on queue size - get_blocked_status() compares merge_qsize
46 // against this limit (an earlier comment said it was "not used anymore")
47 size_t soft_queue_size_limit;
51 // memory footprint for the merge queue in bytes: the sum of tuple_size of
// the queued tuples (incremented in append_tuple, decremented in purge_queue)
52 unsigned int queue_mem;
54 // appends tuple to the end of the merge_queue
55 // if tuple is stack resident, makes it heap resident (copies its
// tuple_size bytes into a malloc'd buffer) before queueing it
56 void append_tuple(host_tuple& tup) {
57 if (!tup.heap_resident) {
// NOTE(review): malloc's result is not checked for NULL. Also, as shown,
// `data` is never stored back into tup (original line 60 is missing). If
// that assignment really is absent, the buffer leaks and the queued tuple
// still points at the caller's storage.
58 char* data = (char*)malloc(tup.tuple_size);
59 memcpy(data, tup.data, tup.tuple_size);
61 tup.heap_resident = true;
63 merge_queue.push_back(tup);
65 queue_mem += tup.tuple_size;
// Releases queued tuples from the front of merge_queue into `result`.
// channel: the input channel whose tuples are queued (the visible caller in
//          accept_tuple passes 1-tup.channel, i.e. the side opposite the
//          tuple that just arrived).
// result:  output list; each released tuple is transformed with
//          func.xform_tuple() and re-tagged with output_channel.
// Tuples are popped while the functor reports the front tuple as <= 0
// against the temporal status of channel 1-channel.
68 void purge_queue(int channel, list<host_tuple>& result){
69 if (merge_queue.empty())
71 host_tuple top_tuple = merge_queue.front();
72 // release every queued tuple that compares <= 0 against the status of the other channel
73 while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) {
74 merge_queue.pop_front();
76 queue_mem -= top_tuple.tuple_size;
// NOTE(review): get_blocked_status() compares merge_qsize with the size_t
// soft_queue_size_limit. If merge_qsize is itself unsigned, this check can
// never fire. TODO confirm its type.
77 if(merge_qsize<0) abort();
78 func.update_stored_temp_status(top_tuple,channel);
79 func.xform_tuple(top_tuple);
80 top_tuple.channel = output_channel;
81 result.push_back(top_tuple);
83 if (merge_queue.empty())
// move to the next queued tuple and record its status before the loop
// condition is re-tested
86 top_tuple = merge_queue.front();
87 func.update_stored_temp_status(top_tuple,channel);
// schema_handle1 is passed to the merge functor; schema_handle2 is not
// referenced in the visible lines.
// NOTE(review): initialization of wait_channel, merge_qsize and queue_mem is
// not visible here (original lines 98-101 are missing). Confirm they are
// initialized, since append_tuple/purge_queue read-modify-write queue_mem.
95 merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {
97 soft_queue_size_limit = size_limit;
// Accepts one input tuple and appends any tuples that become releasable to
// `result` (tagged with output_channel). Tuples that cannot be released yet
// are queued in merge_queue. A temporal-status tuple created by the functor
// is also appended when creation succeeds. The return value is set on lines
// not visible in this copy.
104 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
// NOTE(review): only channel > 1 is rejected here; a negative channel value
// would pass this check.
107 if (tup.channel > 1) {
108 fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);
112 /* reject "out of order" tuple - we can receive those after forced flush */
113 func.get_timestamp(tup);
114 int res = func.compare_with_temp_status(tup.channel);
115 bool is_temp_tuple = func.temp_status_received(tup);
118 // Ignore temp tuples until we can fix their timestamps.
123 if (res < 0){ // out of order tuple
// count drops; log one alert per 100000 dropped tuples
125 if (++dropped_cnt % 100000 == 0) {
126 gslog(LOG_ALERT, "%d tuples dropped by %s merge\n", dropped_cnt,get_name());
128 //if(func.print_warnings())
129 // fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel);
// Case 1: this is the first tuple ever (wait_channel < 0), or it arrived
// on a channel we are not currently waiting for.
135 if (wait_channel < 0 || wait_channel != tup.channel) {
136 if (wait_channel < 0)
137 func.update_temp_status(tup);
// The other channel's status already lets this tuple out: emit it
// directly (after making it heap resident).
140 if(func.compare_with_temp_status(1-tup.channel) <= 0){
141 if (!tup.heap_resident) {
// NOTE(review): same pattern as append_tuple: the malloc is unchecked, and
// the assignment of `data` into tup (line 144) is missing from this copy.
142 char* data = (char*)malloc(tup.tuple_size);
143 memcpy(data, tup.data, tup.tuple_size);
145 tup.heap_resident = true;
147 func.xform_tuple(tup);
148 tup.channel = output_channel;
149 result.push_back(tup);
// NOTE(review): the `else` that separates the emit path above from this
// append is on a missing line (150). As shown, the tuple would be both
// emitted and queued.
151 append_tuple(tup); // put arrived tuple in the queue
154 // func.update_temp_status_by_slack(tup, 1-tup.channel);
155 // purge_queue(tup.channel, result);
// from now on, wait for a tuple from the opposite channel
157 wait_channel = 1-tup.channel;
// Case 2 (presumably the else branch on a missing line): the tuple arrived
// on the channel we were waiting for.
159 // If possible, clear tuples from the other queue.
160 func.update_temp_status(tup);
161 purge_queue(1-tup.channel, result);
163 if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger than the arrived tuple
// temp-status tuples are not forwarded on this path
164 if (!is_temp_tuple) {
165 if (!tup.heap_resident) {
166 char* data = (char*)malloc(tup.tuple_size);
167 memcpy(data, tup.data, tup.tuple_size);
169 tup.heap_resident = true;
171 func.xform_tuple(tup);
172 tup.channel = output_channel;
173 result.push_back(tup);
// presumably the else branch of the check above (the else line is missing
// from this copy): the tuple cannot be released yet
178 append_tuple(tup); // put arrived tuple in the queue
179 wait_channel = 1 - tup.channel; // now we want the tuple from other channel
185 // temp status tuples emitted by merge don't serve any other purpose
186 // other than tracing the flow of tuples
// a zero return from create_temp_status_tuple is treated as success here
189 if (!func.create_temp_status_tuple(temp_tup)) {
190 temp_tup.channel = output_channel;
191 result.push_back(temp_tup);
193 // clear memory of heap-resident temporal tuples
// Emits every tuple still held in merge_queue into `result`: updates the
// stored status using the tuple's own channel (queued tuples keep their
// input channel, since append_tuple does not change it), applies
// func.xform_tuple(), and re-tags the tuple with output_channel. Clearing of
// the queue and the return value are on lines not visible in this copy.
203 int flush(list<host_tuple>& result) {
205 if (merge_queue.empty())
// NOTE(review): as shown, top_tuple is never assigned from *iter inside the
// loop (original line 211 is missing). Without that assignment the same
// default-constructed tuple would be emitted for every queued entry.
208 host_tuple top_tuple;
209 list<host_tuple>::iterator iter;
210 for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {
212 func.update_stored_temp_status(top_tuple,top_tuple.channel);
213 func.xform_tuple(top_tuple);
214 top_tuple.channel = output_channel;
215 result.push_back(top_tuple);
// Parameter-block update hook; its body is not visible in this copy.
224 int set_param_block(int sz, void * value) {
// Fills `result` as a temporal-status tuple via the functor, tags it with
// output_channel, and returns the functor's status (accept_tuple treats a
// zero return as success).
229 int get_temp_status(host_tuple& result) {
230 result.channel = output_channel;
231 return func.create_temp_status_tuple(result);
// Reports whether the operator is blocked, by comparing merge_qsize against
// soft_queue_size_limit. The returned values are on lines not visible here.
235 int get_blocked_status () {
236 if (merge_qsize> soft_queue_size_limit)
// Memory-footprint accessor; its body is not visible in this copy.
242 unsigned int get_mem_footprint() {
248 #endif // MERGE_OPERATOR_H