1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef MERGE_OPERATOR_OOP_H
17 #define MERGE_OPERATOR_OOP_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
29 template <class merge_functor>
30 class merge_operator_oop : public base_operator {
32 // Input channel on which we want to receive the next tuple.
33 // A value of -1 indicates that no tuple has been received yet,
34 // so it does not matter on which channel the next tuple arrives.
37 // List of tuples from one of the channels, waiting to be compared
38 // against tuples from the other channel.
39 list<host_tuple> merge_queue;
42 // comparator object used to compare the tuples
45 // Soft limit on the queue size - the operator is considered blocked on its input
46 // whenever this limit is reached (marked as no longer used, although get_blocked_status() still compares against it).
47 size_t soft_queue_size_limit;
51 // memory footprint for the merge queue in bytes
52 unsigned int queue_mem;
54 // Appends tup to the end of merge_queue and adds tup.tuple_size to queue_mem.
55 // A tuple that is not heap resident first gets its payload copied into a malloc'ed buffer.
// NOTE(review): no statement here stores the new buffer back into tup.data, and the
// malloc result is used without a null check -- confirm both against the intended code.
56 void append_tuple(host_tuple& tup) {
57 if (!tup.heap_resident) {
58 char* data = (char*)malloc(tup.tuple_size);
59 memcpy(data, tup.data, tup.tuple_size);
// flag the tuple so that a later call skips the copy above
61 tup.heap_resident = true;
63 merge_queue.push_back(tup);
// keep the memory accounting in step with the queue contents
65 queue_mem += tup.tuple_size;
69 // Moves tuples from the front of merge_queue into result for as long as
// func.compare_with_temp_status(front, 1-channel) is <= 0 -- Jin
// channel: presumably the channel (0 or 1) of the tuple that just arrived, so that
//          1-channel names the opposite channel -- TODO confirm with the caller
// result:  list receiving the released tuples, each retagged with output_channel
70 void purge_queue(int channel, list<host_tuple>& result){
71 if (merge_queue.empty())
73 host_tuple top_tuple = merge_queue.front();
74 // release every queued tuple that compares <= 0 against the status of the other channel
75 while(func.compare_with_temp_status(top_tuple, 1-channel) <= 0) {
76 merge_queue.pop_front();
// account for the memory released by the popped tuple
78 queue_mem -= top_tuple.tuple_size;
// NOTE(review): merge_qsize is not declared in view; if it is unsigned this check can never fire
79 if(merge_qsize<0) abort();
80 func.update_temp_status(top_tuple,channel);
81 func.xform_tuple(top_tuple);
// retag the tuple with this operator's output channel before handing it on
82 top_tuple.channel = output_channel;
83 result.push_back(top_tuple);
// re-check for an empty queue before reading the next front element
85 if (merge_queue.empty())
88 top_tuple = merge_queue.front();
89 func.update_temp_status(top_tuple,channel);
// Constructs the merge operator.
// schema_handle1: used to initialize func
// schema_handle2: not referenced by the constructor statements present here
// size_limit:     stored in soft_queue_size_limit
// name:           forwarded to the base_operator constructor
97 merge_operator_oop(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {
99 soft_queue_size_limit = size_limit;
// Processes one incoming tuple and appends any produced tuples to result.
// tup:    incoming tuple; a tup.channel above 1 is reported on stderr as illegal
// result: list receiving the output tuples
// NOTE(review): only the upper bound of tup.channel is tested; a negative channel passes this check.
105 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
108 if (tup.channel > 1) {
109 fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);
113 func.get_timestamp(tup);
// NOTE(review): compare_with_temp_status and update_temp_status take one argument here
// but two in purge_queue -- confirm that both overloads exist on the functor.
114 func.compare_with_temp_status(tup.channel);
115 bool is_temp_tuple = func.temp_status_received(tup);
// a tuple for which temp_status_received() is false is appended to result
117 if (!is_temp_tuple) {
118 result.push_back(tup);
120 func.update_temp_status(tup);
// a zero return from create_temp_status_tuple leads to temp_tup being emitted on output_channel
// NOTE(review): temp_tup is not declared in the statements present here -- confirm its declaration
123 if (!func.create_temp_status_tuple(temp_tup)) {
124 temp_tup.channel = output_channel;
125 result.push_back(temp_tup);
127 // clear memory of heap-resident temporal tuples
136 /* where is this called? when using the blocking mode -- Jin */
// Walks merge_queue from front to back; on each step the stored temporal status is
// updated, the tuple is passed through func.xform_tuple, retagged with output_channel
// and appended to result.
// NOTE(review): no statement in the loop copies the current element (iter) into
// top_tuple, which is default-constructed -- confirm against the intended code.
137 int flush(list<host_tuple>& result) {
139 if (merge_queue.empty())
142 host_tuple top_tuple;
143 list<host_tuple>::iterator iter;
144 for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {
146 func.update_stored_temp_status(top_tuple,top_tuple.channel);
147 func.xform_tuple(top_tuple);
// retag the tuple with this operator's output channel before handing it on
148 top_tuple.channel = output_channel;
149 result.push_back(top_tuple);
// Takes a parameter block as a size sz and an untyped pointer value.
// NOTE(review): no body follows this signature here -- confirm the implementation and return value.
157 int set_param_block(int sz, void * value) {
// Sets result.channel to output_channel, passes result to func.create_temp_status_tuple
// and returns that call's return value unchanged.
// accept_tuple emits its temporal tuple only when the same call returns 0.
162 int get_temp_status(host_tuple& result) {
163 result.channel = output_channel;
164 return func.create_temp_status_tuple(result);
// Compares merge_qsize against soft_queue_size_limit.
// NOTE(review): the values returned for the two outcomes are not present here -- confirm.
168 int get_blocked_status () {
169 if (merge_qsize> soft_queue_size_limit)
// Memory footprint accessor returning unsigned int.
// NOTE(review): no body follows this signature here; presumably it reports queue_mem -- confirm.
175 unsigned int get_mem_footprint() {
181 #endif // MERGE_OPERATOR_OOP_H