Fixed newline characters throughout the code
[com/gs-lite.git] / include / hfta / merge_operator.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef MERGE_OPERATOR_H
17 #define MERGE_OPERATOR_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 using namespace std;
23
24 #include <stdio.h>
25
26
27 //int last_tb = 0;
28
29 template <class merge_functor>
30 class merge_operator : public base_operator {
31 private :
32         // input channel on which we want to receive the next tuple
33         // value -1 indicates that we never received a single tuple
34         // and we are not concerned on which channel tuple will arrive
35         int wait_channel;
36
37         // list of tuples from one of the channel waiting to be compared
38         // against tuple from the other channel
39         list<host_tuple> merge_queue;
40         int merge_qsize;
41
42         // comparator object used to compare the tuples
43         merge_functor func;
44
45         // soft limit on queue size - we consider operator blocked on its input
46         // whenever we reach this soft limit (not used anymore)
47         size_t soft_queue_size_limit;
48
49         int dropped_cnt;
50
51         // memory footprint for the merge queue in bytes
52         unsigned int queue_mem;
53
54         // appends tuple to the end of the merge_queue
55         // if tuple is stack resident, makes it heap resident
56         void append_tuple(host_tuple& tup) {
57                 if (!tup.heap_resident) {
58                         char* data = (char*)malloc(tup.tuple_size);
59                         memcpy(data, tup.data, tup.tuple_size);
60                         tup.data = data;
61                         tup.heap_resident = true;
62                 }
63                 merge_queue.push_back(tup);
64                 merge_qsize++;
65                 queue_mem += tup.tuple_size;
66         }
67
68         void purge_queue(int channel, list<host_tuple>& result){
69                 if (merge_queue.empty())
70                         return;
71                 host_tuple top_tuple = merge_queue.front();
72                 // remove all the tuple smaller than arrived tuple
73                 while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) {
74                         merge_queue.pop_front();
75                         merge_qsize--;
76                         queue_mem -= top_tuple.tuple_size;
77                         if(merge_qsize<0) abort();
78                         func.update_stored_temp_status(top_tuple,channel);
79                         func.xform_tuple(top_tuple);
80                         top_tuple.channel = output_channel;
81                         result.push_back(top_tuple);
82
83                         if (merge_queue.empty())
84                                 break;
85
86                         top_tuple = merge_queue.front();
87                         func.update_stored_temp_status(top_tuple,channel);
88                 }
89         }
90
91
92
93 public:
94
95         merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {
96                 wait_channel = -1;
97                 soft_queue_size_limit = size_limit;
98                 merge_qsize = 0;
99                 dropped_cnt = 0;
100                 queue_mem = 0;
101         }
102
103
104         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
105                 int last_channel;
106
107                 if (tup.channel > 1) {
108                         fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);
109                         return 0;
110                 }
111
112                 /* reject "out of order" tuple - we can receive those after forced flush */
113                 func.get_timestamp(tup);
114                 int res = func.compare_with_temp_status(tup.channel);
115                 bool is_temp_tuple = func.temp_status_received(tup);
116
117 /*
118 //                      Ignore temp tuples until we can fix their timestamps.
119 if(is_temp_tuple){
120  tup.free_tuple();
121  return 0;
122 }*/
123                 if (res < 0){   // out of order tuple
124
125                         if (++dropped_cnt % 100000 == 0) {
126                                 gslog(LOG_ALERT, "%d tuples dropped by %s  merge\n", dropped_cnt,get_name());
127                         }
128                         //if(func.print_warnings())
129                         //      fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel);
130                         // free tuple memory
131                         tup.free_tuple();
132                         return 0;
133                 } else {
134
135                         if (wait_channel < 0 || wait_channel != tup.channel) {
136                                 if (wait_channel < 0)
137                                         func.update_temp_status(tup);
138
139                                 if (!is_temp_tuple){
140                                         if(func.compare_with_temp_status(1-tup.channel) <= 0){
141                                                 if (!tup.heap_resident) {
142                                                         char* data = (char*)malloc(tup.tuple_size);
143                                                         memcpy(data, tup.data, tup.tuple_size);
144                                                         tup.data = data;
145                                                         tup.heap_resident = true;
146                                                 }
147                                                 func.xform_tuple(tup);
148                                                 tup.channel = output_channel;
149                                                 result.push_back(tup);
150                                         }else{
151                                                 append_tuple(tup);              // put arrived tuple in the queue
152                                         }
153                                 }
154 //                              func.update_temp_status_by_slack(tup, 1-tup.channel);
155 //                              purge_queue(tup.channel, result);
156
157                                 wait_channel = 1-tup.channel;
158                         }else{
159         //                              If possible, clear tuples from the other queue.
160                                 func.update_temp_status(tup);
161                                 purge_queue(1-tup.channel, result);
162
163                                 if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger that arrived tuple
164                                         if (!is_temp_tuple) {
165                                                 if (!tup.heap_resident) {
166                                                         char* data = (char*)malloc(tup.tuple_size);
167                                                         memcpy(data, tup.data, tup.tuple_size);
168                                                         tup.data = data;
169                                                         tup.heap_resident = true;
170                                                 }
171                                                 func.xform_tuple(tup);
172                                                 tup.channel = output_channel;
173                                                 result.push_back(tup);
174                                         }
175                                 }
176                                 else {
177                                         if (!is_temp_tuple)
178                                                 append_tuple(tup);              // put arrived tuple in the queue
179                                         wait_channel = 1 - tup.channel; // now we want the tuple from other channel
180                                 }
181                         }
182
183                 }
184
185                 // temp status tuples emited by merge don't serve any other purpose
186                 // other than tracing the flow of tuples
187                 if (is_temp_tuple) {
188                         host_tuple temp_tup;
189                         if (!func.create_temp_status_tuple(temp_tup)) {
190                                 temp_tup.channel = output_channel;
191                                 result.push_back(temp_tup);
192                         }
193                         // clear memory of heap-resident temporal tuples
194                         tup.free_tuple();
195                 }
196
197                 if (!merge_qsize)
198                         wait_channel = -1;
199
200                 return 0;
201         }
202
203         int flush(list<host_tuple>& result) {
204
205                 if (merge_queue.empty())
206                         return 0;
207
208                 host_tuple top_tuple;
209                 list<host_tuple>::iterator iter;
210                 for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {
211                         top_tuple = *iter;
212                         func.update_stored_temp_status(top_tuple,top_tuple.channel);
213                         func.xform_tuple(top_tuple);
214                         top_tuple.channel = output_channel;
215                         result.push_back(top_tuple);
216                 }
217
218                 queue_mem = 0;
219
220                 return 0;
221         }
222
223         int set_param_block(int sz, void * value) {
224                         return 0;
225         }
226
227
228         int get_temp_status(host_tuple& result) {
229                 result.channel = output_channel;
230                 return func.create_temp_status_tuple(result);
231         }
232
233
234         int get_blocked_status () {
235                 if (merge_qsize> soft_queue_size_limit)
236                         return wait_channel;
237                 else
238                         return -1;
239         }
240
241         unsigned int get_mem_footprint() {
242                 return queue_mem;
243         }
244
245 };
246
247 #endif  // MERGE_OPERATOR_H